This list is closed; nobody may subscribe to it.
Archived messages per month:

| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|------|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|
| 2010 |     |     |     |     |     |     | 139 | 94  | 232 | 143 | 138 | 55  |
| 2011 | 127 | 90  | 101 | 74  | 148 | 241 | 169 | 121 | 157 | 199 | 281 | 75  |
| 2012 | 107 | 122 | 184 | 73  | 14  | 49  | 26  | 103 | 133 | 61  | 51  | 55  |
| 2013 | 59  | 72  | 99  | 62  | 92  | 19  | 31  | 138 | 47  | 83  | 95  | 111 |
| 2014 | 125 | 60  | 119 | 136 | 270 | 83  | 88  | 30  | 47  | 27  | 23  |     |
| 2015 |     |     |     |     |     |     |     |     | 3   |     |     |     |
| 2016 |     |     | 4   | 1   |     |     |     |     |     |     |     |     |
From: <tho...@us...> - 2013-11-11 15:31:30

Revision: 7526
http://bigdata.svn.sourceforge.net/bigdata/?rev=7526&view=rev
Author: thompsonbry
Date: 2013-11-11 15:31:24 +0000 (Mon, 11 Nov 2013)

Log Message:
-----------
added log @ INFO to bracket test start/end in the HA CI test suite.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java	2013-11-11 13:23:50 UTC (rev 7525)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java	2013-11-11 15:31:24 UTC (rev 7526)
@@ -155,7 +155,7 @@
     @Override
     protected void setUp() throws Exception {
-
+        super.setUp();
         if(log.isInfoEnabled())
             log.info("---- TEST START "+getName() + "----");
         executorService = Executors
                 .newCachedThreadPool(new DaemonThreadFactory(getName()));
@@ -209,7 +209,7 @@
             oldProcessHelperLevel = null;
         }
-
+        if(log.isInfoEnabled()) log.info("---- TEST END "+getName() + "----");
     }
```
From: <tho...@us...> - 2013-11-11 13:23:57

Revision: 7525
http://bigdata.svn.sourceforge.net/bigdata/?rev=7525&view=rev
Author: thompsonbry
Date: 2013-11-11 13:23:50 +0000 (Mon, 11 Nov 2013)

Log Message:
-----------
adding commit counter to the commitNow() log @ INFO.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2013-11-10 16:21:06 UTC (rev 7524)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2013-11-11 13:23:50 UTC (rev 7525)
@@ -3868,8 +3868,8 @@
         final long elapsedNanos = System.nanoTime() - cs.beginNanos;
         if (BigdataStatics.debug || log.isInfoEnabled()) {
-            final String msg = "commit: commitTime="
-                    + cs.commitTime
+            final String msg = "commit: commitTime=" + cs.commitTime
+                    + ", commitCounter=" + cs.newCommitCounter
                     + ", latency="
                     + TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
             // + ", nextOffset="
```
From: <tho...@us...> - 2013-11-10 16:21:12

Revision: 7524
http://bigdata.svn.sourceforge.net/bigdata/?rev=7524&view=rev
Author: thompsonbry
Date: 2013-11-10 16:21:06 +0000 (Sun, 10 Nov 2013)

Log Message:
-----------
Further modified to copy reggie.jar into bigdata-test/lib for lookupStarter start/stop.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/build.xml

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/build.xml
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-10 16:11:10 UTC (rev 7523)
+++ branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-10 16:21:06 UTC (rev 7524)
@@ -1715,6 +1715,7 @@
     <delete file="${bigdata-test.lib}/jsk-platform.jar" quiet="true" />
     <delete file="${bigdata-test.lib}/jsk-lib.jar" quiet="true" />
     <delete file="${bigdata-test.lib}/start.jar" quiet="true" />
+    <delete file="${bigdata-test.lib}/reggie.jar" quiet="true" />
     <delete file="${bigdata-test.lib}/bigdata.jar" quiet="true" />
     <copy file="${dist.lib}/log4j.jar"
           todir="${bigdata-test.lib}" />
@@ -1725,6 +1726,8 @@
           todir="${bigdata-test.lib}" />
     <copy file="${dist.lib}/start.jar"
           todir="${bigdata-test.lib}" />
+    <copy file="${dist.lib}/reggie.jar"
+          todir="${bigdata-test.lib}" />
     <copy file="${dist.lib}/bigdata.jar"
           todir="${bigdata-test.lib}" />
 </target>
```
From: <tho...@us...> - 2013-11-10 16:11:17

Revision: 7523
http://bigdata.svn.sourceforge.net/bigdata/?rev=7523&view=rev
Author: thompsonbry
Date: 2013-11-10 16:11:10 +0000 (Sun, 10 Nov 2013)

Log Message:
-----------
Apparently com.sun.jini.reggie.ConstrainableRegistrarProxy was in reggie.jar and that file was not included in the lookupstarter.jar manifest. Including that jar in the classpath appears to be enough for the LUS to start. I am going to commit this change to CI and then verify that lookup is properly started and stopped on ci.bigdata.com.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/build.xml

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/build.xml
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-10 15:01:49 UTC (rev 7522)
+++ branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-10 16:11:10 UTC (rev 7523)
@@ -1693,7 +1693,7 @@
 <jar destfile="${bigdata-test.lib}/lookupstarter.jar" index="false">
     <manifest>
         <attribute name="Manifest-Version" value="1.0" />
-        <attribute name="Class-Path" value="log4j.jar jsk-platform.jar jsk-lib.jar start.jar bigdata.jar" />
+        <attribute name="Class-Path" value="log4j.jar jsk-platform.jar jsk-lib.jar start.jar reggie.jar bigdata.jar" />
         <attribute name="Main-Class" value="com.bigdata.service.jini.util.LookupStarter" />
     </manifest>
```
From: <tho...@us...> - 2013-11-10 15:01:56

Revision: 7522
http://bigdata.svn.sourceforge.net/bigdata/?rev=7522&view=rev
Author: thompsonbry
Date: 2013-11-10 15:01:49 +0000 (Sun, 10 Nov 2013)

Log Message:
-----------
Bug fix to test case. It was failing to ensure that the default file did not exist before running the zero-argument BigdataSail constructor. This was failing the test on AWS using EBS. Added Options.DEFAULT_FILE to BigdataSail to make this default value visible.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBootstrapBigdataSail.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java	2013-11-09 14:42:27 UTC (rev 7521)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java	2013-11-10 15:01:49 UTC (rev 7522)
@@ -377,6 +377,15 @@
     public static final String DESCRIBE_STATEMENT_LIMIT = BigdataSail.class
             .getPackage().getName() + ".describeIterationStatementLimit";
 
+    /**
+     * The name of the default value used for the
+     * {@link Journal.Options#FILE} property by the
+     * {@link BigdataSail#BigdataSail()} convenience constructor.
+     *
+     * @see BigdataSail#BigdataSail()
+     */
+    public static final String DEFAULT_FILE = "bigdata" + JNL;
+
 }
@@ -557,9 +566,9 @@
     private static Properties getDefaultProperties() {
 
-        Properties properties = new Properties();
+        final Properties properties = new Properties();
 
-        properties.setProperty(Options.FILE, "bigdata" + Options.JNL);
+        properties.setProperty(Options.FILE, Options.DEFAULT_FILE);
 
         return properties;

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBootstrapBigdataSail.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBootstrapBigdataSail.java	2013-11-09 14:42:27 UTC (rev 7521)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBootstrapBigdataSail.java	2013-11-10 15:01:49 UTC (rev 7522)
@@ -29,6 +29,7 @@
 import info.aduna.iteration.CloseableIteration;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -76,7 +77,7 @@
     /**
      * @param arg0
     */
-    public TestBootstrapBigdataSail(String arg0) {
+    public TestBootstrapBigdataSail(final String arg0) {
         super(arg0);
     }
@@ -84,13 +85,32 @@
      * Test create and shutdown of the default store.
      *
      * @throws SailException
+     * @throws IOException
      */
-    public void test_ctor_1() throws SailException {
+    public void test_ctor_1() throws SailException, IOException {
 
+        final File file = new File(BigdataSail.Options.DEFAULT_FILE);
+
+        /*
+         * If the default file exists, then delete it before creating the SAIL.
+         */
+        if (file.exists()) {
+
+            if (!file.delete()) {
+
+                throw new IOException("Unable to remove default file:" + file);
+
+            }
+
+        }
+
         final BigdataSail sail = new BigdataSail();
 
         try {
 
+            if (!file.exists())
+                fail("Expected file does not exist: " + file);
+
             sail.initialize();
 
             sail.shutDown();
@@ -111,15 +131,15 @@
     public void test_ctor_2() throws SailException {
 
         final File file = new File(getName() + Options.JNL);
-        
-        if(file.exists()) {
-            
-            if(!file.delete()) {
-                
+
+        if (file.exists()) {
+
+            if (!file.delete()) {
+
                 fail("Could not delete file before test: " + file);
 
             }
-            
+
         }
 
         final Properties properties = new Properties();
```
From: <tho...@us...> - 2013-11-09 14:42:33

Revision: 7521
http://bigdata.svn.sourceforge.net/bigdata/?rev=7521&view=rev
Author: thompsonbry
Date: 2013-11-09 14:42:27 +0000 (Sat, 09 Nov 2013)

Log Message:
-----------
Added the file to the thrown exceptions.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/FileMetadata.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/FileMetadata.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/FileMetadata.java	2013-11-09 14:42:00 UTC (rev 7520)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/FileMetadata.java	2013-11-09 14:42:27 UTC (rev 7521)
@@ -990,13 +990,13 @@
          */
         magic = raf.readInt();
     } catch (IOException ex) {
-        throw new RuntimeException("Can not read magic. Is file locked by another process?", ex);
+        throw new RuntimeException("Can not read magic. Is file locked by another process? file="+file, ex);
     }
     if (magic != MAGIC)
-        throw new RuntimeException("Bad journal magic: expected=" + MAGIC + ", actual=" + magic);
+        throw new RuntimeException("Bad journal magic: file="+file+", expected=" + MAGIC + ", actual=" + magic);
     version = raf.readInt();
     if (version != VERSION1)
-        throw new RuntimeException("Bad journal version: expected=" + VERSION1 + ", actual=" + version);
+        throw new RuntimeException("Bad journal version: file="+file+", expected=" + VERSION1 + ", actual=" + version);
     /*
      * Check root blocks (magic, timestamps), choose root block, read
@@ -1081,7 +1081,7 @@
     if (closeTime != 0L && !readOnly) {
 
-        throw new RuntimeException("Journal is closed for writes: closedTime=" + closeTime);
+        throw new RuntimeException("Journal is closed for writes: file="+file+", closedTime=" + closeTime);
 
     }
```
From: <tho...@us...> - 2013-11-09 14:42:07

Revision: 7520
http://bigdata.svn.sourceforge.net/bigdata/?rev=7520&view=rev
Author: thompsonbry
Date: 2013-11-09 14:42:00 +0000 (Sat, 09 Nov 2013)

Log Message:
-----------
TestCase3: final attribute and @Override annotation. com.bigdata.services test suite was failing to clean up after itself.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestCase3.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerBootstrapTestCase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestCase3.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestCase3.java	2013-11-09 14:00:45 UTC (rev 7519)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestCase3.java	2013-11-09 14:42:00 UTC (rev 7520)
@@ -66,7 +66,7 @@
     /**
      * @param name
      */
-    public TestCase3(String name) {
+    public TestCase3(final String name) {
         super(name);
     }
@@ -78,6 +78,7 @@
     }
 
+    @Override
     protected void tearDown() throws Exception {
 
         super.tearDown();

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerBootstrapTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerBootstrapTestCase.java	2013-11-09 14:00:45 UTC (rev 7519)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerBootstrapTestCase.java	2013-11-09 14:42:00 UTC (rev 7520)
@@ -51,7 +51,7 @@
     /**
      * @param arg0
     */
-    public AbstractResourceManagerBootstrapTestCase(String arg0) {
+    public AbstractResourceManagerBootstrapTestCase(final String arg0) {
         super(arg0);
     }
@@ -85,7 +85,8 @@
     /**
      * Sets up the per-test data directory.
     */
-    public void setUp() throws Exception {
+    @Override
+    protected void setUp() throws Exception {
 
         super.setUp();
@@ -106,4 +107,62 @@
     }
 
+    @Override
+    protected void tearDown() throws Exception {
+
+        super.tearDown();
+
+        if (dataDir != null) {
+
+            recursiveDelete(dataDir);
+
+        }
+
+        dataDir = null;
+        journalsDir = null;
+        segmentsDir = null;
+        tmpDir = null;
+
+    }
+
+    /**
+     * Recursively removes any files and subdirectories and then removes the
+     * file (or directory) itself.
+     *
+     * @param f
+     *            A file or directory.
+     */
+    private void recursiveDelete(final File f) {
+
+        if (f.isDirectory()) {
+
+            final File[] children = f.listFiles();
+
+            if (children == null) {
+
+                // No such file or directory exists.
+                return;
+
+            }
+
+            for (int i = 0; i < children.length; i++) {
+
+                recursiveDelete(children[i]);
+
+            }
+
+        }
+
+        if (log.isInfoEnabled())
+            log.info("Removing: " + f);
+
+        if (f.exists() && !f.delete()) {
+
+            log.warn("Could not remove: " + f);
+
+        }
+
+    }
+
 }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java	2013-11-09 14:00:45 UTC (rev 7519)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java	2013-11-09 14:42:00 UTC (rev 7520)
@@ -138,7 +138,8 @@
     /**
      * Setup test fixtures.
     */
-    public void setUp() throws Exception {
+    @Override
+    protected void setUp() throws Exception {
 
         super.setUp();
@@ -209,7 +210,8 @@
     }
 
-    public void tearDown() throws Exception {
+    @Override
+    protected void tearDown() throws Exception {
 
         if(executorService != null)
             executorService.shutdownNow();
@@ -233,6 +235,8 @@
             txService.destroy();
         }
 
+        super.tearDown();
+
     }
@@ -663,7 +667,7 @@
      * @throws ExecutionException
      * @throws InterruptedException
     */
-    protected void registerIndex(String name) throws InterruptedException, ExecutionException {
+    protected void registerIndex(final String name) throws InterruptedException, ExecutionException {
 
         final IndexMetadata indexMetadata = new IndexMetadata(name, UUID.randomUUID());
         {
@@ -699,7 +703,7 @@
      * @param expected
      * @param actual
     */
-    protected void assertSameResources(IRawStore[] expected, Set<UUID> actual) {
+    protected void assertSameResources(final IRawStore[] expected, final Set<UUID> actual) {
 
         if(log.isInfoEnabled()) {
```
From: <tho...@us...> - 2013-11-09 14:00:52

Revision: 7519
http://bigdata.svn.sourceforge.net/bigdata/?rev=7519&view=rev
Author: thompsonbry
Date: 2013-11-09 14:00:45 +0000 (Sat, 09 Nov 2013)

Log Message:
-----------
Added delegate to trace to see which proxy version of this test fails in CI.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestConcurrentKBCreate.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestConcurrentKBCreate.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestConcurrentKBCreate.java	2013-11-09 12:51:08 UTC (rev 7518)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestConcurrentKBCreate.java	2013-11-09 14:00:45 UTC (rev 7519)
@@ -369,9 +369,9 @@
                 errorCount.incrementAndGet();
 
                 log.error(t);
 
+                // Added delegate to trace to see which proxy version of this test fails in CI.
+                throw new RuntimeException("delegate="+getOurDelegate()+", t=" + t, t);
-                throw new RuntimeException(t);
-
             } finally {
 
                 if (conn != null) {
```
From: <tho...@us...> - 2013-11-09 12:51:17

Revision: 7518
http://bigdata.svn.sourceforge.net/bigdata/?rev=7518&view=rev
Author: thompsonbry
Date: 2013-11-09 12:51:08 +0000 (Sat, 09 Nov 2013)

Log Message:
-----------
Properties object was not being passed in to initialize the BigdataSail instance of the BigdataGraphFixture. This was noticed when running CI on EC2 EBS was causing OverlappingFileLock exceptions in TestSSSP. It is my expectation that Amazon EBS behaves poorly when re-opening channels. We might need to try using instance store for running CI (and HA).

com.bigdata.rdf.graph.impl.bd.TestSSSP.testSSSP (from com.bigdata.rdf.TestAll)

    java.lang.RuntimeException: FileLock Overlap
        at com.bigdata.journal.FileMetadata.reopenChannel(FileMetadata.java:1245)
        at com.bigdata.journal.FileMetadata.access$000(FileMetadata.java:58)
        at com.bigdata.journal.FileMetadata$1.reopenChannel(FileMetadata.java:1163)
        at com.bigdata.journal.FileMetadata$1.reopenChannel(FileMetadata.java:1153)
        at com.bigdata.journal.FileMetadata.<init>(FileMetadata.java:946)
        at com.bigdata.journal.FileMetadata.createInstance(FileMetadata.java:1470)
        at com.bigdata.journal.AbstractJournal.<init>(AbstractJournal.java:1140)
        at com.bigdata.journal.Journal.<init>(Journal.java:217)
        at com.bigdata.journal.Journal.<init>(Journal.java:210)
        at com.bigdata.rdf.sail.BigdataSail.createLTS(BigdataSail.java:651)
        at com.bigdata.rdf.sail.BigdataSail.<init>(BigdataSail.java:630)
        at com.bigdata.rdf.sail.BigdataSail.<init>(BigdataSail.java:618)
        at com.bigdata.rdf.graph.impl.bd.BigdataGraphFixture.<init>(BigdataGraphFixture.java:20)
        at com.bigdata.rdf.graph.impl.bd.AbstractBigdataGraphTestCase$1.newGraphFixture(AbstractBigdataGraphTestCase.java:61)
        at com.bigdata.rdf.graph.AbstractGraphTestCase.setUp(AbstractGraphTestCase.java:74)
    Caused by: java.nio.channels.OverlappingFileLockException
        at sun.nio.ch.FileChannelImpl$SharedFileLockTable.checkList(FileChannelImpl.java:1166)
        at sun.nio.ch.FileChannelImpl$SharedFileLockTable.add(FileChannelImpl.java:1068)
        at sun.nio.ch.FileChannelImpl.tryLock(FileChannelImpl.java:868)
        at com.bigdata.journal.FileMetadata.reopenChannel(FileMetadata.java:1210)

Added alternative constructor to allow reuse of an existing AbstractTripleStore.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGraphFixture.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGraphFixture.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGraphFixture.java	2013-11-08 20:24:01 UTC (rev 7517)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGraphFixture.java	2013-11-09 12:51:08 UTC (rev 7518)
@@ -9,6 +9,7 @@
 import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor;
 import com.bigdata.rdf.graph.util.AbstractGraphFixture;
 import com.bigdata.rdf.sail.BigdataSail;
+import com.bigdata.rdf.store.AbstractTripleStore;
 
 public class BigdataGraphFixture extends AbstractGraphFixture {
@@ -17,12 +18,21 @@
     public BigdataGraphFixture(final Properties properties)
             throws SailException {
 
-        sail = new BigdataSail();
+        sail = new BigdataSail(properties);
 
         sail.initialize();
 
     }
-
+
+    public BigdataGraphFixture(final AbstractTripleStore kb)
+            throws SailException {
+
+        sail = new BigdataSail(kb);
+
+        sail.initialize();
+
+    }
+
     @Override
     public BigdataSail getSail() {
```
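The stack trace above comes out of `FileChannel.tryLock()`: within a single JVM, the shared file-lock table rejects a second lock on a region that an already-open channel still holds, so re-opening a journal whose previous channel was never released fails fast with `OverlappingFileLockException`. A minimal, self-contained sketch reproducing the exception (the temp-file name is arbitrary; this is an illustration, not bigdata code):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.OverlappingFileLockException;

public class LockOverlapDemo {

    public static void main(String[] args) throws Exception {

        final File file = File.createTempFile("demo", ".jnl"); // stand-in for a journal file
        file.deleteOnExit();

        final RandomAccessFile raf1 = new RandomAccessFile(file, "rw");
        raf1.getChannel().tryLock(); // first channel acquires an exclusive lock

        final RandomAccessFile raf2 = new RandomAccessFile(file, "rw");
        try {
            // Second channel in the SAME JVM: the JVM-wide lock table sees the
            // overlap and throws rather than blocking or returning null.
            raf2.getChannel().tryLock();
        } catch (OverlappingFileLockException ex) {
            System.out.println("Expected: " + ex);
        } finally {
            raf2.close();
            raf1.close(); // closing the channel releases the first lock
        }
    }
}
```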
From: <tho...@us...> - 2013-11-08 20:24:07

Revision: 7517
http://bigdata.svn.sourceforge.net/bigdata/?rev=7517&view=rev
Author: thompsonbry
Date: 2013-11-08 20:24:01 +0000 (Fri, 08 Nov 2013)

Log Message:
-----------
Added ability to specify which digests should be computed (All, None, Journal, Snapshots, HALogs) and made the default when "digests=" is specified "Journal". This makes the digest computation request through the /status page usable when there are a LOT of HALog files on hand.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java	2013-11-08 19:42:57 UTC (rev 7516)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java	2013-11-08 20:24:01 UTC (rev 7517)
@@ -62,6 +62,7 @@
 import com.bigdata.quorum.Quorum;
 import com.bigdata.quorum.zk.ZKQuorumClient;
 import com.bigdata.quorum.zk.ZKQuorumImpl;
+import com.bigdata.rdf.sail.webapp.StatusServlet.DigestEnum;
 import com.bigdata.zookeeper.DumpZookeeper;
 
 /**
@@ -162,7 +163,19 @@
         quorumService = t;
     }
 
-    final boolean digests = req.getParameter(StatusServlet.DIGESTS) != null;
+    final DigestEnum digestEnum;
+    {
+        final String str = req.getParameter(StatusServlet.DIGESTS);
+        if (str == null) {
+            digestEnum = null;
+        } else {
+            if (str.trim().isEmpty()) {
+                digestEnum = StatusServlet.DEFAULT_DIGESTS;
+            } else {
+                digestEnum = DigestEnum.valueOf(str.trim());
+            }
+        }
+    }
 
     current.node("h1", "High Availability");
@@ -270,7 +283,8 @@
         final File file = journal.getFile();
         if (file != null) {
             String digestStr = null;
-            if (digests) {
+            if (digestEnum != null
+                    && (digestEnum == DigestEnum.All || digestEnum == DigestEnum.Journal)) {
                 try {
                     final MessageDigest digest = MessageDigest
                             .getInstance("MD5");
@@ -361,7 +375,8 @@
                     + (currentFile == null ? "N/A" : currentFile
                             .getName())).node("br").close();
         }
-        if (digests) {
+        if (digestEnum != null
+                && (digestEnum == DigestEnum.All || digestEnum == DigestEnum.HALogs)) {
             /*
              * List each historical HALog file together with its digest.
             *
@@ -379,7 +394,7 @@
             final IHALogReader r = nexus.getHALogWriter()
                     .getReader(closingCommitCounter);
             try {
-                if (digests && !r.isEmpty()) {
+                if (!r.isEmpty()) {
                     try {
                         final MessageDigest digest = MessageDigest
                                 .getInstance("MD5");
@@ -462,7 +477,8 @@
 //            final File file = journal.getSnapshotManager()
 //                    .getSnapshotFile(rb.getCommitCounter());
             String digestStr = null;
-            if (digests) {
+            if (digestEnum != null
+                    && (digestEnum == DigestEnum.All || digestEnum == DigestEnum.Snapshots)) {
                 try {
                     final MessageDigest digest = MessageDigest
                             .getInstance("MD5");

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java	2013-11-08 19:42:57 UTC (rev 7516)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java	2013-11-08 20:24:01 UTC (rev 7517)
@@ -175,9 +175,19 @@
      * on the journal are only valid if there are no concurrent writes on the
      * journal and the journal has been through either a commit or an abort
      * protocol.
+     * <p>
+     * The value is a {@link DigestEnum} and defaults to
+     * {@link DigestEnum#Journal} when {@link #DIGESTS} is specified without an
+     * explicit {@link DigestEnum} value.
     */
     static final String DIGESTS = "digests";
 
+    static final DigestEnum DEFAULT_DIGESTS = DigestEnum.Journal;
+
+    static enum DigestEnum {
+        None, Journal, HALogs, Snapshots, All;
+    }
+
     /**
      * Special HA status request designed for clients that poll to determine the
     * status of an HAJournalServer. This option is exclusive of other
```
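The decision table implemented above is: parameter absent means no digests are computed; present but empty (`?digests=`) means the `Journal` default; otherwise the value must name a `DigestEnum` constant. A standalone sketch of that logic, with `DigestEnum` re-declared locally and `parseDigests` as a hypothetical helper name (not part of the bigdata API):

```java
public class DigestParamSketch {

    enum DigestEnum { None, Journal, HALogs, Snapshots, All }

    static DigestEnum parseDigests(final String str) {
        if (str == null)
            return null;                       // parameter absent: compute no digests
        if (str.trim().isEmpty())
            return DigestEnum.Journal;         // "?digests=": the DEFAULT_DIGESTS value
        return DigestEnum.valueOf(str.trim()); // "?digests=HALogs", "?digests=All", ...
    }

    public static void main(String[] args) {
        System.out.println(parseDigests(null));  // null
        System.out.println(parseDigests(""));    // Journal
        System.out.println(parseDigests("All")); // All
    }
}
```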
From: <tho...@us...> - 2013-11-08 19:43:04

Revision: 7516
http://bigdata.svn.sourceforge.net/bigdata/?rev=7516&view=rev
Author: thompsonbry
Date: 2013-11-08 19:42:57 +0000 (Fri, 08 Nov 2013)

Log Message:
-----------
Bug fix for "logically empty HALog" due to failure to flush the file when writing the closing root block on the HALog. Also added doubleSync parameter that is set from com.bigdata.journal.Options.DOUBLE_SYNC and now also controls whether we double sync the HALog (before and after writing the closing root block).

See #679 (Logically empty HALog)
See #738 (Longevity testing)
See #724 (Sudden kill tests)

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java	2013-11-08 18:08:21 UTC (rev 7515)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java	2013-11-08 19:42:57 UTC (rev 7516)
@@ -110,6 +110,21 @@
     /** HA log directory. */
     private final File m_haLogDir;
 
+    /**
+     * When <code>true</code>, the HALog is flushed to the disk before the
+     * closing root block is written and then once again after the closing root
+     * block is written. When <code>false</code>, the HALog is flushed only
+     * once, after the closing root block is written.
+     *
+     * @see <a
+     *      href="http://sourceforge.net/apps/trac/bigdata/ticket/738#comment:13"
+     *      >Longevity and stress test protocol for HA QA </a>
+     *
+     * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/679">
+     *      HAJournalServer can not restart due to logically empty log file </a>
+     */
+    private final boolean doubleSync;
+
     /**
      * The root block of the leader at the start of the current write set.
     */
@@ -250,13 +265,45 @@
     }
 
-    public HALogWriter(final File logDir) {
+    /**
+     *
+     * @param logDir
+     *            The directory in which the HALog files reside.
+     * @param doubleSync
+     *            When <code>true</code>, the HALog is flushed to the disk
+     *            before the closing root block is written and then once again
+     *            after the closing root block is written. When
+     *            <code>false</code>, the HALog is flushed only once, after the
+     *            closing root block is written.
+     *
+     * @see <a
+     *      href="http://sourceforge.net/apps/trac/bigdata/ticket/738#comment:13"
+     *      >Longevity and stress test protocol for HA QA </a>
+     *
+     * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/679">
+     *      HAJournalServer can not restart due to logically empty log file </a>
+     */
+    public HALogWriter(final File logDir, final boolean doubleSync) {
 
         m_haLogDir = logDir;
 
+        this.doubleSync = doubleSync;
+
     }
 
+    /**
+     *
+     * @param logDir
+     *
+     * @deprecated This is ony used by the test suite.
+     */
+    HALogWriter(final File logDir) {
+
+        this(logDir, true/* doubleSync */);
+
+    }
+
     /**
      * Open an HA log file for the write set starting with the given root block.
      *
     * @param rootBlock
@@ -430,7 +477,26 @@
     }
 
-    flush(); // current streamed data
+    if (doubleSync) {
+        /**
+         * Flush the HALog records to the disk.
+         *
+         * Note: This is intended to avoid the possibility that the
+         * writes might be reordered such that closing root block was
+         * written to the disk before the HALog records were flushed to
+         * the disk. However, better durability guarantees are provided
+         * by battery backup on the disk controller and similar such
+         * nicities. If such techniques are used, you can disable the
+         * doubleSync option and still have a guarantee that writes on
+         * the HALog are durable and respect the applications ordering
+         * of write requests (in terms of restart safe visibility).
+         *
+         * @see <a
+         *      href="http://sourceforge.net/apps/trac/bigdata/ticket/738#comment:13"
+         *      >Longevity and stress test protocol for HA QA </a>
+         */
+        flush();
+    }
 
     /*
      * The closing root block is written into which ever slot
@@ -442,8 +508,14 @@
     */
     writeRootBlock(rootBlock.isRootBlock0(), rootBlock);
 
-    // // The closing root block is always in slot 1.
-    // writeRootBlock(false/* isRootBlock0 */, rootBlock);
+    /**
+     * Flush the backing channel.
+     *
+     * @see <a
+     *      href="http://sourceforge.net/apps/trac/bigdata/ticket/738#comment:13"
+     *      >Longevity and stress test protocol for HA QA </a>
+     */
+    flush();
 
     m_state.committed();

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java	2013-11-08 18:08:21 UTC (rev 7515)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java	2013-11-08 19:42:57 UTC (rev 7516)
@@ -236,7 +236,7 @@
     }
 
     // Set up the HA log writer.
-    haLogWriter = new HALogWriter(haLogDir);
+    haLogWriter = new HALogWriter(haLogDir, journal.isDoubleSync());
 
     haLogIndex = HALogIndex.createTransient();
```
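The ordering guarantee being bought here is the classic write-ahead barrier: without the first flush, the OS may reorder the buffered HALog payload writes after the closing root block, so a crash could leave a root block on disk that claims records which never made it there. A minimal sketch of the two-barrier close, assuming a `FileChannel`-backed log (the class and method names are hypothetical stand-ins, not the real `HALogWriter` internals; `force(false)` is the fsync-style barrier):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class LogCloseSketch {

    static void closeLog(final FileChannel ch, final ByteBuffer records,
            final ByteBuffer closingRootBlock, final long rootBlockOffset,
            final boolean doubleSync) throws IOException {

        ch.write(records); // buffered HALog payload writes

        if (doubleSync) {
            // Barrier #1: payload reaches the disk BEFORE the root block
            // that makes it visible can be written.
            ch.force(false);
        }

        ch.write(closingRootBlock, rootBlockOffset);

        // Barrier #2: the closing root block itself is durable.
        ch.force(false);
    }
}
```

With battery-backed or otherwise ordered storage, the first barrier can be skipped (doubleSync=false), which is exactly the trade-off the javadoc above describes.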
From: <tho...@us...> - 2013-11-08 18:08:27

Revision: 7515
http://bigdata.svn.sourceforge.net/bigdata/?rev=7515&view=rev
Author: thompsonbry
Date: 2013-11-08 18:08:21 +0000 (Fri, 08 Nov 2013)

Log Message:
-----------
Added periodic log @ WARN when reading a large number of HALog files during startup (better ergonomics).

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java	2013-11-07 14:45:09 UTC (rev 7514)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java	2013-11-08 18:08:21 UTC (rev 7515)
@@ -532,6 +532,20 @@
 
     haLogIndex.add(new HALogRecord(closingRootBlock, sizeOnDisk));
 
+    final long nentries = haLogIndex.getEntryCount();
+
+    if (nentries % 1000 == 0) {
+
+        /*
+         * Provide an indication that the server is doing work during
+         * startup. If there are a lot of HALog files, then we can spend
+         * quite a bit of time in this procedure.
+         */
+
+        haLog.warn("Indexed " + nentries + " HALog files");
+
+    }
+
 }
 
 /**
```
From: <tho...@us...> - 2013-11-07 14:45:15

Revision: 7514
http://bigdata.svn.sourceforge.net/bigdata/?rev=7514&view=rev
Author: thompsonbry
Date: 2013-11-07 14:45:09 +0000 (Thu, 07 Nov 2013)

Log Message:
-----------
Added script to run the HAClient. This can be used to verify access to zookeeper and river and list out the discovered services.

Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAClient.sh

```diff
Added: branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAClient.sh
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAClient.sh	                        (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAClient.sh	2013-11-07 14:45:09 UTC (rev 7514)
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Setup the environment.
+source src/resources/HAJournal/HAJournal.env
+
+# Uncomment to enable profiler.
+#profilerAgent=-agentpath:/nas/install/yjp-12.0.6/bin/linux-x86-64/libyjpagent.so
+
+# Uncomment to have all profiling initially disabled.
+#profilerAgentOptions=-agentlib:yjpagent=disableexceptiontelemetry,disablestacktelemetry
+
+# Uncomment to enable remote debugging at the specified port.
+#debug=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1046
+
+# Run HAClient.
+java\
+ ${JAVAOPTS}\
+ -cp ${CLASSPATH}\
+ -Djava.security.policy=${POLICY_FILE}\
+ -Dlog4j.configuration=${LOG4J_CONFIG}\
+ -Djava.util.logging.config.file=${LOGGING_CONFIG}\
+ ${debug}\
+ ${profilerAgent} ${profilerAgentOptions}\
+ com.bigdata.journal.jini.ha.HAClient\
+ ${HAJOURNAL_CONFIG}
\ No newline at end of file
```
From: <tho...@us...> - 2013-11-06 18:39:48

Revision: 7513
http://bigdata.svn.sourceforge.net/bigdata/?rev=7513&view=rev
Author: thompsonbry
Date: 2013-11-06 18:39:41 +0000 (Wed, 06 Nov 2013)

Log Message:
-----------
Fixed case-based spelling error in the HATest.dir property name that was preventing CI builds from succeeding on EC2.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/build.xml

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/build.xml
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-06 16:12:07 UTC (rev 7512)
+++ branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-06 18:39:41 UTC (rev 7513)
@@ -1992,8 +1992,8 @@
     <mkdir dir="${test.results.dir}" />
 
     <!-- Clear out the old HA test suite logs. -->
-    <property name="HAtest.dir" location="benchmark/CI-HAJournal-1" />
-    <delete dir="${HAtest.dir}" quiet="true" />
+    <property name="HATest.dir" location="benchmark/CI-HAJournal-1" />
+    <delete dir="${HATest.dir}" quiet="true" />
     <mkdir dir="${HATest.dir}" />
 
     <condition property="testClass" value="${testName}">
@@ -2187,7 +2187,7 @@
     <tar destfile="${test.results.dir}/report.tgz" basedir="${test.results.dir}/report" compression="gzip"/>
 
     <!-- Archive the HA test suite output logs. -->
-    <tar destfile="${test.results.dir}/HAtest-report.tgz" basedir="${HAtest.dir}" compression="gzip"/>
+    <tar destfile="${test.results.dir}/HAtest-report.tgz" basedir="${HATest.dir}" compression="gzip"/>
 
 </target>
```
From: <tho...@us...> - 2013-11-06 16:12:14

Revision: 7512
http://bigdata.svn.sourceforge.net/bigdata/?rev=7512&view=rev
Author: thompsonbry
Date: 2013-11-06 16:12:07 +0000 (Wed, 06 Nov 2013)

Log Message:
-----------
added -quiet for javadoc target

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/build.xml

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/build.xml
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-06 16:07:58 UTC (rev 7511)
+++ branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-06 16:12:07 UTC (rev 7512)
@@ -387,6 +387,7 @@
     classpathref="build.classpath"
     >
     <arg value="-J-Xmx1000m" />
+    <arg value="-quiet" />
     <packageset dir="${bigdata.dir}/bigdata/src/java" />
     <packageset dir="${bigdata.dir}/bigdata/src/samples" />
     <packageset dir="${bigdata.dir}/bigdata-jini/src/java" />
```
From: <tho...@us...> - 2013-11-06 16:08:05

Revision: 7511
http://bigdata.svn.sourceforge.net/bigdata/?rev=7511&view=rev
Author: thompsonbry
Date: 2013-11-06 16:07:58 +0000 (Wed, 06 Nov 2013)

Log Message:
-----------
forcing javadoc build target to verbose=no

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/build.xml

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/build.xml
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-04 20:43:34 UTC (rev 7510)
+++ branches/BIGDATA_RELEASE_1_3_0/build.xml	2013-11-06 16:07:58 UTC (rev 7511)
@@ -381,7 +381,7 @@
 <target name="javadoc" depends="prepare" if="javadoc">
     <mkdir dir="${build.dir}/docs/api" />
     <javadoc destdir="${build.dir}/docs/api" defaultexcludes="yes"
-        author="true" version="true" use="true"
+        author="true" version="true" use="true" verbose="no"
         overview="${bigdata.dir}/overview.html"
         windowtitle="bigdata® v${build.ver}"
         classpathref="build.classpath"
```
From: <tho...@us...> - 2013-11-04 20:43:40

Revision: 7510
http://bigdata.svn.sourceforge.net/bigdata/?rev=7510&view=rev
Author: thompsonbry
Date: 2013-11-04 20:43:34 +0000 (Mon, 04 Nov 2013)

Log Message:
-----------
javadoc

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java	2013-11-04 20:33:12 UTC (rev 7509)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java	2013-11-04 20:43:34 UTC (rev 7510)
@@ -464,6 +464,33 @@
      * Review commit2Phase semantics when a follower fails </a>
      *
      * @see TestHAJournalServerOverride#testStartABC_commit2Phase_B_failCommit_beforeWritingRootBlockOnJournal_HALogsPurgedAtCommit()
+     *
+     *      TODO There should probably be different tests to examine how the HA
+     *      cluster handles the case where we have fewer than the required
+     *      number of services that correctly perform the commit. We should be
+     *      able to write tests that actually cause the HA cluster meet on the
+     *      previous commit point (majority fail to commit), the new commit
+     *      point (majority commit), or on NO commit point (bare majority with
+     *      3rd service at an earlier commit point, 2 services vote YES and one
+     *      fails before writing the root block on the journal - this last case
+     *      should cause generate an alert, even if only because the HA cluster
+     *      is unable to form a quorum).
+     *
+     *      TODO Write test where commit2Phase() fails after writing the root
+     *      block on the journal but before writing the root block on the HALog.
+     *      Use this to explore what can happen when the live HALog file is not
+     *      properly closed. The journal will be at a commit point in advance of
+     *      the most recent HALog file. It needs to either reach to the quorum
+     *      and recover the root block (and any subsequent commits) from the
+     *      leader. Note that it could cure the missing HALog root block locally
+     *      as well, but better to find a common pattern.
+     *
+     *      TODO Develop annotations on the commit2Phase protocol diagram that
+     *      show the different tests that we have to examine the failure modes.
+     *
+     *      TODO Explore GATHER failure modes.
+     *
+     *      TODO Test failover reads.
     */
    public void testStartABC_commit2Phase_B_failCommit_beforeWritingRootBlockOnJournal_HALogsNotPurgedAtCommit()
            throws Exception {
```
From: <tho...@us...> - 2013-11-04 20:33:20

Revision: 7509
http://bigdata.svn.sourceforge.net/bigdata/?rev=7509&view=rev
Author: thompsonbry
Date: 2013-11-04 20:33:12 +0000 (Mon, 04 Nov 2013)

Log Message:
-----------
Added test coverage for spurious exception throw out of commit2Phase() before the root block is written on the Journal. See #760 (Code review for 2-phase commit protocol).

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HA2PhaseCommitMessage.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseAbortMessage.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitMessage.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhasePrepareMessage.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java

Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitProtocolMessage.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/Mock2PhaseCommitProtocolException.java

```diff
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HA2PhaseCommitMessage.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HA2PhaseCommitMessage.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HA2PhaseCommitMessage.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -66,5 +66,15 @@
                 + didAllServicesPrepare + "}";
 
     }
 
+    @Override
+    public boolean failCommit_beforeWritingRootBlockOnJournal() {
+        return false;
+    }
+
+    @Override
+    public boolean failCommit_beforeClosingHALog() {
+        return false;
+    }
+
 }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseAbortMessage.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseAbortMessage.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseAbortMessage.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -28,7 +28,7 @@
  *
  * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 */
-public interface IHA2PhaseAbortMessage extends IHAMessage {
+public interface IHA2PhaseAbortMessage extends IHA2PhaseCommitProtocolMessage {
 
     /**
      * The token for the quorum for which this request was made.

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitMessage.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitMessage.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitMessage.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -35,7 +35,7 @@
  *
 * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 */
-public interface IHA2PhaseCommitMessage extends IHAMessage {
+public interface IHA2PhaseCommitMessage extends IHA2PhaseCommitProtocolMessage {
 
     /**
      * <code>true</code> iff the service was recognized as being joined with the
@@ -60,5 +60,23 @@
      * the commit will still be performed).
     */
     boolean didAllServicesPrepare();
-
+
+    /**
+     * When <code>true</code> the COMMIT message will fail within the
+     * commit2Phase implementation. An exception will be thrown immeditely
+     * before the new root block is written onto the journal.
+     * <p>
+     * Note: This is for unit tests only.
+     */
+    boolean failCommit_beforeWritingRootBlockOnJournal();
+
+    /**
+     * When <code>true</code> the COMMIT message will fail within the
+     * commit2Phase implementation. An exception will be thrown immeditely
+     * before the closing root block is written onto the HALog file.
+     * <p>
+     * Note: This is for unit tests only.
+     */
+    boolean failCommit_beforeClosingHALog();
+
 }

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitProtocolMessage.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitProtocolMessage.java	                        (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhaseCommitProtocolMessage.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -0,0 +1,33 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2007.  All rights reserved.
+
+Contact:
+     SYSTAP, LLC
+     4501 Tower Road
+     Greensboro, NC 27410
+     lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+package com.bigdata.ha.msg;
+
+/**
+ * Message for one of the 2-phase commit protocol operations.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public interface IHA2PhaseCommitProtocolMessage extends IHAMessage {
+
+}

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhasePrepareMessage.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhasePrepareMessage.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHA2PhasePrepareMessage.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -36,7 +36,7 @@
 *
 * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 */
-public interface IHA2PhasePrepareMessage extends IHAMessage {
+public interface IHA2PhasePrepareMessage extends IHA2PhaseCommitProtocolMessage {
 
     /**
      * The consensus release time from the GATHER.
@@ -91,6 +91,8 @@
     /**
      * When <code>true</code>, always vote note.
+     * <p>
+     * Note: This is for unit tests only.
     */
     boolean voteNo();

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/Mock2PhaseCommitProtocolException.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/Mock2PhaseCommitProtocolException.java	                        (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/Mock2PhaseCommitProtocolException.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -0,0 +1,51 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2007.  All rights reserved.
+
+Contact:
+     SYSTAP, LLC
+     4501 Tower Road
+     Greensboro, NC 27410
+     lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+package com.bigdata.ha.msg;
+
+/**
+ * Instances of this class are used when one of the
+ * {@link IHA2PhaseCommitProtocolMessage}s is configured to force a runtime
+ * exception during the 2-phase commit protocol.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public class Mock2PhaseCommitProtocolException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    public Mock2PhaseCommitProtocolException() {
+        super();
+    }
+
+    public Mock2PhaseCommitProtocolException(final String msg) {
+        super(msg);
+    }
+
+    public Mock2PhaseCommitProtocolException(final RuntimeException cause) {
+
+        super(cause);
+
+    }
+
+}

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -140,6 +140,7 @@
 import com.bigdata.ha.msg.IHAWriteMessage;
 import com.bigdata.ha.msg.IHAWriteSetStateRequest;
 import com.bigdata.ha.msg.IHAWriteSetStateResponse;
+import com.bigdata.ha.msg.Mock2PhaseCommitProtocolException;
 import com.bigdata.htree.HTree;
 import com.bigdata.io.DirectBufferPool;
 import com.bigdata.io.IDataRecord;
@@ -7040,16 +7041,6 @@
         }
 
     } // class VoteNoTask
-
-//    /**
-//     * Method must be extended by subclass to coordinate the rejected
-//     * commit.
-//     */
-//    protected void doRejectedCommit() {
-//
-//        doLocalAbort();
-//
-//    }
 
     /**
      * Task prepares for a 2-phase commit (syncs to the disk) and votes YES
@@ -7337,9 +7328,9 @@
                 /*
                  * Hook allows the test suite to force a NO vote.
                 */
-
-                throw new RuntimeException("Force NO vote");
+                throw new Mock2PhaseCommitProtocolException("Force NO vote");
+
             }
 
             // Vote YES.
@@ -7640,11 +7631,23 @@
             // verify that the qourum has not changed.
             quorum.assertQuorum(rootBlock.getQuorumToken());
 
+            if (commitMessage.failCommit_beforeWritingRootBlockOnJournal()) {
+
+                throw new Mock2PhaseCommitProtocolException();
+
+            }
+
             /*
              * Write the root block on the local journal.
            */
             AbstractJournal.this.doLocalCommit(localService, rootBlock);
 
+            if (commitMessage.failCommit_beforeClosingHALog()) {
+
+                throw new Mock2PhaseCommitProtocolException();
+
+            }
+
             /*
              * Write the root block on the HALog file, closing out that
             * file.

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -265,6 +265,16 @@
     public void voteNo() throws IOException;
 
     /**
+     * @see IHA2PhaseCommitMessage#failCommit_beforeWritingRootBlockOnJournal()
+     */
+    public void failCommit_beforeWritingRootBlockOnJournal() throws IOException;
+
+    /**
+     * @see IHA2PhaseCommitMessage#failCommit_beforeClosingHALog()
+     */
+    public void failCommit_beforeClosingHALog() throws IOException;
+
+    /**
      * Set the next value to be reported by {@link BasicHA#nextTimestamp()}.
      * <p>
      * Note: Only a few specific methods call against
@@ -278,7 +288,7 @@
     *            by {@link BasicHA#nextTimestamp()}, after which the
     *            behavior will revert to the default.
     *
-     * TODO Add a "clearNextTimestamp() method.
+     * TODO Add a "clearNextTimestamp()" method.
     */
     public void setNextTimestamp(long nextTimestamp) throws IOException;
@@ -424,9 +434,29 @@
     /**
      * Flag used to force the service to vote "NO" on the next two-phase
      * commit.
+     *
+     * @see IHA2PhasePrepareMessage#voteNo()
     */
     private final AtomicBoolean voteNo = new AtomicBoolean(false);
 
+    /**
+     * Flag used to force the service to fail rather than laying down the
+     * new root block in the COMMIT message.
+     *
+     * @see IHA2PhaseCommitMessage#failCommit_beforeWritingRootBlockOnJournal()
+     */
+    private final AtomicBoolean failCommit_beforeWritingRootBlockOnJournal = new AtomicBoolean(
+            false);
+
+    /**
+     * Flag used to force the service to fail rather than laying down the
+     * new root block in the COMMIT message.
+     *
+     * @see IHA2PhaseCommitMessage#failCommit_beforeClosingHALog()
+     */
+    private final AtomicBoolean failCommit_beforeClosingHALog = new AtomicBoolean(
+            false);
+
     private final AtomicLong nextTimestamp = new AtomicLong(-1L);
 
     private HAGlueTestImpl(final UUID serviceId) {
@@ -487,10 +517,26 @@
     @Override
     public void voteNo() throws IOException {
+
         voteNo.set(true);
+
     }
 
     @Override
+    public void failCommit_beforeWritingRootBlockOnJournal() throws IOException {
+
+        failCommit_beforeWritingRootBlockOnJournal.set(true);
+
+    }
+
+    @Override
+    public void failCommit_beforeClosingHALog() throws IOException {
+
+        failCommit_beforeClosingHALog.set(true);
+
+    }
+
+    @Override
     public void setNextTimestamp(long nextTimestamp) throws IOException {
 
         this.nextTimestamp.set(nextTimestamp);
@@ -915,8 +961,17 @@
         if (voteNo.compareAndSet(true/* expect */, false/* update */)) {
 
-            return super.prepare2Phase(new MyPrepareMessage(prepareMessage));
+            return super
+                    .prepare2Phase(new MyPrepareMessage(prepareMessage) {
 
+                        private static final long serialVersionUID = 1L;
+
+                        @Override
+                        public boolean voteNo() {
+                            return true;
+                        }
+                    });
+
         } else {
 
             return super.prepare2Phase(prepareMessage);
@@ -926,13 +981,42 @@
     }
 
     @Override
-    public Future<Void> commit2Phase(IHA2PhaseCommitMessage commitMessage) {
+    public Future<Void> commit2Phase(final IHA2PhaseCommitMessage commitMessage) {
 
         checkMethod("commit2Phase",
                 new Class[] { IHA2PhaseCommitMessage.class });
 
-        return super.commit2Phase(commitMessage);
+        if (failCommit_beforeWritingRootBlockOnJournal.compareAndSet(
+                true/* expect */, false/* update */)) {
 
+            return super.commit2Phase(new MyCommitMessage(commitMessage) {
+
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public boolean failCommit_beforeWritingRootBlockOnJournal() {
+                    return true;
+                }
+            });
+
+        } else if (failCommit_beforeClosingHALog.compareAndSet(
+                true/* expect */, false/* update */)) {
+
+            return super.commit2Phase(new MyCommitMessage(commitMessage) {
+
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public boolean failCommit_beforeClosingHALog() {
+                    return true;
+                }
+            });
+
+        } else {
+
+            return super.commit2Phase(commitMessage);
+
+        }
+
     }
 
     @Override
@@ -950,7 +1034,8 @@
     */
     @Override
-    public Future<IHAReadResponse> readFromDisk(IHAReadRequest readMessage) {
+    public Future<IHAReadResponse> readFromDisk(
+            final IHAReadRequest readMessage) {
 
         checkMethod("readFromDisk", new Class[] { IHAReadResponse.class });
@@ -979,8 +1064,8 @@
     }
 
     @Override
-    public Future<Void> receiveAndReplicate(IHASyncRequest req,
-            IHAWriteMessage msg) throws IOException {
+    public Future<Void> receiveAndReplicate(final IHASyncRequest req,
+            final IHAWriteMessage msg) throws IOException {
 
         checkMethod("receiveAndReplicate", new Class[] { IHASyncRequest.class,
                 IHAWriteMessage.class });
@@ -1157,7 +1242,7 @@
 //
 //        try {
 //
-//            // FIXME: hould already be closed, can we check this?
+//            // Should already be closed, can we check this?
 //
 //            // Obtain a new connection.
 //            ((ZKQuorumImpl) getQuorum()).getZookeeper();
@@ -1239,6 +1324,11 @@
 
     } // class HAGlueTestImpl
 
+    /**
+     * Delegation pattern allows us to override select methods easily.
+     *
+     * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+     */
     private static class MyPrepareMessage implements IHA2PhasePrepareMessage {
 
         /**
@@ -1288,13 +1378,57 @@
         }
 
         /**
-         * Force the PREPARE to vote NO.
+         * {@inheritDoc}
+         * <p>
+         * Overridden to force the PREPARE to vote NO.
         */
         @Override
         public boolean voteNo() {
-            return true;
+            return delegate.voteNo();
         }
 
     }
+
+    /**
+     * Delegation pattern allows us to override select methods easily.
+     *
+     * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+     */
+    private static class MyCommitMessage implements IHA2PhaseCommitMessage {
+
+        private static final long serialVersionUID = 1L;
+
+        private final IHA2PhaseCommitMessage delegate;
+
+        public MyCommitMessage(final IHA2PhaseCommitMessage msg) {
+            this.delegate = msg;
+        }
+
+        @Override
+        public boolean isJoinedService() {
+            return delegate.isJoinedService();
+        }
+
+        @Override
+        public long getCommitTime() {
+            return delegate.getCommitTime();
+        }
+
+        @Override
+        public boolean didAllServicesPrepare() {
+            return delegate.didAllServicesPrepare();
+        }
+
+        @Override
+        public boolean failCommit_beforeWritingRootBlockOnJournal() {
+            return delegate.failCommit_beforeWritingRootBlockOnJournal();
+        }
+
+        @Override
+        public boolean failCommit_beforeClosingHALog() {
+            return delegate.failCommit_beforeClosingHALog();
+        }
+
+    }
 
 } // class HAJournalTest

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -30,10 +30,14 @@
 
 import net.jini.config.Configuration;
 
+import com.bigdata.ha.HACommitGlue;
 import com.bigdata.ha.HAGlue;
+import com.bigdata.ha.HAStatusEnum;
 import com.bigdata.ha.halog.HALogReader;
 import com.bigdata.ha.halog.IHALogReader;
+import com.bigdata.ha.msg.IHA2PhasePrepareMessage;
 import com.bigdata.journal.CommitCounterUtility;
+import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest;
 
 /**
  * Test suite when we are using the {@link DefaultSnapshotPolicy} and
@@ -443,4 +447,98 @@
 
     }
 
+    /**
+     * Three services are started in [A,B,C] order. B is setup for
+     * {@link HACommitGlue#prepare2Phase(IHA2PhasePrepareMessage)} to throw an
+     * exception inside of the commit2Phase() method rather than at the external
+     * RMI interface.
+     * <p>
+     * A simple transaction is performed. We verify that the transaction
+     * completes successfully, that the quorum token is unchanged, and that
+     * [A,C] both participated in the commit. We also verify that B is moved to
+     * the end of the pipeline (by doing a serviceLeave and then re-entering the
+     * pipeline) and that it resyncs with the met quorum and finally re-joins
+     * with the met quorum. The quorum should not break across this test.
+     *
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/760" >
+     *      Review commit2Phase semantics when a follower fails </a>
+     *
+     * @see TestHAJournalServerOverride#testStartABC_commit2Phase_B_failCommit_beforeWritingRootBlockOnJournal_HALogsPurgedAtCommit()
+     */
+    public void testStartABC_commit2Phase_B_failCommit_beforeWritingRootBlockOnJournal_HALogsNotPurgedAtCommit()
+            throws Exception {
+
+        // Enforce the join order.
+        final ABC startup = new ABC(true /*sequential*/);
+
+        //HAJournalTest.dumpThreads();
+
+        final long token = awaitFullyMetQuorum();
+
+        // Should be one commit point.
+        awaitCommitCounter(1L, startup.serverA, startup.serverB,
+                startup.serverC);
+
+        /*
+         * Setup B to fail the "COMMIT" message (specifically, it will throw
+         * back an exception rather than executing the commit.
+         */
+        ((HAGlueTest) startup.serverB)
+                .failCommit_beforeWritingRootBlockOnJournal();
+
+        /*
+         * Simple transaction.
+         *
+         * Note: B will fail the commit without laying down the root block and
+         * will transition into the ERROR state. From there, it will move to
+         * SeekConsensus and then RESYNC. While in RESYNC it will pick up the
+         * missing HALog and commit point. Finally, it will transition into
+         * RunMet.
+         */
+        simpleTransaction();
+
+        // Verify quorum is unchanged.
+        assertEquals(token, quorum.token());
+
+        // Should be two commit points on {A,C}.
+        awaitCommitCounter(2L, startup.serverA, startup.serverC);
+
+        /*
+         * Just one commit point on B
+         *
+         * TODO This is a data race. It is only transiently true.
+         */
+        awaitCommitCounter(1L, startup.serverB);
+
+        /*
+         * B is NotReady
+         *
+         * TODO This is a data race. It is only transiently true.
+         */
+        awaitHAStatus(startup.serverB, HAStatusEnum.NotReady);
+
+        /*
+         * The pipeline should be reordered. B will do a service leave, then
+         * enter seek consensus, and then re-enter the pipeline.
+         */
+        awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC,
+                startup.serverB });
+
+        awaitFullyMetQuorum();
+
+        /*
+         * There should be two commit points on {A,C,B} (note that this assert
+         * does not pay attention to the pipeline order).
+         */
+        awaitCommitCounter(2L, startup.serverA, startup.serverC,
+                startup.serverB);
+
+        // B should be a follower again.
+        awaitHAStatus(startup.serverB, HAStatusEnum.Follower);
+
+        // quorum token is unchanged.
+        assertEquals(token, quorum.token());
+
+    }
+
 }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-11-04 16:42:25 UTC (rev 7508)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-11-04 20:33:12 UTC (rev 7509)
@@ -38,10 +38,10 @@
 import com.bigdata.ha.HACommitGlue;
 import com.bigdata.ha.HAGlue;
 import com.bigdata.ha.HAStatusEnum;
-import com.bigdata.ha.msg.IHA2PhaseCommitMessage;
 import com.bigdata.ha.msg.IHA2PhasePrepareMessage;
 import com.bigdata.ha.msg.IHANotifyReleaseTimeRequest;
 import com.bigdata.journal.AbstractTask;
+import com.bigdata.journal.jini.ha.HAJournalServer.RunStateEnum;
 import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest;
 import com.bigdata.journal.jini.ha.HAJournalTest.SpuriousTestException;
 import com.bigdata.quorum.zk.ZKQuorumImpl;
@@ -174,14 +174,37 @@
      * When we add concurrent unisolated writers, the user level transaction
      * abort will just discard the buffered writes for a specific
      * {@link AbstractTask}.
-     *
-     * @throws Exception
     */
     public void testStartABC_userLevelAbortDoesNotCauseQuorumBreak()
             throws Exception {
 
-        fail("write test");
+        final ABC x = new ABC(true/*sequential*/);
 
+        final long token = awaitFullyMetQuorum();
+
+        // Now run several transactions
+        final int NTX = 5;
+        for (int i = 0; i < NTX; i++)
+            simpleTransaction();
+
+        // wait until the commit point is registered on all services.
+        awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB,
+                x.serverC });
+
+        // Verify order.
+        awaitPipeline(new HAGlue[] { x.serverA, x.serverB, x.serverC });
+        awaitJoined(new HAGlue[] { x.serverA, x.serverB, x.serverC });
+
+        // Run a transaction that forces a 2-phase abort.
+        ((HAGlueTest) x.serverA).simpleTransaction_abort();
+
+        // Reverify order.
+        awaitPipeline(new HAGlue[] { x.serverA, x.serverB, x.serverC });
+        awaitJoined(new HAGlue[] { x.serverA, x.serverB, x.serverC });
+
+        // Verify no failover of the leader.
+        assertEquals(token, awaitFullyMetQuorum());
+
     }
 
     /**
@@ -375,13 +398,14 @@
     /**
      * Three services are started in [A,B,C] order. B is setup for
-     * {@link HACommitGlue#prepare2Phase(IHA2PhasePrepareMessage)} to vote "NO".
-     * A simple transaction is performed. We verify that the transaction
-     * completes successfully, that the quorum token is unchanged, and that
-     * [A,C] both participated in the commit. We also verify that B is moved to
-     * the end of the pipeline (by doing a serviceLeave and then re-entering the
-     * pipeline) and that it resyncs with the met quorum and finally re-joins
-     * with the met quorum. The quorum should not break across this test.
+     * {@link HACommitGlue#prepare2Phase(IHA2PhasePrepareMessage)} to throw an
+     * exception. A simple transaction is performed. We verify that the
+     * transaction completes successfully, that the quorum token is unchanged,
+     * and that [A,C] both participated in the commit. We also verify that B is
+     * moved to the end of the pipeline (by doing a serviceLeave and then
+     * re-entering the pipeline) and that it resyncs with the met quorum and
+     * finally re-joins with the met quorum. The quorum should not break across
+     * this test.
```
*/ public void testStartABC_prepare2Phase_B_throws_exception() throws Exception { @@ -472,36 +496,36 @@ /** * Three services are started in [A,B,C] order. B is setup for * {@link HACommitGlue#prepare2Phase(IHA2PhasePrepareMessage)} to throw an - * exeption. A simple transaction is performed. We verify that the - * transaction completes successfully, that the quorum token is unchanged, - * and that [A,C] both participated in the commit. We also verify that B is - * moved to the end of the pipeline (by doing a serviceLeave and then - * re-entering the pipeline) and that it resyncs with the met quorum and - * finally re-joins with the met quorum. The quorum should not break across - * this test. - * - * FIXME Variant where the commit2Phase fails. Note: The COMMIT message is - * design to do as little work as possible. In practice, this requires an - * RMI to the followers, each follower must not encounter an error when it - * validates the COMMIT message, and each follower must put down its new - * root block (from the prepare message) and then sync the disk. Finally, - * the RMI response must be returned. + * exception inside of the commit2Phase() method rather than at the external + * RMI interface. * <p> - * Under what conditions can a COMMIT message fail where we can still - * recover? Single node failure? Leader failure? (QuorumCommitImpl currently - * fails the commit if there is a single failure, even though the quourm - * might have a consensus around the new commit point.) + * A simple transaction is performed. We verify that the transaction + * completes successfully, that the quorum token is unchanged, and that + * [A,C] both participated in the commit. We also verify that B is moved to + * the end of the pipeline (by doing a serviceLeave and then re-entering the + * pipeline). For this test, B DOES NOT resync and join. This is because A + * and C go through their commit2Phase() methods for a fully met quorum. + * Because we have explicitly disabled the {@link DefaultRestorePolicy}, + * this allows them to purge their HALogs. This means that B can not resync + * with the met quorum. As a consequence, B transitions to the + * {@link RunStateEnum#Operator} state and remains + * {@link HAStatusEnum#NotReady}. + * <p> + * The quorum should not break across this test. * - * TODO Consider leader failure scenarios in this test suite, not just - * scenarios where B fails. We MUST also cover failures of C (the 2nd - * follower). We should also cover scenarios where the quorum is barely met - * and a single failure causes a rejected commit (local decision) or 2-phase - * abort (joined services in joint agreement). + * TODO Consider leader failure scenarios in this test suite (commit2Phase() + * fails on the leader), not just scenarios where B fails. We MUST also + * cover failures of C (the 2nd follower). We should also cover scenarios + * where the quorum is barely met and a single failure causes a rejected + * commit (local decision) or 2-phase abort (joined services in joint + * agreement). * * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/760" > * Review commit2Phase semantics when a follower fails </a> + * + * @see TestHA3JournalServerWithHALogs#testStartABC_commit2Phase_B_failCommit_beforeWritingRootBlockOnJournal_HALogsNotPurgedAtCommit() */ - public void testStartABC_commit2Phase_B_fails() + public void testStartABC_commit2Phase_B_failCommit_beforeWritingRootBlockOnJournal_HALogsPurgedAtCommit() throws Exception { // Enforce the join order. 
@@ -518,120 +542,69 @@ /* * Setup B to fail the "COMMIT" message (specifically, it will throw * back an exception rather than executing the commit. - * - * FIXME We need to cause B to actually fail the commit such that it - * enters the ERROR state. This is only causing the RMI to be rejected - * so B is not being failed out of the pipeline. Thus, B will remain - * joined with the met quorum (but at the wrong commit point) until we - * send down another replicated write. At that point B will notice that - * it is out of whack and enter the ERROR state. */ ((HAGlueTest) startup.serverB) - .failNext("commit2Phase", - new Class[] { IHA2PhaseCommitMessage.class }, - 0/* nwait */, 1/* nfail */); + .failCommit_beforeWritingRootBlockOnJournal(); - /** - * FIXME We need to resolve the correct behavior when B fails the commit - * after having prepared. Two code paths are outlined below. The - * implementation currently does an abort2Phase() when the - * commit2Phase() observe an error for B. That causes the commit point - * to NOT advance. + /* + * Simple transaction. * - * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/760" > - * Review commit2Phase semantics when a follower fails </a> + * Note: B will fail the commit without laying down the root block and + * will transition into the ERROR state. From there, it will move to + * SeekConsensus and then RESYNC. While in RESYNC it will pick up the + * missing HALog and commit point. Finally, it will transition into + * RunMet. */ - - if(true) { + simpleTransaction(); - // Simple transaction. - simpleTransaction(); + // Verify quorum is unchanged. + assertEquals(token, quorum.token()); - // Verify quorum is unchanged. - assertEquals(token, quorum.token()); + // Should be two commit points on {A,C}. + awaitCommitCounter(2L, startup.serverA, startup.serverC); - // Should be two commit points on {A,C}. - awaitCommitCounter(2L, startup.serverA, startup.serverC); + /* + * Just one commit point on B + * + * TODO This is a data race. It is only transiently true. + */ + awaitCommitCounter(1L, startup.serverB); - // Just one commit point on B. - awaitCommitCounter(1L, startup.serverB); + /* + * B is NotReady + * + * TODO This is a data race. It is only transiently true. + */ + awaitHAStatus(startup.serverB, HAStatusEnum.NotReady); - // B is still a follower. - awaitHAStatus(startup.serverB, HAStatusEnum.Follower); - - /* - * B should go into an ERROR state and then into SeekConsensus and - * from there to RESYNC and finally back to RunMet. We can not - * reliably observe the intervening states. So what we really need - * to do is watch for B to move to the end of the pipeline and catch - * up to the same commit point. - * - * FIXME This is forcing B into an error state to simulate what - * would happen if B had encountered an error during the 2-phase - * commit above. - */ - ((HAGlueTest)startup.serverB).enterErrorState(); + /* + * The pipeline should be reordered. B will do a service leave, then + * enter seek consensus, and then re-enter the pipeline. + */ + awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC, + startup.serverB }); - /* - * The pipeline should be reordered. B will do a service leave, then - * enter seek consensus, and then re-enter the pipeline. 
- */ - awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC, - startup.serverB }); + /* + * IF you allow the purge of the HALog files on a fully met commit AND a + * service fails in commit2Phase() for a fully met quorum THEN the other + * services will have purged their HALog files and the service that + * failed in commit2Phase() will be unable to resync and join the met + * quorum. + */ + awaitRunStateEnum(RunStateEnum.Operator, startup.serverB); + awaitHAStatus(startup.serverB, HAStatusEnum.NotReady); - awaitFullyMetQuorum(); - - /* - * There should be two commit points on {A,C,B} (note that this - * assert does not pay attention to the pipeline order). - */ - awaitCommitCounter(2L, startup.serverA, startup.serverC, - startup.serverB); + // There should be two commit points on {A,C}. + awaitCommitCounter(2L, startup.serverA, startup.serverC); - // B should be a follower again. - awaitHAStatus(startup.serverB, HAStatusEnum.Follower); + // Just one commit point on B. + awaitCommitCounter(1L, startup.serverB); - // quorum token is unchanged. - assertEquals(token, quorum.token()); + // quorum token is unchanged. + assertEquals(token, quorum.token()); - } else { - - try { - - // Simple transaction. - simpleTransaction(); - - fail("Expecting failed transaction"); - - } catch (Exception t) { - - if (!t.getMessage().contains( - SpuriousTestException.class.getName())) { - /* - * Wrong inner cause. - * - * Note: The stack trace of the local exception does not - * include the remote stack trace. The cause is formatted - * into the HTTP response body. - */ - fail("Expecting " + SpuriousTestException.class, t); - } - - } - - // Verify quorum is unchanged. - assertEquals(token, quorum.token()); - - // Should be ONE commit point on {A,B, C]. - awaitCommitCounter(1L, startup.serverA, startup.serverB, - startup.serverC); - - fail("finish test under these assumptions"); - - } - } - + /** * Unit test for failure to RESYNC having a root cause that the live HALog * file did not exist on the quorum leader after an abort2Phase() call.
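The failure-injection hooks in the diff above share one pattern: a one-shot AtomicBoolean is consumed with compareAndSet(), and when it was set the outgoing message is wrapped in an anonymous subclass that overrides a single method. A minimal runnable sketch of that pattern follows; the Message and Service types here are hypothetical stand-ins for the IHA2Phase* interfaces and HAGlueTestImpl, not the actual bigdata classes:

import java.util.concurrent.atomic.AtomicBoolean;

public class FailureInjectionSketch {

    interface Message {
        boolean voteNo();
    }

    static class Service {

        // One-shot flag; set by the test harness, consumed atomically below.
        private final AtomicBoolean voteNo = new AtomicBoolean(false);

        void voteNo() {
            voteNo.set(true);
        }

        String prepare2Phase(final Message msg) {
            if (voteNo.compareAndSet(true/* expect */, false/* update */)) {
                // Wrap the message; only this one PREPARE is forced to vote NO.
                return doPrepare(new Message() {
                    @Override
                    public boolean voteNo() {
                        return true;
                    }
                });
            }
            return doPrepare(msg);
        }

        private String doPrepare(final Message msg) {
            return msg.voteNo() ? "NO" : "YES";
        }
    }

    public static void main(final String[] args) {
        final Service s = new Service();
        final Message normal = () -> false;
        s.voteNo();
        System.out.println(s.prepare2Phase(normal)); // NO  (flag consumed)
        System.out.println(s.prepare2Phase(normal)); // YES (back to normal)
    }
}

The compareAndSet() is what makes the override fire exactly once, so a single injected failure cannot leak into later commits in the same test.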
From: <tho...@us...> - 2013-11-04 16:42:36
Revision: 7508
http://bigdata.svn.sourceforge.net/bigdata/?rev=7508&view=rev
Author: thompsonbry
Date: 2013-11-04 16:42:25 +0000 (Mon, 04 Nov 2013)
Log Message:
-----------
See #764 (RESYNC failure)

The root cause is that abort2Phase() discards the live HALog file and does not immediately re-create it. Given A + B + C, suppose all three services are at the same commit point and an attempt to commit fails, causing abort2Phase() to be invoked. If C is then shut down, it will fail to RESYNC (per this ticket) since the HALog file is missing on the leader (A). abort2Phase() called doLocalAbort(), which called through to discardWriteSet(), which was calling disableHALog() and deleting the live HALog file. We have modified discardWriteSet() to create the new live HALog file IF the service is joined with the met quorum at the time that discardWriteSet() is invoked. This should fix the problem of abort2Phase() leaving the leader without a live HALog file.

We have also modified getHARootBlocksForWriteSet() to conditionally create the live HALog file if it does not exist. This is the method that is invoked by the RESYNC task. Thus, by ensuring that the live HALog file exists here, we can make RESYNC much more robust. We believe that a data race exists between when a service is elected as the leader and when the HALog file would normally be created. It is possible that a 3rd service in a simultaneous restart of {A,B,C} could attempt to obtain the live HALog file before the leader would have otherwise created it. Explicitly creating the live HALog within getHARootBlocksForWriteSet() closes out this possible race.

- conditionalCreateHALog() was moved to HALogNexus.

- AbstractJournal._abort() was made robust to the case where the HAQuorumService was not running.

- HALogWriter: some methods failed to obtain the m_stateLock before examining fields that were protected by that lock. Those methods included getCommitCounter(), getSequence(), and isHALogOpen().

- CommitCounterUtility: exposed a method to format the commitCounter as a 21-digit string with leading zeros (a sketch of this formatting follows the log message).

- HAStatusServlet: modified to disclose the most current historical HALog file (if any) and the live HALog file (if it exists). This was done to help diagnose the problem associated with this ticket.

- HAJournalServer::ResyncTask().doRun(): removed the conditionalCreateHALog() invocation. There was no reason to create the live HALog on the service that was trying to resync. The code comment indicated that this might have been done because of test suite expectations, but the test suite runs fine without it.

- HA CI test suite: added a number of startABC_restartX tests in an effort to recreate the problem associated with this ticket. These tests did not manage to recreate the problem, but they are being retained because I do not see obvious overlap with the existing test suite.

- TestHAJournalServerOverride.testStartABC_abort2Phase_restartC(): created a unit test that replicates the problem against the revision of the code before this commit. This test can be observed to fail if you disable the HALog create in both HAJournal.getHARootBlocksForWriteSet() and HAJournalServer.discardWriteSet().

Note: If this solution proves to be robust, then we could consider re-enabling the automatic logic to do a disaster rebuild rather than transitioning to an OPERATOR state. However, transitioning to an OPERATOR state is perhaps wise regardless.

HA CI is locally green.
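The 21-digit formatting mentioned in the CommitCounterUtility bullet above is a plain zero-padded %021d conversion, visible in the diff below. A minimal runnable sketch (the class name here is illustrative only):

import java.util.Formatter;

public class CommitCounterFormatSketch {

    public static void main(final String[] args) {
        final long commitCounter = 7508L;
        final StringBuilder sb = new StringBuilder(21);
        final Formatter f = new Formatter(sb);
        f.format("%021d", commitCounter); // zero-pad to 21 digits.
        f.flush();
        f.close();
        // Prints "000000000000000007508": the leading zeros make the
        // lexical order of HALog file names match the numeric order of
        // the commit counters.
        System.out.println(sb);
    }
}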
Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/CommitCounterUtility.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -37,6 +37,7 @@ import org.apache.log4j.Logger; +import com.bigdata.ha.msg.HAWriteMessage; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; @@ -51,10 +52,10 @@ /** * Wrapper class to handle process log creation and output for HA. - * - * The process log stores the HAWriteMessages and buffers to support reading and - * reprocessing as part of the HA synchronization protocol. - * + * <p> + * The process log stores the {@link HAWriteMessage} and buffers to support + * reading and reprocessing as part of the HA synchronization protocol. + * <p> * The writer encapsulates not only the writing of individual messages but also * the closing and creation of new files. * @@ -128,27 +129,32 @@ /** current write point on the channel. */ private long m_position = headerSize0; - /** - * Return the commit counter that is expected for the writes that will be - * logged (the same commit counter that is on the opening root block). - */ + @Override public long getCommitCounter() { - assertOpen(); + final Lock lock = m_stateLock.readLock(); + lock.lock(); + try { + assertOpen(); + return m_rootBlock.getCommitCounter(); + } finally { + lock.unlock(); + } - return m_rootBlock.getCommitCounter(); - } - /** - * Return the sequence number that is expected for the next write. 
- */ + @Override public long getSequence() { - assertOpen(); + final Lock lock = m_stateLock.readLock(); + lock.lock(); + try { + assertOpen(); + return m_nextSequence; + } finally { + lock.unlock(); + } - return m_nextSequence; - } /** @@ -162,9 +168,16 @@ } + @Override public boolean isHALogOpen() { - - return m_state != null && !m_state.isCommitted(); + + final Lock lock = m_stateLock.readLock(); + lock.lock(); + try { + return m_state != null && !m_state.isCommitted(); + } finally { + lock.unlock(); + } } @@ -179,7 +192,6 @@ } finally { lock.unlock(); } - } /** @@ -225,6 +237,7 @@ } + @Override public String toString() { final IRootBlockView tmp = m_rootBlock; @@ -375,6 +388,7 @@ * The final root block for the write set. * @throws IOException */ + @Override public void closeHALog(final IRootBlockView rootBlock) throws FileNotFoundException, IOException { @@ -472,6 +486,7 @@ * @throws IOException * if we can not write on the log. */ + @Override public void writeOnHALog(final IHAWriteMessage msg, final ByteBuffer data) throws IOException, IllegalStateException { Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -2835,10 +2835,26 @@ * last live HA message). */ - final QuorumService<HAGlue> localService = quorum.getClient(); + QuorumService<HAGlue> localService = null; + try { + + localService = quorum.getClient(); + + } catch (IllegalStateException ex) { + + /* + * Note: Thrown if the QuorumService is not running. + */ + + // ignore. + } - localService.discardWriteSet(); - + if (localService != null) { + + localService.discardWriteSet(); + + } + } if (log.isInfoEnabled()) @@ -3854,11 +3870,11 @@ final String msg = "commit: commitTime=" + cs.commitTime + ", latency=" - + TimeUnit.NANOSECONDS.toMillis(elapsedNanos) - + ", nextOffset=" - + cs.newRootBlock.getNextOffset() - + ", byteCount=" - + (cs.newRootBlock.getNextOffset() - cs.byteCountBefore); + + TimeUnit.NANOSECONDS.toMillis(elapsedNanos); +// + ", nextOffset=" +// + cs.newRootBlock.getNextOffset() +// + ", byteCount=" +// + (cs.newRootBlock.getNextOffset() - cs.byteCountBefore); if (BigdataStatics.debug) System.err.println(msg); else if (log.isInfoEnabled()) @@ -7694,6 +7710,7 @@ this.abortMessage = abortMessage; } + @Override public void run() { try { @@ -7731,7 +7748,7 @@ // ALWAYS go through the local abort. doLocalAbort(); - + } } @@ -7750,6 +7767,7 @@ * @todo Since these are rare events it may not be worthwhile to setup a * separate low-level socket service to send/receive the data. 
*/ + @Override public Future<IHAReadResponse> readFromDisk( final IHAReadRequest msg) { @@ -7762,7 +7780,8 @@ final FutureTask<IHAReadResponse> ft = new FutureTask<IHAReadResponse>( new Callable<IHAReadResponse>() { - + + @Override public IHAReadResponse call() throws Exception { if (haLog.isInfoEnabled()) Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/CommitCounterUtility.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/CommitCounterUtility.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/CommitCounterUtility.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -68,21 +68,8 @@ * the file name and then partitioning it into groups of THREE (3) * digits. */ - final String basename; - { + final String basename = getCommitCounterStr(commitCounter); - final StringBuilder sb = new StringBuilder(); - - final Formatter f = new Formatter(sb); - - f.format("%021d", commitCounter); - f.flush(); - f.close(); - - basename = sb.toString(); - - } - /* * Now figure out the recursive directory name. */ @@ -105,6 +92,32 @@ } /** + * Format the commit counter with leading zeros such that it will be + * lexically ordered in the file system. + * + * @param commitCounter + * The commit counter. + * + * @return The basename of the file consisting of the foramtted commit + * counter with the appropriate leading zeros. + */ + public static String getCommitCounterStr(final long commitCounter) { + + final StringBuilder sb = new StringBuilder(21); + + final Formatter f = new Formatter(sb); + + f.format("%021d", commitCounter); + f.flush(); + f.close(); + + final String basename = sb.toString(); + + return basename; + + } + + /** * Parse out the commitCounter from the file name. * * @param name Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -102,6 +102,7 @@ import com.bigdata.service.proxy.RemoteFuture; import com.bigdata.service.proxy.RemoteFutureImpl; import com.bigdata.service.proxy.ThickFuture; +import com.bigdata.util.StackInfoReport; /** * A {@link Journal} that that participates in a write replication pipeline. The @@ -846,7 +847,44 @@ final Lock logLock = getHALogNexus().getLogLock(); logLock.lock(); try { - + + /* + * Verify that this service is the quorum leader. + */ + final long token = getQuorum().token(); + + // Method is only allowed on the quorum leader. + getQuorumService().assertLeader(token); + + if (!getHALogNexus().isHALogOpen()) { + + /** + * The live HALog should always exist on the leader. + * However, the leader is defined by the zookeeper state and + * it is possible that the leader has not yet gone through + * the code path to create the HALog. Thus, for safety, we + * ensure that the live HALog exists here. + * + * Note: we are holding the logLock. + * + * Note: This only causes the HALog to be created on the + * quorum leader. HAJournalServer.discardWriteSet() handles + * this for both the leader and the joined followers. 
+ * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/764" + * > RESYNC fails (HA) </a> + */ + + if (haLog.isInfoEnabled()) + log.info( + "Live HALog does not exist on the quorum leader", + new StackInfoReport()); + + getHALogNexus().conditionalCreateHALog(); + + } + // The commit counter of the desired closing root block. final long commitCounter = msg.getCommitCounter(); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -1382,17 +1382,30 @@ journal.getHALogNexus().lastLiveHAWriteMessage = null; if (journal.getHALogNexus().isHALogOpen()) { - /* - * Note: Closing the HALog is necessary for us to be able to - * re-enter SeekConsensus without violating a pre-condition - * for that run state. - */ + /** + * Note: Closing the HALog is necessary for us to be able to + * re-enter SeekConsensus without violating a pre-condition + * for that run state. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/764" + * > RESYNC fails (HA) </a> + */ try { journal.getHALogNexus().disableHALog(); } catch (IOException e) { log.error(e, e); } - } + final long token = getQuorum().token(); + if (isJoinedMember(token)) { + try { + journal.getHALogNexus().createHALog( + journal.getRootBlockView()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } } finally { logLock.unlock(); } @@ -1767,6 +1780,7 @@ public QuorumMeetTask(final long token, final UUID leaderId) { this.token = token; } + @Override public Void call() throws Exception { journal.setQuorumToken(token); if (isJoinedMember(token)) { @@ -2834,9 +2848,6 @@ journal.doLocalAbort(); - // Sets up expectations (maybe just for the test suite?) - conditionalCreateHALog(); - /* * We will do a local commit with each HALog (aka write set) * that is replicated. This let's us catch up incrementally with @@ -3393,49 +3404,6 @@ } - /** - * Conditionally create the HALog. - * <p> - * Refactored out of {@link #pipelineSetup()} since - * {@link #discardWriteSet()} now removes the current HALog. Therefore, - * the {@link ResyncTask} needs to call - * {@link #conditionalCreateHALog()} <em>after</em> it calls - * {@link AbstractJournal#doLocalAbort()}. - * - * @throws FileNotFoundException - * @throws IOException - */ - private void conditionalCreateHALog() throws FileNotFoundException, - IOException { - - logLock.lock(); - - try { - - if (!journal.getHALogNexus().isHALogOpen()) { - - /* - * Open the HALogWriter for our current root blocks. - * - * Note: We always use the current root block when receiving - * an HALog file, even for historical writes. This is - * because the historical log writes occur when we ask the - * leader to send us a prior commit point in RESYNC. 
- */ - - journal.getHALogNexus().createHALog( - journal.getRootBlockView()); - - } - - } finally { - - logLock.unlock(); - - } - - } - @Override protected void handleReplicatedWrite(final IHASyncRequest req, final IHAWriteMessage msg, final ByteBuffer data) @@ -3472,7 +3440,7 @@ logLock.lock(); try { - conditionalCreateHALog(); + journal.getHALogNexus().conditionalCreateHALog(); if (haLog.isDebugEnabled()) haLog.debug("msg=" + msg + ", buf=" + data); @@ -4078,7 +4046,7 @@ try { - conditionalCreateHALog(); + journal.getHALogNexus().conditionalCreateHALog(); /* * Throws IllegalStateException if the message is not Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -1054,6 +1053,43 @@ } + /** + * Conditionally create the HALog. + * + * @throws FileNotFoundException + * @throws IOException + */ + public void conditionalCreateHALog() throws FileNotFoundException, + IOException { + + logLock.lock(); + + try { + + if (!isHALogOpen()) { + + /* + * Open the HALogWriter for our current root blocks. + * + * Note: We always use the current root block when receiving an + * HALog file, even for historical writes. This is because the + * historical log writes occur when we ask the leader to send us + * a prior commit point in RESYNC. + */ + + createHALog(journal.getRootBlockView()); + + } + + } finally { + + logLock.unlock(); + + } + + } + + @Override public boolean isHALogOpen() { logLock.lock(); @@ -1070,6 +1106,7 @@ } + @Override public void closeHALog(final IRootBlockView rootBlock) throws IOException { @@ -1116,6 +1153,7 @@ } + @Override public void disableHALog() throws IOException { logLock.lock(); @@ -1132,6 +1170,7 @@ } + @Override public void writeOnHALog(final IHAWriteMessage msg, final ByteBuffer data) throws IOException, IllegalStateException { Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -606,16 +606,26 @@ }, 5, TimeUnit.SECONDS); } - + + /** + * Start A then B then C. As each service starts, this method waits for that + * service to appear in the pipeline in the proper position. 
+ * + * @return The ordered array of services <code>[A, B, C]</code> + */ protected HAGlue[] startSequenceABC() throws Exception { - startA(); - awaitPipeline(new HAGlue[] {serverA}); - startB(); - awaitPipeline(new HAGlue[] {serverA, serverB}); - startC(); - awaitPipeline(new HAGlue[] {serverA, serverB, serverC}); - - return new HAGlue[] {serverA, serverB, serverC}; + + startA(); + awaitPipeline(new HAGlue[] { serverA }); + + startB(); + awaitPipeline(new HAGlue[] { serverA, serverB }); + + startC(); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC }); + + return new HAGlue[] { serverA, serverB, serverC }; + } /* Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -85,11 +85,16 @@ import com.bigdata.ha.msg.IHAWriteSetStateResponse; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IRootBlockView; +import com.bigdata.journal.ITx; import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService; import com.bigdata.journal.jini.ha.HAJournalServer.RunStateEnum; import com.bigdata.quorum.AsynchronousQuorumCloseException; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.zk.ZKQuorumImpl; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.sail.BigdataSailRepository; +import com.bigdata.rdf.sail.BigdataSailRepositoryConnection; +import com.bigdata.rdf.store.AbstractTripleStore; import com.bigdata.service.jini.RemoteDestroyAdmin; /** @@ -287,6 +292,12 @@ */ public RunStateEnum getRunStateEnum() throws IOException; + /** + * Run a simple update transaction on the quorum leader, but abort() + * rather than committing the transaction. + */ + public void simpleTransaction_abort() throws IOException, Exception; + } /** @@ -1167,6 +1178,65 @@ // @Override + @Override + public void simpleTransaction_abort() throws IOException, Exception { + + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = getQuorum(); + + // Note: Throws IllegalStateException if quorum is not running. + final QuorumService<HAGlue> quorumService = quorum.getClient(); + + final long token = quorum.token(); + + // This service must be the quorum leader. + quorumService.assertLeader(token); + + // The default namespace. + final String namespace = BigdataSail.Options.DEFAULT_NAMESPACE; + + // resolve the default namespace. + final AbstractTripleStore tripleStore = (AbstractTripleStore) getIndexManager() + .getResourceLocator().locate(namespace, ITx.UNISOLATED); + + if (tripleStore == null) { + + throw new RuntimeException("Not found: namespace=" + namespace); + + } + + // Wrap with SAIL. + final BigdataSail sail = new BigdataSail(tripleStore); + + final BigdataSailRepository repo = new BigdataSailRepository(sail); + + repo.initialize(); + + final BigdataSailRepositoryConnection conn = (BigdataSailRepositoryConnection) repo + .getUnisolatedConnection(); + + try { + +// conn.setAutoCommit(false); +// +// final ValueFactory f = sail.getValueFactory(); +// +// conn.add(f.createStatement( +// f.createURI("http://www.bigdata.com"), RDF.TYPE, +// RDFS.RESOURCE), null/* contexts */); +// +// conn.flush(); + + // Fall through. + + } finally { + + // Force abort. 
+ conn.rollback(); + + } + + } + } // class HAGlueTestImpl private static class MyPrepareMessage implements IHA2PhasePrepareMessage { Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -82,7 +82,6 @@ */ public class TestHA3JournalServer extends AbstractHA3JournalServerTestCase { - /** * {@inheritDoc} * <p> @@ -558,6 +557,181 @@ } /** + * Unit test for a situation in which A B and C start. A quorum mets and the + * third service resyncs with the met quorum. The quorum then fully meets. + * Once the fully met quorum is stable, C is then restarted. This test + * exercises a code path that handles the case where C is current, but is + * forced into RESYNC in case there are writes in progress on the leader. + * <p> + * Note: In this version of the test, the HALog files are purged at each + * commit of the fully met quorum. Another version of this test exists in + * which the HALog files are NOT purged at each commit of a fully met + * quorum. + */ + public void testStartABC_restartC() throws Exception { + + final ABC x = new ABC(true/*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Now run several transactions + final int NTX = 5; + for (int i = 0; i < NTX; i++) + simpleTransaction(); + + // wait until the commit point is registered on all services. + awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB, + x.serverC }); + + /* + * The same number of HALog files should exist on all services. + * + * Note: the restore policy is setup such that we are purging the HALog + * files at each commit of a fully met quorum. + */ + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + // shutdown C. + shutdownC(); + + // wait for C to be gone from zookeeper. + awaitPipeline(new HAGlue[] { x.serverA, x.serverB }); + awaitMembers(new HAGlue[] { x.serverA, x.serverB }); + awaitJoined(new HAGlue[] { x.serverA, x.serverB }); + + // restart C. + /*final HAGlue serverC =*/ startC(); + + // wait until the quorum fully meets again (on the same token). + assertEquals(token, awaitFullyMetQuorum()); + + // Verify expected HALog files. + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + } + + /** + * Unit test for a situation in which A B and C start. A quorum mets and the + * third service resyncs with the met quorum. The quorum then fully meets. + * Once the fully met quorum is stable, B is then restarted. The pipeline is + * reorganized when B is shutdown but the quorum does not break. This test + * exercises a code path that handles the case where B is current, but is + * forced into RESYNC in case there are writes in progress on the leader. + * <p> + * Note: In this version of the test, the HALog files are NOT purged at each + * commit of the fully met quorum. 
+ */ + public void testStartABC_restartB() throws Exception { + + final ABC x = new ABC(true/*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Now run several transactions + final int NTX = 5; + for (int i = 0; i < NTX; i++) + simpleTransaction(); + + // wait until the commit point is registered on all services. + awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB, + x.serverC }); + + /* + * The same number of HALog files should exist on all services. + * + * Note: the restore policy is setup such that we are purging the HALog + * files at each commit of a fully met quorum. + */ + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + // shutdown B. + shutdownB(); + + // wait for B to be gone from zookeeper. + awaitPipeline(new HAGlue[] { x.serverA, x.serverC }); + awaitMembers(new HAGlue[] { x.serverA, x.serverC }); + awaitJoined(new HAGlue[] { x.serverA, x.serverC }); + + // restart B. + /*final HAGlue serverB =*/ startB(); + + // wait until the quorum fully meets again (on the same token). + assertEquals(token, awaitFullyMetQuorum()); + + // Verify expected HALog files. + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + } + + /** + * Unit test for a situation in which A B and C start. A quorum mets and the + * third service resyncs with the met quorum. The quorum then fully meets. + * Once the fully met quorum is stable, A is then restarted. The pipeline is + * reorganized when A is shutdown and a new leader is elected. This test + * exercises a code path that handles the case where A is current, but is + * forced into RESYNC in case there are writes in progress on the leader. + * <p> + * Note: In this version of the test, the HALog files are NOT purged at each + * commit of the fully met quorum. + */ + public void testStartABC_restartA() throws Exception { + + final ABC x = new ABC(true/*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Now run several transactions + final int NTX = 5; + for (int i = 0; i < NTX; i++) + simpleTransaction(); + + // wait until the commit point is registered on all services. + awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB, + x.serverC }); + + /* + * The same number of HALog files should exist on all services. + * + * Note: the restore policy is setup such that we are purging the HALog + * files at each commit of a fully met quorum. + */ + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + // shutdown A. + shutdownA(); + + // wait for A to be gone from zookeeper. +// awaitPipeline(new HAGlue[] { x.serverA, x.serverC }); +// awaitMembers(new HAGlue[] { x.serverA, x.serverC }); +// awaitJoined(new HAGlue[] { x.serverA, x.serverC }); + + // since the leader failed over, the quorum meets on a new token. + final long token2 = awaitNextQuorumMeet(token); + + // restart A. + /*final HAGlue serverA =*/ startA(); + + // wait until the quorum fully meets again (on the same token). + assertEquals(token2, awaitFullyMetQuorum()); + + // Verify expected HALog files. + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + } + + /** * Unit test of the ability to go through a simultaneous restart of all * services once those services are no longer at commit point 0. Two * services will meet on the lastCommitTime. 
The third will need to RESYNC @@ -2405,7 +2579,6 @@ } - /** * Tests shutdown of met quorum, but leader first forces re-organisation concurrent * with service shutdown Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -269,5 +269,178 @@ awaitLogCount(getHALogDirC(), commitCounter2+1); } + + /** + * Unit test for a situation in which A B and C start. A quorum mets and the + * third service resyncs with the met quorum. The quorum then fully meets. + * Once the fully met quorum is stable, C is then restarted. This test + * exercises a code path that handles the case where C is current, but is + * forced into RESYNC in case there are writes in progress on the leader. + * <p> + * Note: In this version of the test, the HALog files are NOT purged at each + * commit of the fully met quorum. + */ + public void testStartABC_restartC() throws Exception { + + final ABC x = new ABC(true/*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Now run several transactions + final int NTX = 5; + for (int i = 0; i < NTX; i++) + simpleTransaction(); + // wait until the commit point is registered on all services. + awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB, + x.serverC }); + + /* + * The same number of HALog files should exist on all services. + * + * Note: the restore policy is setup such that we are NOT purging the HALog + * files at each commit of a fully met quorum. + */ + awaitLogCount(getHALogDirA(), NTX + 2L); + awaitLogCount(getHALogDirB(), NTX + 2L); + awaitLogCount(getHALogDirC(), NTX + 2L); + + // shutdown C. + shutdownC(); + + // wait for C to be gone from zookeeper. + awaitPipeline(new HAGlue[] { x.serverA, x.serverB }); + awaitMembers(new HAGlue[] { x.serverA, x.serverB }); + awaitJoined(new HAGlue[] { x.serverA, x.serverB }); + + // restart C. + /*final HAGlue serverC =*/ startC(); + + // wait until the quorum fully meets again (on the same token). + assertEquals(token, awaitFullyMetQuorum()); + + // Verify expected HALog files. + awaitLogCount(getHALogDirA(), NTX + 2L); + awaitLogCount(getHALogDirB(), NTX + 2L); + awaitLogCount(getHALogDirC(), NTX + 2L); + + } + + /** + * Unit test for a situation in which A B and C start. A quorum mets and the + * third service resyncs with the met quorum. The quorum then fully meets. + * Once the fully met quorum is stable, B is then restarted. The pipeline is + * reorganized when B is shutdown but the quorum does not break. This test + * exercises a code path that handles the case where B is current, but is + * forced into RESYNC in case there are writes in progress on the leader. + * <p> + * Note: In this version of the test, the HALog files are NOT purged at each + * commit of the fully met quorum. + */ + public void testStartABC_restartB() throws Exception { + + final ABC x = new ABC(true/*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Now run several transactions + final int NTX = 5; + for (int i = 0; i < NTX; i++) + simpleTransaction(); + + // wait until the commit point is registered on all services. 
+ awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB, + x.serverC }); + + /* + * The same number of HALog files should exist on all services. + * + * Note: the restore policy is setup such that we are purging the HALog + * files at each commit of a fully met quorum. + */ + awaitLogCount(getHALogDirA(), NTX + 2L); + awaitLogCount(getHALogDirB(), NTX + 2L); + awaitLogCount(getHALogDirC(), NTX + 2L); + + // shutdown B. + shutdownB(); + + // wait for B to be gone from zookeeper. + awaitPipeline(new HAGlue[] { x.serverA, x.serverC }); + awaitMembers(new HAGlue[] { x.serverA, x.serverC }); + awaitJoined(new HAGlue[] { x.serverA, x.serverC }); + + // restart B. + /*final HAGlue serverB =*/ startB(); + + // wait until the quorum fully meets again (on the same token). + assertEquals(token, awaitFullyMetQuorum()); + + // Verify expected HALog files. + awaitLogCount(getHALogDirA(), NTX + 2L); + awaitLogCount(getHALogDirB(), NTX + 2L); + awaitLogCount(getHALogDirC(), NTX + 2L); + + } + + /** + * Unit test for a situation in which A B and C start. A quorum mets and the + * third service resyncs with the met quorum. The quorum then fully meets. + * Once the fully met quorum is stable, A is then restarted. The pipeline is + * reorganized when A is shutdown and a new leader is elected. This test + * exercises a code path that handles the case where A is current, but is + * forced into RESYNC in case there are writes in progress on the leader. + * <p> + * Note: In this version of the test, the HALog files are NOT purged at each + * commit of the fully met quorum. + */ + public void testStartABC_restartA() throws Exception { + + final ABC x = new ABC(true/*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Now run several transactions + final int NTX = 5; + for (int i = 0; i < NTX; i++) + simpleTransaction(); + + // wait until the commit point is registered on all services. + awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB, + x.serverC }); + + /* + * The same number of HALog files should exist on all services. + * + * Note: the restore policy is setup such that we are NOT purging the HALog + * files at each commit of a fully met quorum. + */ + awaitLogCount(getHALogDirA(), NTX + 2L); + awaitLogCount(getHALogDirB(), NTX + 2L); + awaitLogCount(getHALogDirC(), NTX + 2L); + + // shutdown A. + shutdownA(); + + // wait for A to be gone from zookeeper. +// awaitPipeline(new HAGlue[] { x.serverA, x.serverC }); +// awaitMembers(new HAGlue[] { x.serverA, x.serverC }); +// awaitJoined(new HAGlue[] { x.serverA, x.serverC }); + + // since the leader failed over, the quorum meets on a new token. + final long token2 = awaitNextQuorumMeet(token); + + // restart A. + /*final HAGlue serverA =*/ startA(); + + // wait until the quorum fully meets again (on the same token). + assertEquals(token2, awaitFullyMetQuorum()); + + // Verify expected HALog files. 
+ awaitLogCount(getHALogDirA(), NTX + 2L); + awaitLogCount(getHALogDirB(), NTX + 2L); + awaitLogCount(getHALogDirC(), NTX + 2L); + + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -550,7 +550,7 @@ // Verify quorum is unchanged. assertEquals(token, quorum.token()); - // Should be two commit points on {A,C]. + // Should be two commit points on {A,C}. awaitCommitCounter(2L, startup.serverA, startup.serverC); // Just one commit point on B. @@ -633,6 +633,76 @@ } /** + * Unit test for failure to RESYNC having a root cause that the live HALog + * file did not exist on the quorum leader after an abort2Phase() call. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/764" > + * RESYNC fails (HA) </a> + */ + public void testStartABC_abort2Phase_restartC() throws Exception { + + final ABC x = new ABC(true/*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Now run several transactions + final int NTX = 5; + for (int i = 0; i < NTX; i++) + simpleTransaction(); + + // wait until the commit point is registered on all services. + awaitCommitCounter(NTX + 1L, new HAGlue[] { x.serverA, x.serverB, + x.serverC }); + + /* + * The same number of HALog files should exist on all services. + * + * Note: the restore policy is setup such that we are purging the HALog + * files at each commit of a fully met quorum. + */ + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + // shutdown C. + shutdownC(); + + // wait for C to be gone from zookeeper. + awaitPipeline(new HAGlue[] { x.serverA, x.serverB }); + awaitMembers(new HAGlue[] { x.serverA, x.serverB }); + awaitJoined(new HAGlue[] { x.serverA, x.serverB }); + + /* + * Run a transaction that forces a 2-phase abort. + * + * Note: Historically, this would cause the live HALog on the leader to + * be disabled (aka deleted) without causing that live HALog file to be + * recreated. This is ticket #764. + */ + ((HAGlueTest) x.serverA).simpleTransaction_abort(); + + /* + * Restart C. + * + * Note: C will go into RESYNC. Since all services are at the same + * commit point, C will attempt to replicate the live HALog from the + * leader. Once it obtains that HALog, it should figure out that it has + * the live HALog and attempt to transition atomically to a joined + * service. + */ + /*final HAGlue serverC =*/ startC(); + + // wait until the quorum fully meets again (on the same token). + assertEquals(token, awaitFullyMetQuorum()); + + // Verify expected HALog files. + awaitLogCount(getHALogDirA(), 1L); + awaitLogCount(getHALogDirB(), 1L); + awaitLogCount(getHALogDirC(), 1L); + + } + + /** * 2 services start, quorum meets then we bounce the zookeeper connection * for the follower and verify that the quorum meets again. 
* <p> Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -625,7 +625,7 @@ * * @see Options */ - public BigdataSail(Properties properties) { + public BigdataSail(final Properties properties) { this(createLTS(properties)); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java 2013-11-01 21:10:45 UTC (rev 7507) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java 2013-11-04 16:42:25 UTC (rev 7508) @@ -48,6 +48,7 @@ import com.bigdata.ha.msg.HARemoteRebuildRequest; import com.bigdata.ha.msg.HASnapshotRequest; import com.bigdata.ha.msg.IHARemoteRebuildRequest; +import com.bigdata.journal.CommitCounterUtility; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.RootBlockView; @@ -326,8 +327,9 @@ int nfiles = 0; long nbytes = 0L; final Iterator<IHALogRecord> itr = nexus.getHALogs(); + IHALogRecord r = null; while (itr.hasNext()) { - final IHALogRecord r = itr.next(); + r = itr.next(); nbytes += r.sizeOnDisk(); nfiles++; } @@ -339,13 +341,25 @@ nbytes += currentFile.length(); nfiles++; } - final String compressorKey = journal.getProperties().getProperty( - com.bigdata.journal.Options.HALOG_COMPRESSOR, - com.bigdata.journal.Options.DEFAULT_HALOG_COMPRESSOR); - p.text("HALogDir: nfiles=" + nfiles + ", nbytes=" + nbytes - + ", path=" + nexus.getHALogDir() - + ", compressorKey=" + compressorKey).node("br") - .close(); + final String compressorKey = journal + .getProperties() + .getProperty( + com.bigdata.journal.Options.HALOG_COMPRESSOR, + com.bigdata.journal.Options.DEFAULT_HALOG_COMPRESSOR); + p.text("HALogDir: nfiles=" + + nfiles + + ", nbytes=" + + nbytes + + ", path=" + + nexus.getHALogDir() + + ", compressorKey=" + + compressorKey + + ", lastHALogClosed=" + + (r == null ? "N/A" : CommitCounterUtility + .getCommitCounterStr(r.getCommitCounter())) + + ", liveLog=" + + (currentFile == null ? "N/A" : currentFile + .getName())).node("br").close(); } if (digests) { /*
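The HALogWriter changes in this revision all apply one fix: getters that previously read shared state without synchronization now take the m_stateLock read lock first. A minimal sketch of that guarded-getter pattern, with a hypothetical sequence counter standing in for the writer's root block state:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedGetterSketch {

    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();

    private long nextSequence; // guarded by stateLock.

    public long getSequence() {
        final Lock lock = stateLock.readLock();
        lock.lock();
        try {
            // Read only while holding the read lock, so a concurrent
            // writer can never expose a torn or stale value.
            return nextSequence;
        } finally {
            lock.unlock();
        }
    }

    public void advance() {
        final Lock lock = stateLock.writeLock();
        lock.lock();
        try {
            nextSequence++;
        } finally {
            lock.unlock();
        }
    }
}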
From: <tho...@us...> - 2013-11-01 21:10:53
Revision: 7507
http://bigdata.svn.sourceforge.net/bigdata/?rev=7507&view=rev
Author: thompsonbry
Date: 2013-11-01 21:10:45 +0000 (Fri, 01 Nov 2013)
Log Message:
-----------
Continued work on #760 (2-phase commit semantics review)

I have analyzed the root cause of the override test in which B is told to reject the commit. The test actually relies on an exception being thrown back when the commit2Phase message is received. Thus, B is not actually encountering an error when handling that message and is not being transitioned automatically to the ERROR state.

I have further cleaned up the 2-phase commit logic in both QuorumCommitImpl and AbstractJournal. The COMMIT for a 2-phase commit is now rejected IFF we lack a majority of services that successfully executed the COMMIT (and for which the leader has received notice of that success). The leader will fail over if the COMMIT phase fails and the transaction will be failed. (If the PREPARE phase fails, then the transaction is failed, but the leader does not fail over.)

I am now encountering a problem with this test where B goes into the Operator state because it is looking for an HALog that does not exist. I had observed this in the recent longevity tests. Now we have a unit test that can recreate the problem, which is great.

http://192.168.1.133:8091 : is not joined, pipelineOrder=2, writePipelineAddr=localhost/127.0.0.1:9091, service=other, extendedRunState={server=Running, quorumService=Operator @ 1, haReady=-1, haStatus=NotReady, serviceId=e832b5c0-6c9b-444d-8946-514eacc5e329, now=1383332021358, msg=[HALog not available: commitCounter=2]}

QuorumCommitImpl was modified to return a class containing a summary of the commit response. This makes it possible to write more flexible behaviors in AbstractJournal.commitNow().

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java

Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java 2013-11-01 21:10:45 UTC (rev 7507) @@ -0,0 +1,150 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details.
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Nov 1, 2013 + */ +package com.bigdata.ha; + +import java.util.ArrayList; + +import com.bigdata.util.concurrent.ExecutionExceptions; + +/** + * Response for a 2-phase commit. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class CommitResponse { + + /** + * The COMMIT message. + */ + private final CommitRequest req; + + /** + * An array of the root cause exceptions for any errors encountered when + * instructing the services to execute the COMMIT message. The indices into + * this collection are correlated with the service join order and the + * PREPARE vote order. The leader is always at index zero. + */ + private final ArrayList<Throwable> causes; + + /** + * The number of COMMIT messages that are known to have been processed + * successfully. + */ + private final int nok; + /** + * The number of COMMIT messages that were issued and which failed. + */ + private final int nfail; + + public CommitResponse(final CommitRequest req, + final ArrayList<Throwable> causes) { + + this.req = req; + this.causes = causes; + + int nok = 0, nfail = 0; + + for (Throwable t : causes) { + + if (t == null) + nok++; // request issued and was Ok. + else + nfail++; // request issued and failed. + + } + + this.nok = nok; + this.nfail = nfail; + + } + + public boolean isLeaderOk() { + + return causes.get(0) == null; + + } + + /** + * Number of COMMIT messages that were generated and succeeded. + */ + public int getNOk() { + + return nok; + + } + + /** + * Number of COMMIT messages that were generated and failed. + */ + public int getNFail() { + + return nfail; + + } + + /** + * Return the root cause for the ith service -or- <code>null</code> if the + * COMMIT did not produce an exception for that service. + */ + public Throwable getCause(final int i) { + + return causes.get(i); + + } + + /** + * Throw out the exception(s). + * <p> + * Note: This method is guaranteed to not return normally! + * + * @throws Exception + * if one or more services that voted YES failed the COMMIT. + * + * @throws IllegalStateException + * if all services that voted YES succeeded. + */ + public void throwCauses() throws Exception { + + if (causes.isEmpty()) { + + // There were no errors. + throw new IllegalStateException(); + + } + + // Throw exception back to the leader. 
+ if (causes.size() == 1) + throw new Exception(causes.get(0)); + + final int k = req.getPrepareResponse().replicationFactor(); + + throw new Exception("replicationFactor=" + k + ", nok=" + nok + + ", nfail=" + nfail, new ExecutionExceptions(causes)); + + } + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java 2013-11-01 13:34:10 UTC (rev 7506) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java 2013-11-01 21:10:45 UTC (rev 7507) @@ -28,10 +28,8 @@ package com.bigdata.ha; import java.io.IOException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.Quorum; /** @@ -64,12 +62,13 @@ /** * Used by the leader to send a message to each joined service in the quorum * telling it to commit using the root block from the corresponding - * {@link #prepare2Phase(IRootBlockView, long, TimeUnit) prepare} message. - * The commit MAY NOT go forward unless both the current quorum token and - * the lastCommitTime on this message agree with the quorum token and + * {@link #prepare2Phase(PrepareRequest) prepare} message. The commit MAY + * NOT go forward unless both the current quorum token and the + * lastCommitTime on this message agree with the quorum token and * lastCommitTime in the root block from the last "prepare" message. */ - void commit2Phase(CommitRequest req) throws IOException, InterruptedException; + CommitResponse commit2Phase(CommitRequest req) throws IOException, + InterruptedException; /** * Used by the leader to send a message to each service joined with the @@ -80,6 +79,6 @@ * @param token * The quorum token. */ - void abort2Phase(final long token) throws IOException, InterruptedException; + void abort2Phase(long token) throws IOException, InterruptedException; } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-11-01 13:34:10 UTC (rev 7506) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-11-01 21:10:45 UTC (rev 7507) @@ -386,8 +386,8 @@ } @Override - public void commit2Phase(final CommitRequest commitRequest) throws IOException, - InterruptedException { + public CommitResponse commit2Phase(final CommitRequest commitRequest) + throws IOException, InterruptedException { if (log.isInfoEnabled()) log.info("req=" + commitRequest); @@ -428,10 +428,26 @@ final boolean didAllServicesPrepare = prepareResponse.getYesCount() == prepareResponse .replicationFactor(); - final List<Future<Void>> localFutures = new LinkedList<Future<Void>>(); - + /* + * Note: These entries are in service join order. The first entry is + * always the leader. If a services did not vote YES for the PREPARE + * then it will have a null entry in this list. + */ + final ArrayList<Future<Void>> localFutures = new ArrayList<Future<Void>>( + joinedServiceIds.length); + + final ArrayList<Throwable> causes = new ArrayList<Throwable>(); + try { + for (int i = 0; i < joinedServiceIds.length; i++) { + + // Pre-size to ensure sufficient room for set(i,foo). 
+ localFutures.add(null); + causes.add(null); + + } + member.assertLeader(token); final IHA2PhaseCommitMessage msgJoinedService = new HA2PhaseCommitMessage( @@ -439,8 +455,6 @@ for (int i = 1; i < joinedServiceIds.length; i++) { - final UUID serviceId = joinedServiceIds[i]; - if (!prepareResponse.getVote(i)) { // Skip services that did not vote YES in PREPARE. @@ -448,6 +462,8 @@ } + final UUID serviceId = joinedServiceIds[i]; + /* * Submit task on local executor. The task will do an RMI to the * remote service. @@ -457,7 +473,7 @@ msgJoinedService)); // add to list of futures we will check. - localFutures.add(rf); + localFutures.set(i, rf); } @@ -471,37 +487,38 @@ final Future<Void> f = leader.commit2Phase(msgJoinedService); - localFutures.add(f); + localFutures.set(0/* leader */, f); } /* * Check the futures for the other services in the quorum. */ - final List<Throwable> causes = new LinkedList<Throwable>(); - for (Future<Void> ft : localFutures) { + for (int i = 0; i < joinedServiceIds.length; i++) { + final Future<Void> ft = localFutures.get(i); + if (ft == null) + continue; try { ft.get(); // FIXME Timeout to await followers in commit2Phase(). // } catch (TimeoutException ex) { // // Timeout on this Future. // log.error(ex, ex); -// causes.add(ex); -// done = false; +// causes.set(i, ex); } catch (CancellationException ex) { // Future was cancelled. log.error(ex, ex); - causes.add(ex); + causes.set(i, ex); } catch (ExecutionException ex) { log.error(ex, ex); - causes.add(ex); + causes.set(i, ex); } catch (RuntimeException ex) { /* - * Note: ClientFuture.get() can throw a RuntimeException - * if there is a problem with the RMI call. In this case - * we do not know whether the Future is done. + * Note: ClientFuture.get() can throw a RuntimeException if + * there is a problem with the RMI call. In this case we do + * not know whether the Future is done. */ log.error(ex, ex); - causes.add(ex); + causes.set(i, ex); } finally { // Note: cancelling a *local* Future wrapping an RMI. ft.cancel(true/* mayInterruptIfRunning */); @@ -511,13 +528,7 @@ /* * If there were any errors, then throw an exception listing them. */ - if (!causes.isEmpty()) { - // Throw exception back to the leader. 
- if (causes.size() == 1) - throw new RuntimeException(causes.get(0)); - throw new RuntimeException("remote errors: nfailures=" - + causes.size(), new ExecutionExceptions(causes)); - } + return new CommitResponse(commitRequest, causes); } finally { Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-11-01 13:34:10 UTC (rev 7506) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-11-01 21:10:45 UTC (rev 7507) @@ -314,10 +314,10 @@ } @Override - public void commit2Phase(final CommitRequest req) throws IOException, + public CommitResponse commit2Phase(final CommitRequest req) throws IOException, InterruptedException { - commitImpl.commit2Phase(req); + return commitImpl.commit2Phase(req); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-01 13:34:10 UTC (rev 7506) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-01 21:10:45 UTC (rev 7507) @@ -96,6 +96,7 @@ import com.bigdata.counters.CounterSet; import com.bigdata.counters.Instrument; import com.bigdata.ha.CommitRequest; +import com.bigdata.ha.CommitResponse; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; import com.bigdata.ha.HATXSGlue; @@ -3527,6 +3528,26 @@ /** * HA mode commit (2-phase commit). + */ + private void commitHA() { + + try { + + prepare2Phase(); + + commit2Phase(); + + } catch (Exception e) { + + // launder throwable. + throw new RuntimeException(e); + + } + + } + + /** + * PREPARE * <p> * Note: We need to make an atomic decision here regarding whether a * service is joined with the met quorum or not. This information will @@ -3541,12 +3562,37 @@ * metadata for the znode that is the parent of the joined services. * However, we would need an expanded interface to get that metadata * from zookeeper out of the Quorum. + * + * @throws IOException + * @throws TimeoutException + * @throws InterruptedException */ - private void commitHA() { - + private void prepare2Phase() throws InterruptedException, + TimeoutException, IOException { + + boolean didPrepare = false; try { - if(!prepare2Phase()) { + // Atomic decision point for joined vs non-joined services. + prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices( + quorum); + + prepareRequest = new PrepareRequest(// + consensusReleaseTime,// + gatherJoinedAndNonJoinedServices,// + prepareJoinedAndNonJoinedServices,// + newRootBlock,// + quorumService.getPrepareTimeout(), // timeout + TimeUnit.MILLISECONDS// + ); + + // issue prepare request. + prepareResponse = quorumService.prepare2Phase(prepareRequest); + + if (haLog.isInfoEnabled()) + haLog.info(prepareResponse.toString()); + + if (!prepareResponse.willCommit()) { // PREPARE rejected. throw new QuorumException("PREPARE rejected: nyes=" @@ -3554,15 +3600,12 @@ + prepareResponse.replicationFactor()); } + didPrepare = true; - commitRequest = new CommitRequest(prepareRequest, - prepareResponse); + } finally { - quorumService.commit2Phase(commitRequest); + if (!didPrepare) { - } catch (Throwable e) { - - try { /* * Something went wrong. 
Any services that were in the * pipeline could have a dirty write set. Services that @@ -3581,13 +3624,74 @@ * breaks, then the services will move into the Error state * and will do a local abort as part of that transition. */ - quorumService.abort2Phase(commitToken); - } catch (Throwable t) { - log.warn(t, t); + + try { + quorumService.abort2Phase(commitToken); + } catch (Throwable t) { + log.warn(t, t); + } + } - if (commitRequest != null) { + } + + } + // Fields set by the method above. + private IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices; + private PrepareRequest prepareRequest; + private PrepareResponse prepareResponse; + + /** + * COMMIT. + * + * Pre-condition: PREPARE was successful on a majority of the services. + */ + private void commit2Phase() throws Exception { + + boolean didCommit = false; + try { + + /* + * Prepare was successful. COMMIT message has been formed. We + * will now commit. + * + * Note: The overall commit will fail unless we can prove that a + * majority of the services successfully committed. + */ + + commitRequest = new CommitRequest(prepareRequest, + prepareResponse); + + commitResponse = quorumService.commit2Phase(commitRequest); + + if (!store.quorum.isQuorum(commitResponse.getNOk())) { + /* + * Fail the commit. + * + * Note: An insufficient number of services were able to + * COMMIT successfully. + * + * Note: It is possible that a commit could be failed here + * when the commit is in fact stable on a majority of + * services. For example, with k=3 and 2 services running if + * both of them correctly update their root blocks but we + * lose network connectivity to the follower before the RMI + * returns, then we will fail the commit. + */ + + // Note: Guaranteed to not return normally! + commitResponse.throwCauses(); + + } + + didCommit = true; + + } finally { + + if (!didCommit) { + + /* * The quorum voted to commit, but something went wrong. * * This forces the leader to fail over. The quorum can then @@ -3614,55 +3718,18 @@ * another such that it will apply those HALog files on * restart and form a consensus with the other services. */ + quorumService.enterErrorState(); + } - - // Re-throw the root cause exception. - throw new RuntimeException(e); } } // Fields set by the method above. private CommitRequest commitRequest; + private CommitResponse commitResponse; - /** - * Return <code>true</code> iff the 2-phase PREPARE votes to COMMIT. - * - * @throws IOException - * @throws TimeoutException - * @throws InterruptedException - */ - private boolean prepare2Phase() throws InterruptedException, - TimeoutException, IOException { - - // Atomic decision point for joined vs non-joined services. - prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices( - quorum); - - prepareRequest = new PrepareRequest(// - consensusReleaseTime,// - gatherJoinedAndNonJoinedServices,// - prepareJoinedAndNonJoinedServices,// - newRootBlock,// - quorumService.getPrepareTimeout(), // timeout - TimeUnit.MILLISECONDS// - ); - - // issue prepare request. - prepareResponse = quorumService.prepare2Phase(prepareRequest); - - if (haLog.isInfoEnabled()) - haLog.info(prepareResponse.toString()); - - return prepareResponse.willCommit(); - - } - // Fields set by the method above. - private IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices; - private PrepareRequest prepareRequest; - private PrepareResponse prepareResponse; - } // class CommitState. 
/** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-11-01 13:34:10 UTC (rev 7506) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-11-01 21:10:45 UTC (rev 7507) @@ -494,7 +494,7 @@ * * TODO Consider leader failure scenarios in this test suite, not just * scenarios where B fails. We MUST also cover failures of C (the 2nd - * follower). We should also cover scenariors where the quorum is barely met + * follower). We should also cover scenarios where the quorum is barely met * and a single failure causes a rejected commit (local decision) or 2-phase * abort (joined services in joint agreement). * @@ -515,7 +515,17 @@ awaitCommitCounter(1L, startup.serverA, startup.serverB, startup.serverC); - // Setup B to fail the "COMMIT" message. + /* + * Setup B to fail the "COMMIT" message (specifically, it will throw + * back an exception rather than executing the commit. + * + * FIXME We need to cause B to actually fail the commit such that it + * enters the ERROR state. This is only causing the RMI to be rejected + * so B is not being failed out of the pipeline. Thus, B will remain + * joined with the met quorum (but at the wrong commit point) until we + * send down another replicated write. At that point B will notice that + * it is out of whack and enter the ERROR state. + */ ((HAGlueTest) startup.serverB) .failNext("commit2Phase", new Class[] { IHA2PhaseCommitMessage.class }, @@ -543,13 +553,24 @@ // Should be two commit points on {A,C]. awaitCommitCounter(2L, startup.serverA, startup.serverC); + // Just one commit point on B. + awaitCommitCounter(1L, startup.serverB); + + // B is still a follower. + awaitHAStatus(startup.serverB, HAStatusEnum.Follower); + /* * B should go into an ERROR state and then into SeekConsensus and * from there to RESYNC and finally back to RunMet. We can not * reliably observe the intervening states. So what we really need * to do is watch for B to move to the end of the pipeline and catch * up to the same commit point. + * + * FIXME This is forcing B into an error state to simulate what + * would happen if B had encountered an error during the 2-phase + * commit above. */ + ((HAGlueTest)startup.serverB).enterErrorState(); /* * The pipeline should be reordered. B will do a service leave, then @@ -558,6 +579,8 @@ awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC, startup.serverB }); + awaitFullyMetQuorum(); + /* * There should be two commit points on {A,C,B} (note that this * assert does not pay attention to the pipeline order). This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
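The acceptance rule that revision 7507 above adds in commit2Phase() can be boiled down to a few lines. This is a sketch rather than the shipped code: it assumes Quorum.isQuorum(n) tests for a strict majority of the replication factor k, and it mirrors how CommitResponse counts a null entry in its causes list as a COMMIT message that succeeded:

    import java.util.Arrays;
    import java.util.List;

    class CommitMajoritySketch {

        // One entry per service in vote order; null == COMMIT processed ok.
        static boolean commitStands(final List<Throwable> causes, final int k) {
            int nok = 0;
            for (Throwable t : causes) {
                if (t == null)
                    nok++; // request issued and was ok (as in CommitResponse).
            }
            return nok > k / 2; // assumed isQuorum() semantics: strict majority of k.
        }

        public static void main(final String[] args) {
            // k=3: leader ok, B throws back an exception, C ok => 2 of 3 => stands.
            System.out.println(commitStands(Arrays.asList(null,
                    new RuntimeException("B rejected COMMIT"), null), 3)); // true
        }

    }

Note the asymmetry spelled out in the log message: a failed PREPARE simply fails the transaction, while a COMMIT that loses its majority also forces the leader to fail over.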
From: <tho...@us...> - 2013-11-01 13:34:18
Revision: 7506 http://bigdata.svn.sourceforge.net/bigdata/?rev=7506&view=rev Author: thompsonbry Date: 2013-11-01 13:34:10 +0000 (Fri, 01 Nov 2013) Log Message: ----------- javadoc related to QuorumServer.getService(UUID) as implemented by HAQuorumService. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-31 17:04:33 UTC (rev 7505) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-01 13:34:10 UTC (rev 7506) @@ -8001,15 +8001,17 @@ final UUID leaderId = req.getLeaderId(); + // Note: Will throw exception if our HAQuorumService is not running. final HAGlue leader = getQuorum().getClient().getService(leaderId); if (leader == null) throw new RuntimeException( "Could not discover the quorum leader."); + // Get our serviceId. final UUID serviceId = getServiceId(); - if(serviceId == null) + if (serviceId == null) throw new AssertionError(); final Callable<IHANotifyReleaseTimeResponse> task = ((AbstractHATransactionService) AbstractJournal.this Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-31 17:04:33 UTC (rev 7505) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-11-01 13:34:10 UTC (rev 7506) @@ -1689,11 +1689,29 @@ } /** - * Resolve an {@link HAGlue} object from its Service UUID. + * {@inheritDoc} + * <p> + * This implementation resolves an {@link HAGlue} object from its + * Service UUID using the <strong>pre-existing</strong> connection for + * the {@link HAClient} and the cached service discovery lookup for that + * connection. If the {@link HAClient} is not connected, then an + * {@link IllegalStateException} will be thrown. + * + * @param serviceId + * The {@link UUID} of the service to be resolved. + * + * @return The proxy for the service having the specified {@link UUID} + * and never <code>null</code>. + * + * @throws IllegalStateException + * if the {@link HAClient} is not connected. + * @throws QuorumException + * if no service can be discovered for that {@link UUID}. */ @Override public S getService(final UUID serviceId) { + // Throws IllegalStateException if not connected (HAClient). final HAGlueServicesClient discoveryClient = server .getHAClient().getConnection().getHAGlueServicesClient(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
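The contract documented in revision 7506 above is easy to violate at call sites, so a defensive resolution sketch may help. It mirrors the AbstractJournal hunk in that revision; resolveLeader is an invented name, the QuorumService import package is assumed from its usage in these diffs, and the null check is belt-and-braces given that the new javadoc promises getService(UUID) either returns a proxy or throws (IllegalStateException when the HAClient is not connected, QuorumException when no service can be discovered for the UUID):

    import java.util.UUID;
    import com.bigdata.ha.HAGlue;
    import com.bigdata.ha.QuorumService; // package assumed.
    import com.bigdata.quorum.Quorum;

    static HAGlue resolveLeader(
            final Quorum<HAGlue, QuorumService<HAGlue>> quorum,
            final UUID leaderId) {
        // Note: Will throw IllegalStateException if the quorum client is not running.
        final HAGlue leader = quorum.getClient().getService(leaderId);
        if (leader == null)
            throw new RuntimeException("Could not discover the quorum leader.");
        return leader;
    }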
From: <tho...@us...> - 2013-10-31 17:04:42
Revision: 7505 http://bigdata.svn.sourceforge.net/bigdata/?rev=7505&view=rev Author: thompsonbry Date: 2013-10-31 17:04:33 +0000 (Thu, 31 Oct 2013) Log Message: ----------- The issue with the withheld version of QuorumCommitImpl has been tracked down to the non-blocking semantics of HAGlue.getRootBlock(). I have introduced a blocking version of the IHARootBlockRequest message for this method. When that is used from within awaitCommitCounter() in the test suite, then the test suite runs green for the tests that were failing. Thus, this was a data race problem in the test suite. AbstractServer: clean up dead code. HAJournalServer: clean up dead code. AbstractHA3BackupTestCase: clean up import. AbstractHAJournalServerTestCase: modify awaitCommitCounter() to use the blocking version of HAGlue.getRootBlock(). This fixes a test suite problem where increased concurrency in the 2-phase PREPARE, COMMIT, and ABORT protocols could cause test suite failures. QuorumCommitImpl: Increased parallelism. The PREPARE, COMMIT, and ABORT behaviors are now executed in parallel on the followers and the leader. Simplified the code patterns for error handling. (I)HARootBlockRequest: Added boolean parameter for blocking versus non-blocking request. AbstractJournal: Added support for blocking versus non-blocking request. See #760 (commit2Phase code review). Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -28,10 +28,11 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -43,11 +44,14 @@ import com.bigdata.ha.msg.IHA2PhaseAbortMessage; import com.bigdata.ha.msg.IHA2PhaseCommitMessage; import com.bigdata.ha.msg.IHA2PhasePrepareMessage; +import com.bigdata.ha.msg.IHAMessage; import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumMember; import com.bigdata.quorum.QuorumStateChangeListener; import com.bigdata.quorum.QuorumStateChangeListenerBase; +import com.bigdata.service.proxy.ThickFuture; +import com.bigdata.util.InnerCause; import 
com.bigdata.util.concurrent.ExecutionExceptions; /** @@ -60,50 +64,73 @@ static private transient final Logger log = Logger .getLogger(QuorumCommitImpl.class); - protected final QuorumMember<S> member; - - /** - * The downstream service in the write pipeline. - */ - protected volatile UUID downStreamId = null; + private final QuorumMember<S> member; + private final ExecutorService executorService; public QuorumCommitImpl(final QuorumMember<S> member) { + + if (member == null) + throw new IllegalArgumentException(); this.member = member; + this.executorService = member.getExecutor(); + } - protected Quorum<?, ?> getQuorum() { + private Quorum<?, ?> getQuorum() { return member.getQuorum(); } - protected HACommitGlue getService(final UUID serviceId) { + private HACommitGlue getService(final UUID serviceId) { return member.getService(serviceId); } - + /** * Cancel the requests on the remote services (RMI). This is a best effort * implementation. Any RMI related errors are trapped and ignored in order * to be robust to failures in RMI when we try to cancel the futures. + * <p> + * NOte: This is not being done in parallel. However, due to a DGC thread + * leak issue, we now use {@link ThickFuture}s. Thus, the tasks that are + * being cancelled are all local tasks running on the + * {@link #executorService}. If that local task is doing an RMI, then + * cancelling it will cause an interrupt in the NIO request. */ - protected <F extends Future<T>, T> void cancelRemoteFutures( - final List<F> remoteFutures) { + private <F extends Future<T>, T> void cancelFutures(final List<F> futures) { if (log.isInfoEnabled()) log.info(""); - for (F rf : remoteFutures) { + for (F f : futures) { + if (f == null) { + + continue; + + } + try { + + if (!f.isDone()) { + + f.cancel(true/* mayInterruptIfRunning */); - rf.cancel(true/* mayInterruptIfRunning */); + } } catch (Throwable t) { + + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + + } + // ignored (to be robust). } @@ -139,6 +166,7 @@ * from the prepare message. This metadata is used to decide how the service * will handle the prepare, commit, and abort messages. */ + @Override public PrepareResponse prepare2Phase(final PrepareRequest req) throws InterruptedException, IOException { @@ -150,9 +178,6 @@ final UUID[] joinedServiceIds = req.getPrepareAndNonJoinedServices() .getJoinedServiceIds(); -// final Set<UUID> nonJoinedPipelineServiceIds = req -// .getNonJoinedPipelineServiceIds(); - final long timeout = req.getTimeout(); final TimeUnit unit = req.getUnit(); @@ -172,35 +197,36 @@ final long begin = System.nanoTime(); final long nanos = unit.toNanos(timeout); long remaining = nanos; - + /* - * The leader is a local service. The followers and other service in the - * pipeline (but not yet joined) are remote services. + * Random access list of futures. + * + * Note: These are *local* Futures. Except for the leader, the Future is + * for a task that submits an RMI request. This allows us to run the + * PREPARE in parallel for less total latency. On the leader, the task + * is run in the caller's thread (on the leader, the caller holds a lock + * that we need so we have to execute the behavior in the caller's + * thread). + * + * Note: If a follower was joined as of the atomic decision point, but + * did not *participate* in the GATHER protocol, then we still send it + * the PREPARE message. 
*/ + final ArrayList<Future<Boolean>> localFutures = new ArrayList<Future<Boolean>>( + joinedServiceIds.length); - // #of remote followers (joined services, excluding the leader). - final int nfollowers = (joinedServiceIds.length - 1); + try { -// // #of non-joined services in the pipeline. -// final int nNonJoinedPipelineServices = nonJoinedPipelineServiceIds -// .size(); + // #of remote followers (joined services, excluding the leader). + final int nfollowers = joinedServiceIds.length - 1; - // #of remote services (followers plus others in the pipeline). - final int remoteServiceCount = nfollowers;// + nNonJoinedPipelineServices; + for (int i = 0; i <= nfollowers; i++) { - // Random access list of futures. - final ArrayList<Future<Boolean>> remoteFutures = new ArrayList<Future<Boolean>>( - remoteServiceCount); + // Pre-size to ensure sufficient room for set(i,foo). + localFutures.add(null); - for (int i = 0; i <= remoteServiceCount; i++) { - - // Pre-size to ensure sufficient room for set(i,foo). - remoteFutures.add(null); - - } - - try { - + } + // Verify the quorum is valid. member.assertLeader(token); @@ -240,64 +266,20 @@ rootBlock, timeout, unit); /* - * Runnable which will execute this message on the - * remote service. - * - * FIXME Because async futures cause DGC native thread - * leaks this is no longer running the prepare - * asynchronously on the followers. Change the code - * here, and in commit2Phase and abort2Phase to use - * multiple threads to run the tasks on the followers. + * Submit task which will execute this message on the + * remote service. We will await this task below. */ - - final HACommitGlue service = getService(serviceId); - - Future<Boolean> rf = null; - try { - // RMI. - rf = service.prepare2Phase(msgForJoinedService); - } catch (final Throwable t) { - // If anything goes wrong, wrap up exception as Future. - final FutureTask<Boolean> ft = new FutureTask<Boolean>(new Runnable() { - public void run() { - throw new RuntimeException(t); - } - }, Boolean.FALSE); - rf = ft; - ft.run(); // evaluate future. - } + final Future<Boolean> rf = executorService + .submit(new PrepareMessageTask(serviceId, + msgForJoinedService)); // add to list of futures we will check. - remoteFutures.set(i, rf); + localFutures.set(i, rf); } } -// // Next, message the pipeline services NOT met with the quorum. -// { -// -// // message for non-joined services. -// final IHA2PhasePrepareMessage msg = new HA2PhasePrepareMessage( -// false/* isJoinedService */, rootBlock, timeout, unit); -// -// for (UUID serviceId : nonJoinedPipelineServiceIds) { -// -// /* -// * Runnable which will execute this message on the -// * remote service. -// */ -// final Future<Boolean> rf = getService(serviceId) -// .prepare2Phase(msg); -// -// // add to list of futures we will check. -// remoteFutures.set(i, rf); -// -// i++; -// -// } -// } - /* * Finally, run the operation on the leader using local method * call (non-RMI) in the caller's thread to avoid deadlock. @@ -324,7 +306,7 @@ final Future<Boolean> f = leader .prepare2Phase(msgForJoinedService); - remoteFutures.set(0/* index */, f); + localFutures.set(0/* index */, f); } @@ -334,36 +316,24 @@ * Check futures for all services that were messaged. 
*/ int nyes = 0; - assert remoteFutures.size() == remoteServiceCount + 1; - final boolean[] votes = new boolean[remoteServiceCount + 1]; - for (int i = 0; i <= remoteServiceCount; i++) { - final Future<Boolean> rf = remoteFutures.get(i); - if (rf == null) + final boolean[] votes = new boolean[1 + nfollowers]; + for (int i = 0; i <= nfollowers; i++) { + final Future<Boolean> ft = localFutures.get(i); + if (ft == null) throw new AssertionError("null @ index=" + i); - boolean done = false; try { remaining = nanos - (System.nanoTime() - begin); - final boolean vote = rf + final boolean vote = ft .get(remaining, TimeUnit.NANOSECONDS); votes[i] = vote; - if (i < joinedServiceIds.length) { - // Only the leader and the followers get a vote. - nyes += vote ? 1 : 0; - } else { - // non-joined pipeline service. vote does not count. - if (!vote) { - log.warn("Non-joined pipeline service will not prepare"); - } - } - done = true; + // Only the leader and the followers get a vote. + nyes += vote ? 1 : 0; } catch (CancellationException ex) { // This Future was cancelled. log.error(ex, ex); - done = true; // CancellationException indicates isDone(). } catch (TimeoutException ex) { // Timeout on this Future. log.error(ex, ex); - done = false; } catch (ExecutionException ex) { /* * Note: prepare2Phase() is throwing exceptions if @@ -373,7 +343,6 @@ * service when attempting to perform the RMI. */ log.error(ex, ex); - done = true; // ExecutionException indicates isDone(). } catch (RuntimeException ex) { /* * Note: ClientFuture.get() can throw a RuntimeException if @@ -382,14 +351,8 @@ */ log.error(ex, ex); } finally { - if (!done) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/*mayInterruptIfRunning*/); } } @@ -415,30 +378,19 @@ return new PrepareResponse(k, nyes, willCommit, votes); } finally { - /* - * Ensure that all futures are cancelled. - */ - for (Future<Boolean> rf : remoteFutures) { - if (rf == null) // ignore empty slots. - continue; - if (!rf.isDone()) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } - } + + cancelFutures(localFutures); + } } - public void commit2Phase(final CommitRequest req) throws IOException, + @Override + public void commit2Phase(final CommitRequest commitRequest) throws IOException, InterruptedException { if (log.isInfoEnabled()) - log.info("req=" + req); + log.info("req=" + commitRequest); /* * To minimize latency, we first submit the futures for the other @@ -460,31 +412,28 @@ * atomic decision point concerning such things in commitNow().] 
*/ - final PrepareRequest preq = req.getPrepareRequest(); + final PrepareRequest prepareRequest = commitRequest.getPrepareRequest(); - final UUID[] joinedServiceIds = preq.getPrepareAndNonJoinedServices() + final UUID[] joinedServiceIds = prepareRequest.getPrepareAndNonJoinedServices() .getJoinedServiceIds(); -// final Set<UUID> nonJoinedPipelineServiceIds = preq -// .getNonJoinedPipelineServiceIds(); - - final long token = preq.getRootBlock().getQuorumToken(); + final long token = prepareRequest.getRootBlock().getQuorumToken(); - final long commitTime = preq.getRootBlock().getLastCommitTime(); + final long commitTime = prepareRequest.getRootBlock().getLastCommitTime(); - final PrepareResponse presp = req.getPrepareResponse(); + final PrepareResponse prepareResponse = commitRequest.getPrepareResponse(); // true iff we have a full complement of services that vote YES for this // commit. - final boolean didAllServicesPrepare = presp.getYesCount() == presp + final boolean didAllServicesPrepare = prepareResponse.getYesCount() == prepareResponse .replicationFactor(); - member.assertLeader(token); - - final List<Future<Void>> remoteFutures = new LinkedList<Future<Void>>(); + final List<Future<Void>> localFutures = new LinkedList<Future<Void>>(); try { + member.assertLeader(token); + final IHA2PhaseCommitMessage msgJoinedService = new HA2PhaseCommitMessage( true/* isJoinedService */, commitTime, didAllServicesPrepare); @@ -492,7 +441,7 @@ final UUID serviceId = joinedServiceIds[i]; - if (!presp.getVote(i)) { + if (!prepareResponse.getVote(i)) { // Skip services that did not vote YES in PREPARE. continue; @@ -500,38 +449,18 @@ } /* - * Runnable which will execute this message on the remote - * service. + * Submit task on local executor. The task will do an RMI to the + * remote service. */ - final Future<Void> rf = getService(serviceId).commit2Phase( - msgJoinedService); + final Future<Void> rf = executorService + .submit(new CommitMessageTask(serviceId, + msgJoinedService)); // add to list of futures we will check. - remoteFutures.add(rf); + localFutures.add(rf); } -// if (!nonJoinedPipelineServiceIds.isEmpty()) { -// -// final IHA2PhaseCommitMessage msgNonJoinedService = new HA2PhaseCommitMessage( -// false/* isJoinedService */, commitTime); -// -// for (UUID serviceId : nonJoinedPipelineServiceIds) { -// -// /* -// * Runnable which will execute this message on the remote -// * service. -// */ -// final Future<Void> rf = getService(serviceId).commit2Phase( -// msgNonJoinedService); -// -// // add to list of futures we will check. -// remoteFutures.add(rf); -// -// } -// -// } - { /* * Run the operation on the leader using local method call in @@ -542,7 +471,7 @@ final Future<Void> f = leader.commit2Phase(msgJoinedService); - remoteFutures.add(f); + localFutures.add(f); } @@ -550,11 +479,9 @@ * Check the futures for the other services in the quorum. */ final List<Throwable> causes = new LinkedList<Throwable>(); - for (Future<Void> rf : remoteFutures) { - boolean done = false; + for (Future<Void> ft : localFutures) { try { - rf.get(); // TODO Timeout to await followers in commit2Phase(). - done = true; + ft.get(); // FIXME Timeout to await followers in commit2Phase(). // } catch (TimeoutException ex) { // // Timeout on this Future. // log.error(ex, ex); @@ -564,11 +491,9 @@ // Future was cancelled. log.error(ex, ex); causes.add(ex); - done = true; // Future is done since cancelled. 
} catch (ExecutionException ex) { log.error(ex, ex); causes.add(ex); - done = true; // Note: ExecutionException indicates isDone(). } catch (RuntimeException ex) { /* * Note: ClientFuture.get() can throw a RuntimeException @@ -578,14 +503,8 @@ log.error(ex, ex); causes.add(ex); } finally { - if (!done) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/* mayInterruptIfRunning */); } } @@ -593,8 +512,6 @@ * If there were any errors, then throw an exception listing them. */ if (!causes.isEmpty()) { - // Cancel remote futures. - cancelRemoteFutures(remoteFutures); // Throw exception back to the leader. if (causes.size() == 1) throw new RuntimeException(causes.get(0)); @@ -603,28 +520,22 @@ } } finally { - /* - * Ensure that all futures are cancelled. - */ - for (Future<Void> rf : remoteFutures) { - if (!rf.isDone()) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } - } + + // Ensure that all futures are cancelled. + cancelFutures(localFutures); + } } /** + * {@inheritDoc} + * * FIXME Only issue abort to services that voted YES in prepare? [We have * that information in commitNow(), but we do not have the atomic set of * joined services in AbstractJournal.abort())]. */ + @Override public void abort2Phase(final long token) throws IOException, InterruptedException { @@ -632,14 +543,6 @@ log.info("token=" + token); /* - * To minimize latency, we first submit the futures for the other - * services and then do f.run() on the leader. This will allow the other - * services to commit concurrently with the leader's IO. - */ - - final List<Future<Void>> remoteFutures = new LinkedList<Future<Void>>(); - - /* * For services (other than the leader) in the quorum, submit the * RunnableFutures to an Executor. */ @@ -649,6 +552,12 @@ final IHA2PhaseAbortMessage msg = new HA2PhaseAbortMessage(token); + /* + * To minimize latency, we first submit the futures for the other + * services and then do f.run() on the leader. + */ + final List<Future<Void>> localFutures = new LinkedList<Future<Void>>(); + try { for (int i = 1; i < joinedServiceIds.length; i++) { @@ -656,13 +565,14 @@ final UUID serviceId = joinedServiceIds[i]; /* - * Runnable which will execute this message on the remote - * service. + * Submit task on local executor. The task will do an RMI to the + * remote service. */ - final Future<Void> rf = getService(serviceId).abort2Phase(msg); + final Future<Void> rf = executorService + .submit(new AbortMessageTask(serviceId, msg)); // add to list of futures we will check. - remoteFutures.add(rf); + localFutures.add(rf); } @@ -674,25 +584,22 @@ member.assertLeader(token); final S leader = member.getService(); final Future<Void> f = leader.abort2Phase(msg); - remoteFutures.add(f); + localFutures.add(f); } /* * Check the futures for the other services in the quorum. */ final List<Throwable> causes = new LinkedList<Throwable>(); - for (Future<Void> rf : remoteFutures) { - boolean done = false; + for (Future<Void> ft : localFutures) { try { - rf.get(); - done = true; + ft.get(); // TODO Timeout for abort? } catch (InterruptedException ex) { log.error(ex, ex); causes.add(ex); } catch (ExecutionException ex) { log.error(ex, ex); causes.add(ex); - done = true; // Note: ExecutionException indicates isDone(). 
} catch (RuntimeException ex) { /* * Note: ClientFuture.get() can throw a RuntimeException @@ -702,14 +609,8 @@ log.error(ex, ex); causes.add(ex); } finally { - if (!done) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/* mayInterruptIfRunning */); } } @@ -717,11 +618,10 @@ * If there were any errors, then throw an exception listing them. * * TODO But only throw an exception for the joined services. - * Non-joined services, we just long an error. + * Non-joined services, we just long an error (or simply do not tell + * them to do an abort()). */ if (!causes.isEmpty()) { - // Cancel remote futures. - cancelRemoteFutures(remoteFutures); // Throw exception back to the leader. if (causes.size() == 1) throw new RuntimeException(causes.get(0)); @@ -730,22 +630,133 @@ } } finally { + + // Ensure that all futures are cancelled. + cancelFutures(localFutures); + + } + + } + + /** + * Helper class submits the RMI for a PREPARE, COMMIT, or ABORT message. + * This is used to execute the different requests in parallel on a local + * executor service. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private abstract class AbstractMessageTask<T, M extends IHAMessage> + implements Callable<T> { + + private final UUID serviceId; + protected final M msg; + + public AbstractMessageTask(final UUID serviceId, final M msg) { + + this.serviceId = serviceId; + + this.msg = msg; + + } + + @Override + final public T call() throws Exception { + /* - * Ensure that all futures are cancelled. + * Note: This code MAY be interrupted at any point if the Future for + * the task is cancelled. If it is interrupted during the RMI, then + * the expectation is that the NIO will be interrupted in a timely + * manner throwing back some sort of IOException indicating the + * asynchronous close of the IO channel or cancel of the RMI. */ - for (Future<Void> rf : remoteFutures) { - if (!rf.isDone()) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + + // Resolve proxy for remote service. + final HACommitGlue service = getService(serviceId); + + // RMI. + final Future<T> ft = doRMI(service); + + try { + + /* + * Await the inner Future for the RMI. + * + * Note: In fact, this is a ThickFuture so it is already done by + * the time the RMI returns. 
+ */ + + return ft.get(); + + } finally { + + ft.cancel(true/* mayInterruptIfRunning */); + } } + abstract protected Future<T> doRMI(final HACommitGlue service) + throws IOException; + } + private class PrepareMessageTask extends + AbstractMessageTask<Boolean, IHA2PhasePrepareMessage> { + + public PrepareMessageTask(final UUID serviceId, + final IHA2PhasePrepareMessage msg) { + + super(serviceId, msg); + + } + + @Override + protected Future<Boolean> doRMI(final HACommitGlue service) + throws IOException { + + return service.prepare2Phase(msg); + + } + + } + + private class CommitMessageTask extends + AbstractMessageTask<Void, IHA2PhaseCommitMessage> { + + public CommitMessageTask(final UUID serviceId, + final IHA2PhaseCommitMessage msg) { + + super(serviceId, msg); + + } + + @Override + protected Future<Void> doRMI(final HACommitGlue service) + throws IOException { + + return service.commit2Phase(msg); + } + + } + + private class AbortMessageTask extends + AbstractMessageTask<Void, IHA2PhaseAbortMessage> { + + public AbortMessageTask(final UUID serviceId, + final IHA2PhaseAbortMessage msg) { + + super(serviceId, msg); + + } + + @Override + protected Future<Void> doRMI(final HACommitGlue service) + throws IOException { + + return service.abort2Phase(msg); + } + + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -32,11 +32,33 @@ private static final long serialVersionUID = 1L; private final UUID storeUUID; + private final boolean isNonBlocking; + /** + * Create a non-blocking request (this is the historical behavior). + * + * @param storeUUID + * The store UUID (optional). + */ public HARootBlockRequest(final UUID storeUUID) { + this(storeUUID, true/* isNonBlocking */); + + } + + /** + * + * @param storeUUID + * The store UUID (optional). + * @param isNonBlocking + * <code>true</code> iff the request should be non-blocking. + */ + public HARootBlockRequest(final UUID storeUUID,final boolean isNonBlocking) { + // Note: Optional. this.storeUUID = storeUUID; + + this.isNonBlocking = isNonBlocking; } @@ -47,4 +69,11 @@ } + @Override + public boolean isNonBlocking() { + + return isNonBlocking; + + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -39,4 +39,18 @@ */ UUID getStoreUUID(); + /** + * When <code>true</code> the request should be non-blocking. Otherwise the + * request should obtain the lock that guards the update of the root block + * in the commit protocol such that the caller can not observe a root block + * that has been updated but where the commit protocol is still in its + * critical section. + * <p> + * Note: The non-blocking form of the request is used in some context where + * the avoidence of a deadlock is necessary. 
The blocking form is used in + * some contexts where we need to await a specific commit point on the + * service (typically under test suite control). + */ + boolean isNonBlocking(); + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -7842,9 +7842,34 @@ } - return new HARootBlockResponse( - AbstractJournal.this.getRootBlockView()); + if (msg.isNonBlocking()) { + // Non-blocking code path. + + return new HARootBlockResponse( + AbstractJournal.this.getRootBlockView()); + + } else { + + // Blocking code path. + + final ReadLock lock = _fieldReadWriteLock.readLock(); + + lock.lock(); + + try { + + return new HARootBlockResponse( + AbstractJournal.this.getRootBlockView()); + + } finally { + + lock.unlock(); + + } + + } + } // @Override Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -260,6 +260,11 @@ private JoinManager joinManager; /** + * Used to suppor the {@link #joinManager}. + */ + private volatile LookupDiscoveryManager lookupDiscoveryManager = null; + + /** * The {@link Configuration} read based on the args[] provided when the * server is started. */ @@ -399,24 +404,6 @@ } -// /** -// * An object used to manage jini service registrar discovery. -// */ -// public LookupDiscoveryManager getDiscoveryManagement() { -// -// return lookupDiscoveryManager; -// -// } -// -// /** -// * An object used to lookup services using the discovered service registars. -// */ -// public ServiceDiscoveryManager getServiceDiscoveryManager() { -// -// return serviceDiscoveryManager; -// -// } - /** * The {@link HAClient}. */ @@ -445,6 +432,7 @@ /** * Signals anyone waiting on {@link #discoveryEvent}. */ + @Override public void discarded(final DiscoveryEvent e) { try { @@ -472,6 +460,7 @@ /** * Signals anyone waiting on {@link #discoveryEvent}. */ + @Override public void discovered(final DiscoveryEvent e) { try { @@ -895,106 +884,6 @@ throw new AssertionError();// keeps compiler happy. } -// Note: Moved HAClient.connect() into quorumService.start(). -// final HAConnection ctx; -// try { -// -// // Create client. -// haClient = new HAClient(args); -// -// // Connect. -// ctx = haClient.connect(); -// -//// /* -//// * Note: This class will perform multicast discovery if ALL_GROUPS -//// * is specified and otherwise requires you to specify one or more -//// * unicast locators (URIs of hosts running discovery services). As -//// * an alternative, you can use LookupDiscovery, which always does -//// * multicast discovery. 
-//// */ -//// lookupDiscoveryManager = new LookupDiscoveryManager( -//// jiniClientConfig.groups, jiniClientConfig.locators, -//// this /* DiscoveryListener */, config); -//// -//// /* -//// * Setup a helper class that will be notified as services join or -//// * leave the various registrars to which the data server is -//// * listening. -//// */ -//// try { -//// -//// serviceDiscoveryManager = new ServiceDiscoveryManager( -//// lookupDiscoveryManager, new LeaseRenewalManager(), -//// config); -//// -//// } catch (IOException ex) { -//// -//// throw new RuntimeException( -//// "Could not initiate service discovery manager", ex); -//// -//// } -//// -//// } catch (IOException ex) { -//// -//// fatal("Could not setup discovery", ex); -//// throw new AssertionError();// keep the compiler happy. -//// -// } catch (ConfigurationException ex) { -// -// fatal("Configuration error: " + ex, ex); -// -// throw new AssertionError();// keep the compiler happy. -// -// } catch(Throwable ex) { -// -// fatal("Could not connect: " + ex, ex); -// -// throw new AssertionError();// keep the compiler happy. -// -// } - - // Note: Moved newService() call into AbstractServer.run(). -// /* -// * Create the service object. -// */ -// try { -// -// /* -// * Note: By creating the service object here rather than outside of -// * the constructor we potentially create problems for subclasses of -// * AbstractServer since their own constructor will not have been -// * executed yet. -// * -// * Some of those problems are worked around using a JiniClient to -// * handle all aspects of service discovery (how this service locates -// * the other services in the federation). -// * -// * Note: If you explicitly assign values to those clients when the -// * fields are declared, e.g., [timestampServiceClient=null] then the -// * ctor will overwrite the values set by [newService] since it is -// * running before those initializations are performed. This is -// * really crufty, may be JVM dependent, and needs to be refactored -// * to avoid this subclass ctor init problem. -// */ -// -// if (log.isInfoEnabled()) -// log.info("Creating service impl..."); -// -// // init. -// impl = newService(config); -// -// if (log.isInfoEnabled()) -// log.info("Service impl is " + impl); -// -// } catch(Exception ex) { -// -// fatal("Could not start service: "+this, ex); -// throw new AssertionError();// keeps compiler happy. -// } - -// // Export the service proxy. -// exportProxy(haClient, impl); - } /** @@ -1157,7 +1046,6 @@ } } - private volatile LookupDiscoveryManager lookupDiscoveryManager = null; /** * Await discovery of at least one {@link ServiceRegistrar}. @@ -1420,6 +1308,7 @@ * @param serviceID * The assigned {@link ServiceID}. */ + @Override synchronized public void serviceIDNotify(final ServiceID serviceID) { if (serviceID == null) @@ -1550,6 +1439,7 @@ * Note: This is only invoked if the automatic lease renewal by the lease * manager is denied by the service registrar. 
*/ + @Override public void notify(final LeaseRenewalEvent event) { log.warn("Lease could not be renewed: " + this + " : " + event); @@ -1922,43 +1812,7 @@ } } - -// if (serviceDiscoveryManager != null) { -// -// serviceDiscoveryManager.terminate(); -// -// serviceDiscoveryManager = null; -// -// } -// -// if (lookupDiscoveryManager != null) { -// -// lookupDiscoveryManager.terminate(); -// -// lookupDiscoveryManager = null; -// -// } -// if (client != null) { -// -// if(client.isConnected()) { -// -// /* -// * Note: This will close the zookeeper client and that will -// * cause the ephemeral znode for the service to be removed. -// */ -// -//// if (log.isInfoEnabled()) -//// log.info("Disconnecting from federation"); -// -// client.disconnect(true/* immediateShutdown */); -// -// } -// -// client = null; -// -// } - } /** @@ -2181,7 +2035,8 @@ return new FileFilter() { - public boolean accept(File pathname) { + @Override + public boolean accept(final File pathname) { return false; Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -441,11 +441,6 @@ } -// /** -// * Caching discovery client for the {@link HAGlue} services. -// */ -// private HAJournalDiscoveryClient discoveryClient; - /** * The journal. */ @@ -458,10 +453,6 @@ */ private boolean onelineDisasterRecovery; -// private ZookeeperClientConfig zkClientConfig; - -// private ZooKeeperAccessor zka; - /** * An executor used to handle events that were received in the zk watcher * event thread. We can not take actions that could block in the watcher @@ -560,15 +551,6 @@ */ Operator; } - -// /** -// * Caching discovery client for the {@link HAGlue} services. -// */ -// public HAJournalDiscoveryClient getDiscoveryClient() { -// -// return discoveryClient; -// -// } public HAJournalServer(final String[] args, final LifeCycle lifeCycle) { @@ -603,14 +585,6 @@ @Override protected void terminate() { -// if (discoveryClient != null) { -// -// discoveryClient.terminate(); -// -// discoveryClient = null; -// -// } - super.terminate(); } @@ -663,49 +637,6 @@ if (log.isInfoEnabled()) log.info("Creating service impl..."); -// /* -// * Verify discovery of at least one ServiceRegistrar. -// */ -// getHAClient().getConnection().awaitServiceRegistrars(10/* timeout */, -// TimeUnit.SECONDS); - -// { -// final long begin = System.currentTimeMillis(); -// -// ServiceRegistrar[] registrars = null; -// -// long elapsed = 0; -// -// while ((registrars == null || registrars.length == 0) -// && elapsed < TimeUnit.SECONDS.toMillis(10)) { -// -// registrars = getHAClient().getConnection() -// .getDiscoveryManagement().getRegistrars(); -// -// Thread.sleep(100/* ms */); -// -// elapsed = System.currentTimeMillis() - begin; -// -// } -// -// if (registrars == null || registrars.length == 0) { -// -// throw new RuntimeException( -// "Could not discover ServiceRegistrar(s)"); -// -// } -// -// if (log.isInfoEnabled()) { -// log.info("Found " + registrars.length + " service registrars"); -// } -// -// } - -// // Setup discovery for HAGlue clients. 
-// discoveryClient = new HAJournalDiscoveryClient( -// getServiceDiscoveryManager(), -// null/* serviceDiscoveryListener */, cacheMissTimeout); - // Jini/River ServiceID. final ServiceID serviceID = getServiceID(); @@ -727,7 +658,6 @@ * Setup the Quorum / HAJournal. */ -// zkClientConfig = new ZookeeperClientConfig(config); final ZookeeperClientConfig zkClientConfig = getHAClient() .getZookeeperClientConfig(); @@ -757,56 +687,9 @@ /* * Zookeeper quorum. */ - final Quorum<HAGlue, QuorumService<HAGlue>> quorum; - { + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = (Quorum) new ZKQuorumImpl<HAGlue, HAQuorumService<HAGlue, HAJournal>>( + replicationFactor); -// final List<ACL> acl = zkClientConfig.acl; -// -// final ZooKeeperAccessor zka = getHAClient().getConnection() -// .getZookeeperAccessor(); -// -// if (!zka.awaitZookeeperConnected(10, TimeUnit.SECONDS)) { -// -// throw new RuntimeException("Could not connect to zk"); -// -// } -// -// if (log.isInfoEnabled()) { -// log.info("Connected to zookeeper"); -// } -// -// /* -// * Ensure key znodes exist. -// */ -// try { -// zka.getZookeeper() -// .create(zkClientConfig.zroot, -// new byte[] {/* data */}, acl, -// CreateMode.PERSISTENT); -// } catch (NodeExistsException ex) { -// // ignore. -// } -// try { -// zka.getZookeeper() -// .create(logicalServiceZPathPrefix, -// new byte[] {/* data */}, acl, -// CreateMode.PERSISTENT); -// } catch (NodeExistsException ex) { -// // ignore. -// } -// try { -// zka.getZookeeper() -// .create(logicalServiceZPath, -// new byte[] {/* data */}, acl, -// CreateMode.PERSISTENT); -// } catch (NodeExistsException ex) { -// // ignore. -// } - - quorum = (Quorum) new ZKQuorumImpl<HAGlue, HAQuorumService<HAGlue, HAJournal>>( - replicationFactor);// , zka, acl); - } - // The HAJournal. this.journal = newHAJournal(this, config, quorum); @@ -822,19 +705,15 @@ // Setup the quorum client (aka quorum service). quorumService = newQuorumService(logicalServiceZPath, serviceUUID, haGlueService, journal); - -// // wrap the external interface, exposing administrative functions. -// final AdministrableHAGlueService administrableService = new AdministrableHAGlueService( -// this, haGlueService); -// -// // return that wrapped interface. -// return administrableService; /* - * Return that object. This will get proxied. If we wrap it with a - * delegation pattern here, then RMI methods on a subclass of - * HAGlueService will not be visible on the exported proxy. + * Return our external interface object. This object will get proxied. + * + * Note: If we wrap that object with a delegation pattern, then RMI + * methods on a subclass of HAGlueService will not be visible on the + * exported proxy. */ + return haGlueService; } @@ -924,25 +803,9 @@ // Start the NSS. startNSS(); -// // Setup listener that logs quorum events @ TRACE. -// journal.getQuorum().addListener(new QuorumListener() { -// @Override -// public void notify(final QuorumEvent e) { -// if (log.isTraceEnabled()) -// log.trace(e); -// } -// }); - -// // Setup the quorum client (aka quorum service). -// quorumService = newQuorumService(logicalServiceZPath, serviceUUID, -// haGlueService, journal); - // Start the quorum. journal.getQuorum().start(quorumService); -// // Enter a run state for the HAJournalServer. -// quorumService.enterRunState(quorumService.new RestoreTask()); - } /** @@ -985,37 +848,6 @@ if (log.isInfoEnabled()) log.info("destroy=" + destroy); - // Note: Moved to quorumService.terminate(). 
-// if (quorumService != null) { -// -// /* -// * FIXME SHUTDOWN: What if we are already running a ShutdownTask? We -// * should just submit a ShutdownTask here and let it work this out. -// */ -// -// /* -// * Ensure that the HAQuorumService will not attempt to cure any -// * serviceLeave or related actions. -// * -// * TODO SHUTDOWN: If we properly enter a ShutdownTask run state then -// * we would not have to do this since it will already be in the -// * Shutdown runstate. -// */ -// quorumService.runStateRef.set(RunStateEnum.Shutdown); -// -// /* -// * Terminate any running task. -// */ -// final FutureTask<?> ft = quorumService.runStateFutureRef.get(); -// -// if (ft != null) { -// -// ft.cancel(true/* mayInterruptIfRunning */); -// -// } -// -// } - final HAJournal tjournal = journal; final Quorum<HAGlue, QuorumService<HAGlue>> quorum = tjournal == null ? null @@ -1042,26 +874,7 @@ * that method.] */ quorum.terminate(); - - // Note: handled by HAClient.disconnect(). - // TODO Should we do that disconnect here? -// /* -// * Close our zookeeper connection, invalidating all ephemeral -// * znodes for this service. -// * -// * Note: This provides a decisive mechanism for removing this -// * service from the joined services, the pipeline, withdrawing -// * its vote, and removing it as a quorum member. -// */ -// if (haLog.isInfoEnabled()) -// haLog.warn("FORCING UNCURABLE ZOOKEEPER DISCONNECT"); -// -// if (zka != null) { -// -// zka.close(); -// -// } - + } catch (Throwable t) { log.error(t, t); @@ -1215,6 +1028,7 @@ } + @Override final public T call() throws Exception { /* @@ -1504,11 +1318,6 @@ @Override protected long getRetrySendTimeoutNanos() { -// final ZooKeeperAccessor zka = journal -// .getHAClient() -// .getConnection() -// .getZookeeperAccessor(); - final ZooKeeper zk = getZooKeeper(); int negotiatedSessionTimeoutMillis; // try { @@ -2206,21 +2015,6 @@ log.warn("Will attempt SERVICE LEAVE"); getActor().serviceLeave(); // Just once(!) -// while (true) { -// try { -// getActor().serviceLeave(); -// break; -// } catch (RuntimeException re) { -// if (InnerCause.isInnerCause(re, -// KeeperException.class)) { -// // Do not retry in a tight loop. -// Thread.sleep(250/* ms */); -// // Retry. -// continue; -// } -// throw re; -// } -// } /** * Dispatch Events before entering SeekConsensus! Otherwise @@ -2743,10 +2537,6 @@ while (r.hasMoreBuffers()) { -// // IHABufferStrategy -// final IHABufferStrategy strategy = journal -// .getBufferStrategy(); - // get message and fill write cache buffer (unless // WORM). final IHAWriteMessage msg = r.processNextBuffer(buf @@ -2815,6 +2605,7 @@ } + @Override protected Void doRun() throws Exception { /* @@ -3004,6 +2795,7 @@ * * @throws Exception */ + @Override protected Void doRun() throws Exception { // // Wait for the token to be set, root blocks to be valid. @@ -4106,6 +3898,7 @@ txs.runWithBarrierLock(new Runnable() { + @Override public void run() { // Verify that the quorum is valid. @@ -4765,168 +4558,8 @@ // Wait for the HAJournalServer to terminate. server.run(); -// try { -// final Server tmp = server.jettyServer; -// if (tmp != null) { -// // Wait for the jetty server to terminate. -// tmp.join(); -// } -// } catch (InterruptedException e) { -// log.warn(e); -// } - System.exit(0); } - -// /** -// * Adds jini administration interfaces to the basic {@link HAGlue} interface -// * exposed by the {@link HAJournal}. 
-// * -// * @see HAJournal.HAGlueService -// * -// * @author <a href="mailto:tho...@us...">Bryan -// * Thompson</a> -// */ -// public static class AdministrableHAGlueService extends HAGlueDelegate -// implements RemoteAdministrable, RemoteDestroyAdmin { -// -// final protected HAJournalServer server; -// -// public AdministrableHAGlueService(final HAJournalServer server, -// final HAGlue service) { -// -// super(service); -// -// this.server = server; -// -// } -// -//// /** -//// * Returns an object that implements whatever administration interfaces -//// * are appropriate for the particular service. -//// * -//// * @return an object that implements whatever administration interfaces -//// * are appropriate for the particular service. -//// */ -//// public Object getAdmin() throws RemoteException { -//// -//// if (log.isInfoEnabled()) -//// log.info("serviceID=" + server.getServiceID()); -//// -//// return server.proxy; -//// -//// } -// -//// /** -//// * Sets up the {@link MDC} logging context. You should do this on every -//// * client facing point of entry and then call -//// * {@link #clearLoggingContext()} in a <code>finally</code> clause. You -//// * can extend this method to add additional context. -//// * <p> -//// * This implementation adds the following parameters to the {@link MDC}. -//// * <dl> -//// * <dt>serviceName</dt> -//// * <dd>The serviceName is typically a configuration property for the -//// * service. This datum can be injected into log messages using -//// * <em>%X{serviceName}</em> in your log4j pattern layout.</dd> -//// * <dt>serviceUUID</dt> -//// * <dd>The serviceUUID is, in general, assigned asynchronously by the -//// * service registrar. Once the serviceUUID becomes available it will be -//// * added to the {@link MDC}. This datum can be injected into log -//// * messages using <em>%X{serviceUUID}</em> in your log4j pattern layout. -//// * </dd> -//// * <dt>hostname</dt> ... [truncated message content] |
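The revised comment in this diff about the delegation pattern is worth unpacking: a proxy exported for a service object implements only the remote interfaces declared against that object's class, so wrapping the service in a delegate written against the base interface silently hides any RMI methods added by a subclass. Below is a minimal sketch of the effect under assumed, hypothetical interface names (BaseGlue, ExtendedGlue, GlueDelegate stand in for the actual HAGlue hierarchy, which is not shown in full here):

import java.rmi.Remote;
import java.rmi.RemoteException;

// Base remote interface (stand-in for HAGlue).
interface BaseGlue extends Remote {
    String status() throws RemoteException;
}

// A subclass interface that adds an extra RMI method.
interface ExtendedGlue extends BaseGlue {
    String extraAdminOp() throws RemoteException;
}

// A delegate written against the base interface only. If this wrapper is
// what gets exported, the resulting proxy implements BaseGlue alone, so
// extraAdminOp() is unreachable even when the wrapped object is actually
// an ExtendedGlue.
class GlueDelegate implements BaseGlue {

    private final BaseGlue delegate;

    GlueDelegate(final BaseGlue delegate) {
        this.delegate = delegate;
    }

    @Override
    public String status() throws RemoteException {
        return delegate.status();
    }
}

This is consistent with the change above that returns haGlueService directly rather than re-introducing the removed AdministrableHAGlueService wrapper.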
From: <tho...@us...> - 2013-10-31 14:53:34
Revision: 7504 http://bigdata.svn.sourceforge.net/bigdata/?rev=7504&view=rev Author: thompsonbry Date: 2013-10-31 14:53:27 +0000 (Thu, 31 Oct 2013) Log Message: ----------- Dead code cleanup on HAClient Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-10-31 13:29:45 UTC (rev 7503) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-10-31 14:53:27 UTC (rev 7504) @@ -320,144 +320,6 @@ } - // /** - // * {@inheritDoc} - // * - // * @see #getProperties(String component) - // */ - // public Properties getProperties() { - // - // return properties; - // - // } - - // /** - // * Return the {@link Properties} for the {@link JiniClient} merged with - // * those for the named component in the {@link Configuration}. Any - // * properties found for the {@link JiniClient} "component" will be read - // * first. Properties for the named component are next, and therefore will - // * override those given for the {@link JiniClient}. You can specify - // * properties for either the {@link JiniClient} or the <i>component</i> - // * using: - // * - // * <pre> - // * properties = NV[]{...}; - // * </pre> - // * - // * in the appropriate section of the {@link Configuration}. For example: - // * - // * <pre> - // * - // * // Jini client configuration - // * com.bigdata.service.jini.JiniClient { - // * - // * // ... - // * - // * // optional JiniClient properties. - // * properties = new NV[] { - // * - // * new NV("foo","bar") - // * - // * }; - // * - // * } - // * </pre> - // * - // * And overrides for a named component as: - // * - // * <pre> - // * - // * com.bigdata.service.FooBaz { - // * - // * properties = new NV[] { - // * - // * new NV("foo","baz"), - // * new NV("goo","12"), - // * - // * }; - // * - // * } - // * </pre> - // * - // * @param component - // * The name of the component. - // * - // * @return The properties specified for that component. - // * - // * @see #getConfiguration() - // */ - // public Properties getProperties(final String component) - // throws ConfigurationException { - // - // return JiniClient.getProperties(component, getConfiguration()); - // - // } - - // /** - // * Read {@value JiniClientConfig.Options#PROPERTIES} for the optional - // * application or server class identified by [cls]. - // * <p> - // * Note: Anything read for the specific class will overwrite any value for - // * the same properties specified for {@link JiniClient}. - // * - // * @param className - // * The class name of the client or service (optional). When - // * specified, properties defined for that class in the - // * configuration will be used and will override those specified - // * for the {@value Options#NAMESPACE}. - // * @param config - // * The {@link Configuration}. - // * - // * @todo this could be replaced by explicit use of the java identifier - // * corresponding to the Option and simply collecting all such - // * properties into a Properties object using their native type (as - // * reported by the ConfigurationFile). 
- // */ - // static public Properties getProperties(final String className, - // final Configuration config) throws ConfigurationException { - // - // final NV[] a = (NV[]) config - // .getEntry(JiniClient.class.getName(), - // JiniClientConfig.Options.PROPERTIES, NV[].class, - // new NV[] {}/* defaultValue */); - // - // final NV[] b; - // if (className != null) { - // - // b = (NV[]) config.getEntry(className, - // JiniClientConfig.Options.PROPERTIES, NV[].class, - // new NV[] {}/* defaultValue */); - // - // } else - // b = null; - // - // final NV[] tmp = ConfigMath.concat(a, b); - // - // final Properties properties = new Properties(); - // - // for (NV nv : tmp) { - // - // properties.setProperty(nv.getName(), nv.getValue()); - // - // } - // - // if (log.isInfoEnabled() || BigdataStatics.debug) { - // - // final String msg = "className=" + className + " : properties=" - // + properties.toString(); - // - // if (BigdataStatics.debug) - // System.err.println(msg); - // - // if (log.isInfoEnabled()) - // log.info(msg); - // - // } - // - // return properties; - // - // } - /** * Installs a {@link SecurityManager} and returns a new client. * @@ -562,50 +424,6 @@ } - // /** - // * Read and return the content of the properties file. - // * - // * @param propertyFile - // * The properties file. - // * - // * @throws IOException - // */ - // static protected Properties getProperties(final File propertyFile) - // throws IOException { - // - // if(log.isInfoEnabled()) { - // - // log.info("Reading properties: file="+propertyFile); - // - // } - // - // final Properties properties = new Properties(); - // - // InputStream is = null; - // - // try { - // - // is = new BufferedInputStream(new FileInputStream(propertyFile)); - // - // properties.load(is); - // - // if(log.isInfoEnabled()) { - // - // log.info("Read properties: " + properties); - // - // } - // - // return properties; - // - // } finally { - // - // if (is != null) - // is.close(); - // - // } - // - // } - /** * Invoked when a service join is noticed. * @@ -1054,16 +872,6 @@ zk = null; } - // try { - // - // // close the zookeeper client. - // zooKeeperAccessor.close(); - // - // } catch (InterruptedException e) { - // - // throw new RuntimeException(e); - // - // } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
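The dead code removed in this revision documented a configuration idiom that is still useful to understand: properties declared for a named component override those declared for the JiniClient namespace, simply because the component entries are applied second. A minimal sketch of that merge order follows, with plain String pairs standing in for bigdata's NV name/value type (the helper name and method are illustrative, not the removed API):

import java.util.Properties;

public class PropertiesMergeSketch {

    /** Apply client-namespace entries first, then component entries. */
    static Properties merge(final String[][] clientEntries,
            final String[][] componentEntries) {
        final Properties p = new Properties();
        for (final String[] nv : clientEntries) {
            p.setProperty(nv[0], nv[1]);
        }
        // Component entries are applied second and so win on key collision.
        for (final String[] nv : componentEntries) {
            p.setProperty(nv[0], nv[1]);
        }
        return p;
    }

    public static void main(final String[] args) {
        final Properties p = merge(
                new String[][] { { "foo", "bar" } }, // client namespace
                new String[][] { { "foo", "baz" }, { "goo", "12" } }); // component
        System.out.println(p); // prints {goo=12, foo=baz} (order may vary)
    }
}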
From: <tho...@us...> - 2013-10-31 13:29:53
Revision: 7503 http://bigdata.svn.sourceforge.net/bigdata/?rev=7503&view=rev Author: thompsonbry Date: 2013-10-31 13:29:45 +0000 (Thu, 31 Oct 2013) Log Message: ----------- Import of Martyn's code with my initial review edits. Modified Paths: -------------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java Added Paths: ----------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -1575,7 +1575,10 @@ * @return The hex string. */ static public String toHexString(final byte[] buf) { - + + if (buf == null) + return "NULL"; + return toHexString(buf, buf.length); } @@ -1591,6 +1594,10 @@ * @return The hex string. */ static public String toHexString(final byte[] buf, int n) { + + if (buf == null) + return "NULL"; + n = n < buf.length ? n : buf.length; final StringBuffer out = new StringBuffer(); for (int i = 0; i < n; i++) { Added: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java (rev 0) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.ha; + +import java.util.UUID; + +/** + * PipelineException is thrown from RMI calls to communicate + * the root cause of a pipeline problem. The caller is then able + * to take action: for example to remove the problem service + * from the quorum. + */ +public class PipelineException extends RuntimeException { + + /** + * Generated ID + */ + private static final long serialVersionUID = 8019938954269914574L; + + /** The UUID of the service that could not be reached. */ + private final UUID serviceId; + + public PipelineException(final UUID serviceId, final Throwable t) { + super(t); + + this.serviceId = serviceId; + } + + /** Return the UUID of the service that could not be reached. */ + public UUID getProblemServiceId() { + return serviceId; + } + +} Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -1184,7 +1184,7 @@ final IHAWriteMessage msg) { // Use size and checksum from real IHAWriteMessage. - super(msg.getSize(),msg.getChk()); + super(msg.getSize(),msg.getChk(),msg.getToken()); this.req = req; // MAY be null; this.msg = msg; @@ -1473,7 +1473,22 @@ innerReplicate(0/* retryCount */); } catch (Throwable t) { + + final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); + if (pe != null) { + log.error("Really need to remove service " + pe.getProblemServiceId()); + final UUID psid = pe.getProblemServiceId(); + + try { + member.getActor().forceRemoveService(psid); + } catch (Exception e) { + log.warn("Problem on node removal", e); + + throw new RuntimeException(e); + } + } + // Note: Also see retrySend()'s catch block. if (InnerCause.isInnerCause(t, InterruptedException.class) // || InnerCause.isInnerCause(t, CancellationException.class) @@ -1714,7 +1729,7 @@ ExecutionException, IOException { // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b); + final Future<Void> futSnd = sendService.send(b, msg.getToken()); try { @@ -1754,6 +1769,18 @@ } } + } catch (Throwable t) { + // check inner cause for downstream PipelineException + final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); + if (pe != null) { + throw pe; // throw it upstream + } + + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], t); + + throw new PipelineException(priorAndNext[1], t); } finally { // cancel the local Future. futSnd.cancel(true/* mayInterruptIfRunning */); @@ -2022,6 +2049,12 @@ } } + } catch (Throwable t) { + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], t); + + throw new PipelineException(priorAndNext[1], t); } finally { // cancel the local Future. 
futSnd.cancel(true/* mayInterruptIfRunning */); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -302,7 +302,7 @@ this.compressorKey = compressorKey; } - + /** * The initial version. * Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -28,7 +28,9 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Random; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.pipeline.HAReceiveService; import com.bigdata.ha.pipeline.HASendService; @@ -55,7 +57,38 @@ /** The Alder32 checksum of the bytes to be transfered. */ private int chk; + /** A byte[] token that must prefix the message payload, needed to skip stale data from failed read tasks */ + private byte[] token; + + static private Random r = new Random(); + static final private int TOKEN_SIZE = 8; + + static private byte[] genToken() { + final byte[] token = new byte[TOKEN_SIZE]; + + while (!unique1(token)) { + r.nextBytes(token); + } + + return token; + } + /** + * Checks that the first byte is not repeated in the + * remaining bytes, this simplifies search for the token + * in the input stream. + */ + static private boolean unique1(final byte[] bytes) { + final byte b = bytes[0]; + for (int t = 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + + return true; + } + + /** * * @param sze * The #of bytes of data to be transfered. @@ -63,6 +96,10 @@ * The Alder32 checksum of the bytes to be transfered. 
*/ public HAWriteMessageBase(final int sze, final int chk) { + this(sze, chk, genToken()); + } + + public HAWriteMessageBase(final int sze, final int chk, final byte[] token) { if (sze <= 0) throw new IllegalArgumentException(); @@ -71,6 +108,8 @@ this.chk = chk; + this.token = token; + } /** @@ -115,7 +154,7 @@ final IHAWriteMessageBase t = (IHAWriteMessageBase) obj; - return sze == t.getSize() && chk == t.getChk(); + return sze == t.getSize() && chk == t.getChk() && (BytesUtil.compareBytes(t.getToken(), getToken()) == 0); } @@ -143,6 +182,15 @@ chk = in.readInt(); + // read token + final int tlen = in.readInt(); + if (tlen == 0) { + token = null; + } else { + token = new byte[tlen]; + in.read(token); + } + } public void writeExternal(final ObjectOutput out) throws IOException { @@ -153,6 +201,17 @@ out.writeInt(chk); + if (token == null) { + out.writeInt(0); + } else { + out.writeInt(token.length); + out.write(token); + } } + @Override + public byte[] getToken() { + return token; + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -37,4 +37,6 @@ /** The Alder32 checksum of the bytes to be transfered. */ int getChk(); + /** A byte[] token that must prefix the message payload, needed to skip stale data from failed read tasks */ + byte[] getToken(); } \ No newline at end of file Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -51,6 +51,7 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -564,7 +565,7 @@ try { readFuture.get(); } catch (Exception e) { - log.warn(e, e); + log.warn("Pipeline should have been drained", e); } lock.lockInterruptibly(); @@ -579,6 +580,7 @@ } finally { final Client client = clientRef.get(); if (client != null) { + log.warn("Closing client"); client.close(); } } @@ -958,29 +960,44 @@ } -// boolean success = false; -// try { + boolean success = false; + try { doReceiveAndReplicate(client); -// success = true; + success = true; // success. return null; -// } finally { -// try { -// if(success) { -// ack(client); -// } else { -// nack(client); -// } -// } catch (IOException ex) { -// // log and ignore. -// log.error(ex, ex); -// } -// } + } finally { + try { + if (!success) { + // Drain, assuming that localBuffer is sized to be able to receive the full message + // TODO; confirm this assumption + if (localBuffer.capacity() < message.getSize()) + log.error("Insufficient buffer capacity"); + + // TODO: confirm that it is not possible for the next message to be sent to the pipeline since the + // RMI may have already failed and the next message could be on the way. If so the drain may read the + // start of the next message. 
+ final int startDrain = localBuffer.position(); + final int msgSize = message.getSize(); + log.warn("Start drain at " + startDrain + ", message size: " + msgSize + ", blocking mode: " + client.client.isBlocking()); + while(localBuffer.position() < msgSize) { + if (client.client.read(localBuffer) <= 0) // either -1 or no bytes available + break; + } + log.warn("Drained the pipe of " + (localBuffer.position()-startDrain) + " bytes"); + } + } catch (IOException ex) { + // log and ignore. + log.error(ex, ex); + } + } } // call. private void doReceiveAndReplicate(final Client client) throws Exception { + + log.warn("doReceiveAndReplicate"); /* * We should now have parameters ready in the WriteMessage and can @@ -998,6 +1015,17 @@ // for debug retain number of low level reads int reads = 0; + // setup token values to search for any provided token prefix + final byte[] token = message.getToken(); + + boolean foundStart = token == null; // if null then not able to check + int tokenIndex = 0; + final byte[] tokenBuffer = token == null ? null : new byte[token.length]; + final ByteBuffer tokenBB = token == null ? null : ByteBuffer.wrap(tokenBuffer); + + if (log.isDebugEnabled()) + log.debug("Receive token: " + BytesUtil.toHexString(token)); + while (rem > 0 && !EOS) { // block up to the timeout. @@ -1049,15 +1077,68 @@ } + final Set<SelectionKey> keys = client.clientSelector .selectedKeys(); final Iterator<SelectionKey> iter = keys.iterator(); + int ntokenreads = 0; + int ntokenbytematches = 0; + while (iter.hasNext()) { iter.next(); iter.remove(); + + if (!foundStart) { + if (log.isDebugEnabled()) + log.debug("Looking for token, reads: " + ntokenreads); + // Note that the logic for finding the token bytes depends on the + // first byte in the token being unique! + // We have to be a bit clever to be sure we do not read beyond the + // token and therefore complicate the reading into the localBuffer. + // This is optimized for the normal case where the key token is read + // as the next bytes from the stream. In the worst case scenario this + // could read large amounts of data only a few bytes at a time, however + // this is not in reality a significant overhead. + while (tokenIndex < token.length ) { + final int remtok = token.length - tokenIndex; + tokenBB.limit(remtok); + tokenBB.position(0); + + final int rdLen = client.client.read(tokenBB); + for (int i = 0; i < rdLen; i++) { + if (tokenBuffer[i] != token[tokenIndex]) { + log.warn("TOKEN MISMATCH"); + tokenIndex = 0; + if (tokenBuffer[i] == token[tokenIndex]) { + tokenIndex++; + } + } else { + tokenIndex++; + ntokenbytematches++; + } + } + + ntokenreads++; + if (ntokenreads % 10000 == 0) { + if (log.isDebugEnabled()) + log.debug("...still looking, reads: " + ntokenreads); + } + + foundStart = tokenIndex == token.length; + } + + if (!foundStart) { // not sufficient data ready + // if (log.isDebugEnabled()) + log.warn("Not found token yet!"); + continue; + } else { + if (log.isDebugEnabled()) + log.debug("Found token after " + ntokenreads + " token reads and " + ntokenbytematches + " byte matches"); + } + } final int rdlen = client.client.read(localBuffer); @@ -1099,41 +1180,53 @@ * * Note: loop since addrNext might change asynchronously. 
*/ - while(true) { - if (rdlen != 0 && addrNextRef.get() != null) { - if (log.isTraceEnabled()) - log.trace("Incremental send of " + rdlen + " bytes"); - final ByteBuffer out = localBuffer.asReadOnlyBuffer(); - out.position(localBuffer.position() - rdlen); - out.limit(localBuffer.position()); - synchronized (sendService) { - /* - * Note: Code block is synchronized on [downstream] - * to make the decision to start the HASendService - * that relays to [addrNext] atomic. The - * HASendService uses [synchronized] for its public - * methods so we can coordinate this lock with its - * synchronization API. - */ - if (!sendService.isRunning()) { - /* - * Prepare send service for incremental - * transfers to the specified address. - */ - // Check for termination. - client.checkFirstCause(); - // Note: use then current addrNext! - sendService.start(addrNextRef.get()); - continue; - } - } - // Check for termination. - client.checkFirstCause(); - // Send and await Future. - sendService.send(out).get(); - } - break; - } + while (true) { + if (rdlen != 0 && addrNextRef.get() != null) { + if (log.isTraceEnabled()) + log.trace("Incremental send of " + rdlen + + " bytes"); + final ByteBuffer out = localBuffer + .asReadOnlyBuffer(); + out.position(localBuffer.position() - rdlen); + out.limit(localBuffer.position()); + synchronized (sendService) { + /* + * Note: Code block is synchronized on + * [downstream] to make the decision to start + * the HASendService that relays to [addrNext] + * atomic. The HASendService uses [synchronized] + * for its public methods so we can coordinate + * this lock with its synchronization API. + */ + if (!sendService.isRunning()) { + /* + * Prepare send service for incremental + * transfers to the specified address. + */ + // Check for termination. + client.checkFirstCause(); + // Note: use then current addrNext! + sendService.start(addrNextRef.get()); + continue; + } + } + // Check for termination. + client.checkFirstCause(); + + // Send and await Future, sending message token if at start of buffer + if (out.position() == 0) { + log.warn("Sending token " + BytesUtil.toHexString(message.getToken())); + } + try { + sendService.send(out, out.position() == 0 ? message.getToken() : null).get(); + } catch(Throwable t) { + log.warn("Send downstream error", t); + + throw new RuntimeException(t); + } + } + break; + } } } // while( rem > 0 ) @@ -1295,6 +1388,7 @@ localBuffer.limit(message.getSize()); localBuffer.position(0); messageReady.signalAll(); + if (log.isTraceEnabled()) log.trace("Will accept data for message: msg=" + msg); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -38,6 +38,8 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; + /** * A service for sending raw {@link ByteBuffer}s across a socket. This service * supports the HA write pipeline. This service is designed to be paired with an @@ -281,7 +283,11 @@ * @todo throws IOException if the {@link SocketChannel} was not open and * could not be opened. 
*/ - public Future<Void> send(final ByteBuffer buffer) { +// public Future<Void> send(final ByteBuffer buffer) { +// return send(buffer, null); +// } + + public Future<Void> send(final ByteBuffer buffer, final byte[] token) { if (buffer == null) throw new IllegalArgumentException(); @@ -300,7 +306,9 @@ // reopenChannel(); - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer())); + log.warn("Sending message with token: " + BytesUtil.toHexString(token)); + + return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer(), token)); } @@ -402,13 +410,14 @@ * * @param buffer * The buffer whose data are to be sent. + * @param token * * @return The task which will send the data to the configured * {@link InetSocketAddress}. */ - protected Callable<Void> newIncSendTask(final ByteBuffer buffer) { + protected Callable<Void> newIncSendTask(final ByteBuffer buffer, final byte[] token) { - return new IncSendTask(buffer); + return new IncSendTask(buffer, token); } @@ -460,8 +469,9 @@ // private final SocketChannel socketChannel; private final ByteBuffer data; + private final byte[] token; - public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data) { + public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data, final byte[] token) { // if (socketChannel == null) // throw new IllegalArgumentException(); @@ -472,12 +482,12 @@ // this.socketChannel = socketChannel; this.data = data; - + this.token = token; } public Void call() throws Exception { - // defer until we actually run. + // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); if (!isRunning()) @@ -494,9 +504,22 @@ try { + int ntoken = 0; int nwritten = 0; while (nwritten < remaining) { + + log.warn("TOKEN: " + BytesUtil.toHexString(token) + ", written: " + (token == null ? false : ntoken == token.length)); + if (token != null && ntoken < token.length) { + final ByteBuffer tokenBB = ByteBuffer.wrap(token); + tokenBB.position(ntoken); + + ntoken += socketChannel.write(tokenBB); + + log.warn("Wrote " + ntoken + " token bytes"); + + continue; + } /* * Write the data. 
Depending on the channel, will either Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -2289,16 +2289,36 @@ abstract protected void doMemberAdd(); - abstract protected void doMemberRemove(); + final protected void doMemberRemove() { + doMemberRemove(serviceId); + } + abstract protected void doMemberRemove(UUID serviceId); + abstract protected void doCastVote(long lastCommitTime); - abstract protected void doWithdrawVote(); + final protected void doWithdrawVote() { + doWithdrawVote(serviceId); + } + abstract protected void doWithdrawVote(UUID serviceId); + abstract protected void doPipelineAdd(); - abstract protected void doPipelineRemove(); + final protected void doPipelineRemove() { + doPipelineRemove(serviceId); + } + abstract protected void doPipelineRemove(UUID serviceId); + + @Override + public void forceRemoveService(final UUID psid) { + doMemberRemove(psid); + doWithdrawVote(psid); + doPipelineRemove(psid); + doServiceLeave(psid); + } + /** * Invoked when our client will become the leader to (a) reorganize the * write pipeline such that our client is the first service in the write @@ -2396,8 +2416,12 @@ abstract protected void doServiceJoin(); - abstract protected void doServiceLeave(); + final protected void doServiceLeave() { + doServiceLeave(serviceId); + } + abstract protected void doServiceLeave(UUID serviceId); + abstract protected void doSetToken(long newToken); // abstract protected void doSetLastValidToken(long newToken); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -205,4 +205,21 @@ */ void clearToken(); + /** + * Remove the service from the quorum. This should be called when a problem + * with the service is reported to the quorum leader, for example as a + * result of a failed RMI request or failed socket level write replication. + * Such errors arise either from network connectivity or service death. + * These problems will generally be cured, but the heatbeat timeout to cure + * the problem can cause write replication to block. This method may be used + * to force the timely reordering of the pipeline in order to work around + * the replication problem. This is not a permenant disabling of the service + * - the service may be restarted or may recover and reenter the quorum at + * any time. + * + * @param serviceId + * The UUID of the service to be removed. 
+ */ + public void forceRemoveService(UUID serviceId); + } Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -60,6 +60,9 @@ { final TestSuite suite = new TestSuite("write pipeline"); + + // Test message buffer framing idiom (not required for CI). + // suite.addTestSuite(TestBufferFraming.class); // Test of HASendService and HAReceiveService (2 nodes). suite.addTestSuite(TestHASendAndReceive.class); Added: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java (rev 0) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -0,0 +1,191 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +package com.bigdata.ha.pipeline; + +import java.util.Random; + +import junit.framework.TestCase; + +/** + * This is a test suite for the buffer framing idiom. + * <p> + * BufferFraming is required to ensure that pipeline buffers are correctly + * identified by RMI messages. + * <p> + * There is currently a problem where receive tasks can be interrupted leaving + * data in the pipeline and subsequent data reads are unable to process the + * correct data. + * <p> + * A proposed solution is to prefix the buffer with an 8 byte identifier + * suitably unique to stochastically avoid problems of random matching errors. + * <p> + * This test class tests finding the offset of the long value in otherwise + * random data. Since it must read each byte this is complicated by the + * requirement to window shift. This complexity can be somewhat simplified by + * ensuring that each byte in the long key is unique. + * <p> + * Note: only the first byte in the key needs to be unique, guaranteeing that + * if a match attempt fails it is only necessary to check the failing character + * to see if that could be the start of a new match attempt. 
+ * + * @author Martyn Cutcher + */ +public class TestBufferFraming extends TestCase { + +// private static final Logger log = Logger +// .getLogger(junit.framework.Test.class); + + boolean unique(final byte[] bytes) { + for (int i = 0; i < bytes.length; i++) { + final byte b = bytes[i]; + for (int t = i + 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + } + + return true; + } + + boolean unique1(final byte[] bytes) { + final byte b = bytes[0]; + for (int t = 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + + return true; + } + + /** + * Returns n bytes of unique values. + * + * The unique values are important to simplify testing + * against data streams. + * + * In fact the only important aspect is that the initial byte + * is unique! This is sufficient to identify the start point + * of the key in a data stream. + */ + byte[] genKey(Random r, final int size) { + final byte[] ret = new byte[size]; + + while (!unique1(ret)) { + r.nextBytes(ret); + } + + return ret; + } + + /** + * Functional test on performance of key generation + */ + public void testGenKey() { + final Random r = new Random(); + + final int keys = 100000000; // 100M + + final long start = System.currentTimeMillis(); + for (int i = 0; i < keys; i++) { + genKey(r, 8); + } + final long end = System.currentTimeMillis(); + + final long throughputms = (keys / (end - start)); + + assertTrue(throughputms > 10000L); // should be able to produce more than 10M keys per second + } + + /** + * Let's write a string into the middle of a load + * of random data and identify it with our generated key. + */ + public void testEmbeddedMessage() { + doEmbeddedMessage(); + } + + public void testStressEmbeddedMessage() { + for (int t = 0; t < 1000; t++) { + doEmbeddedMessage(); + } + } + + public void doEmbeddedMessage() { + final Random r = new Random(); + final byte[] buffer = new byte[10000000]; // 10M bytes + r.nextBytes(buffer); + + final String tst = "Hello World"; + final byte[] tstbytes = tst.getBytes(); + + final byte[] key = genKey(r, 8); + + int offset = r.nextInt(9000000); // somewhere in first 9M bytes + + // copy string into buffer + copy(key, 0, buffer, offset); + copy(tstbytes, 0, buffer, offset+key.length); + + final int position = find(key, buffer); + + assertTrue(position == offset); + + final byte[] copy = new byte[tstbytes.length]; + copy(buffer, position+key.length, copy, 0); + + final String tstRead = new String(copy); + + assertTrue(tstRead.equals(tst)); + } + + void copy(byte[] src, int srcOffset, byte[] dst, int dstOffset) { + int len = Math.min(src.length, dst.length); + + for (int i = 0; i < len; i++) { + dst[dstOffset+i] = src[srcOffset+i]; + } + } + + int find(final byte[] key, final byte[] buffer) { + final int endPos = buffer.length - key.length; + for (int i = 0; i < endPos; i++) { + if (buffer[i] == key[0]) { + boolean match = true; + for (int t = 1; match && t < key.length; t++) { + match = buffer[i+t] == key[t]; + + if (!match) { + i += t-1; + } + } + if (match) { + return i; + } + } + } + + return -1; + } +} Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -24,6 +24,11 @@ package 
com.bigdata.ha.pipeline; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Random; @@ -108,6 +113,32 @@ chk = null; } + + /** + * Need to check base message serialization + * + * @throws IOException + * @throws ClassNotFoundException + */ + public void testMessageSerialization() throws IOException, ClassNotFoundException { + final ByteArrayOutputStream boutstr = new ByteArrayOutputStream(); + final ObjectOutputStream obout = new ObjectOutputStream(boutstr); + + final HAWriteMessageBase src_msg = new HAWriteMessageBase(50, 23); + + obout.writeObject(src_msg); + obout.flush(); + + final ByteArrayInputStream binstr = new ByteArrayInputStream(boutstr.toByteArray()); + final ObjectInputStream obin = new ObjectInputStream(binstr); + + final Object dst_msg = obin.readObject(); + + assertTrue(src_msg.equals(dst_msg)); + + // now check that it would falsely compare against a different message + assertFalse(src_msg.equals(new HAWriteMessageBase(50, 23))); + } /** * Should we expect concurrency of the Socket send and RMI? It seems that we @@ -130,7 +161,7 @@ final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); final ByteBuffer rcv = ByteBuffer.allocate(2000); final Future<Void> futRec = receiveService.receiveData(msg1, rcv); - final Future<Void> futSnd = sendService.send(tst1); + final Future<Void> futSnd = sendService.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst1, rcv); @@ -140,7 +171,7 @@ final ByteBuffer tst2 = getRandomData(100); final IHAWriteMessageBase msg2 = new HAWriteMessageBase(100, chk.checksum(tst2)); final ByteBuffer rcv2 = ByteBuffer.allocate(2000); - final Future<Void> futSnd = sendService.send(tst2); + final Future<Void> futSnd = sendService.send(tst2, msg2.getToken()); final Future<Void> futRec = receiveService.receiveData(msg2, rcv2); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); @@ -149,6 +180,40 @@ } + public void testSimpleExchangeWithTokens() throws InterruptedException, ExecutionException, TimeoutException { + + final long timeout = 5000;// ms + { + final ByteBuffer tst1 = getRandomData(50); + final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); + final ByteBuffer rcv = ByteBuffer.allocate(2000); + final Future<Void> futRec = receiveService.receiveData(msg1, rcv); + final Future<Void> futSnd = sendService.send(tst1, msg1.getToken()); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + futRec.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst1, rcv); + } + + { + // how throw some random data into the stream to force the tokens to do something + final ByteBuffer junk = getRandomData(10000); + final Future<Void> futSnd = sendService.send(junk, null); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + } + + { + final ByteBuffer tst2 = getRandomData(100); + final IHAWriteMessageBase msg2 = new HAWriteMessageBase(100, chk.checksum(tst2)); + final ByteBuffer rcv2 = ByteBuffer.allocate(2000); + final Future<Void> futSnd = sendService.send(tst2, msg2.getToken()); + final Future<Void> futRec = receiveService.receiveData(msg2, rcv2); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + futRec.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst2, rcv2); + } + + } + /** * Sends a large number of random buffers, 
confirming successful * transmission. @@ -168,7 +233,7 @@ final ByteBuffer rcv = ByteBuffer.allocate(sze + r.nextInt(1024)); // FutureTask return ensures remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); - final Future<Void> futSnd = sendService.send(tst); + final Future<Void> futSnd = sendService.send(tst, msg.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst, rcv); // make sure buffer has been transmitted @@ -199,7 +264,7 @@ assertEquals(sze,tst.limit()); // FutureTask return ensures remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); - final Future<Void> futSnd = sendService.send(tst); + final Future<Void> futSnd = sendService.send(tst, msg.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); // make sure buffer has been transmitted Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -177,7 +177,7 @@ // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -195,7 +195,7 @@ // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); while (!futSnd.isDone() && !futRec2.isDone()) { try { futSnd.get(10L, TimeUnit.MILLISECONDS); @@ -284,7 +284,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -317,7 +317,7 @@ .receiveData(msg1, rcv1); // final Future<Void> futRec2 = receiveService2 // .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1.duplicate()); + final Future<Void> futSnd = sendServiceA.send(tst1.duplicate(), msg1.getToken()); // Send will always succeed. futSnd.get(timeout, TimeUnit.MILLISECONDS); /* @@ -421,7 +421,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); // Send will always succeed. 
futSnd.get(timeout, TimeUnit.MILLISECONDS); /* @@ -498,7 +498,7 @@ // .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); // futRec1.get(); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -520,7 +520,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -553,7 +553,7 @@ rcv1); // final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, // rcv2); - final Future<Void> futSnd = sendServiceC.send(tst1); + final Future<Void> futSnd = sendServiceC.send(tst1, msg1.getToken()); futSnd.get(timeout, TimeUnit.MILLISECONDS); futRec1.get(timeout, TimeUnit.MILLISECONDS); // futRec2.get(timeout, TimeUnit.MILLISECONDS); @@ -576,7 +576,7 @@ rcv1); final Future<Void> futRec2 = receiveServiceA.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceC.send(tst1); + final Future<Void> futSnd = sendServiceC.send(tst1, msg1.getToken()); futSnd.get(timeout, TimeUnit.MILLISECONDS); futRec1.get(timeout, TimeUnit.MILLISECONDS); futRec2.get(timeout, TimeUnit.MILLISECONDS); @@ -665,7 +665,7 @@ // FutureTask return ensures remote ready for Socket data final Future<Void> futRec1 = receiveServiceB.receiveData(msg, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst); + final Future<Void> futSnd = sendServiceA.send(tst, msg.getToken()); while (!futSnd.isDone() && !futRec1.isDone() && !futRec2.isDone()) { try { futSnd.get(10L, TimeUnit.MILLISECONDS); Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -846,32 +846,20 @@ fixture.memberAdd(serviceId); } - protected void doMemberRemove() { - fixture.memberRemove(serviceId); - } - protected void doCastVote(final long lastCommitTime) { fixture.castVote(serviceId, lastCommitTime); } - protected void doWithdrawVote() { - fixture.withdrawVote(serviceId); - } - protected void doPipelineAdd() { fixture.pipelineAdd(serviceId); } - protected void doPipelineRemove() { - fixture.pipelineRemove(serviceId); - } - protected void doServiceJoin() { fixture.serviceJoin(serviceId); } - protected void doServiceLeave() { - fixture.serviceLeave(serviceId); + protected void doServiceLeave(final UUID service) { + fixture.serviceLeave(service); } protected void doSetToken(final long newToken) { @@ -890,6 +878,21 @@ fixture.clearToken(); } + @Override + protected void doMemberRemove(UUID service) { + fixture.memberRemove(service); + } + + @Override + protected void doWithdrawVote(UUID service) { + fixture.withdrawVote(service); + } + + @Override + protected void doPipelineRemove(UUID service) { + fixture.pipelineRemove(service); + } + // /** // * {@inheritDoc} // * <p> Modified: 
branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -327,7 +327,7 @@ } @Override - protected void doMemberRemove() { + protected void doMemberRemove(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -340,7 +340,7 @@ try { zk.delete(logicalServiceId + "/" + QUORUM + "/" + QUORUM_MEMBER + "/" + QUORUM_MEMBER_PREFIX - + serviceIdStr, -1/* anyVersion */); + + service.toString(), -1/* anyVersion */); } catch (NoNodeException e) { // ignore. } catch (KeeperException e) { @@ -414,7 +414,7 @@ } @Override - protected void doPipelineRemove() { + protected void doPipelineRemove(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -446,7 +446,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { zk.delete(zpath + "/" + s, -1/* anyVersion */); return; } @@ -636,7 +636,7 @@ * handles a concurrent delete by a simple retry loop. */ @Override - protected void doWithdrawVote() { + protected void doWithdrawVote(final UUID service) { // zpath for votes. final String votesZPath = getVotesZPath(); if (log.isInfoEnabled()) @@ -724,7 +724,7 @@ Thread.currentThread().interrupt(); return; } - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // found our vote. try { // delete our vote. @@ -761,7 +761,7 @@ } // done. if (log.isInfoEnabled()) - log.info("withdrawn: serviceId=" + serviceIdStr + log.info("withdrawn: serviceId=" + service.toString() + ", lastCommitTime=" + lastCommitTime); return; } catch (NoNodeException e) { @@ -836,7 +836,7 @@ } @Override - protected void doServiceLeave() { + protected void doServiceLeave(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -871,7 +871,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // Found this service. zk.delete(zpath + "/" + s, -1/* anyVersion */); return; @@ -2492,5 +2492,4 @@ } } - } Modified: branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -45,6 +45,7 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IndexManagerCallable; import com.bigdata.ha.halog.HALogWriter; import com.bigdata.ha.halog.IHALogReader; import com.bigdata.ha.msg.HARootBlockRequest; @@ -1031,6 +1032,180 @@ } /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill C (the last + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+B). 
+ */ + public void testABC_LiveLoadRemainsMet_kill_C() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverC); + + awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); + + awaitMembers(new HAGlue[] {startup.serverA, startup.serverB}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverB}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + public void _testStressABC_LiveLoadRemainsMet_kill_C() throws Exception { + for (int i = 0; i < 5; i++) { + try { + testABC_LiveLoadRemainsMet_kill_C(); + } catch (Throwable t) { + fail("Run " + i, t); + } finally { + Thread.sleep(2000); + destroyAll(); + } + } + } + + /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill B (the first + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+C), after the leader re-orders the pipeline. + */ + public void testABC_LiveLoadRemainsMet_kill_B() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverB); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // also check members and joined + awaitMembers(new HAGlue[] {startup.serverA, startup.serverC}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverC}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + /** + * Instead of killing B this forces its removal from Zookeeper using the + * forceRemoveService method in a task submitted to the leader. + * + * @throws Exception + */ + public void testABC_LiveLoadRemainsMet_remove_B() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. 
+ assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + startup.serverA.submit(new ForceRemoveService(startup.serverB.getServiceId()), false); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // ...and in this case we might also expect the service to rejoin + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC, startup.serverB}); + assertEquals(token, awaitFullyMetQuorum()); + + } + + static class... [truncated message content]
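Taken together, the r7503 changes above do two things: HASendService.send(...) now takes the quorum token carried by the replication message (msg1.getToken()), and the quorum actor's memberRemove/withdrawVote/pipelineRemove/serviceLeave operations take an explicit service UUID, which is what lets the leader force another service out (the ForceRemoveService task used in the new tests). A minimal sketch of the token check on the send path follows; PipelineSender and its fields are invented names for illustration only, and the actual HASendService wire protocol is not shown in this message:

import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/*
 * Sketch only: a sender that refuses to replicate a payload prepared
 * under a stale quorum token. Names are illustrative; the class
 * actually modified in this commit is HASendService.
 */
class PipelineSender {

    /** The token for the currently met quorum (-1 := no quorum). */
    private volatile long currentToken = -1L;

    void setToken(final long token) {
        this.currentToken = token;
    }

    /**
     * Queue a payload for replication. The caller passes the token
     * under which the payload was marshalled; a mismatch means the
     * quorum has broken or met again since then, so we fail fast
     * rather than push bytes the downstream services must discard.
     */
    FutureTask<Void> send(final ByteBuffer payload, final long token) {
        return new FutureTask<Void>(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (token != currentToken) {
                    throw new IllegalStateException("stale token: expected="
                            + currentToken + ", given=" + token);
                }
                // ... write payload.remaining() bytes to the socket channel ...
                return null;
            }
        });
    }
}

Passing the token on every send() is what makes the failure mode exercised by these tests (a follower sure-killed mid-LOAD) detectable on the write path itself, rather than only via ZooKeeper watcher events.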
From: <tho...@us...> - 2013-10-31 13:12:34
Revision: 7502 http://bigdata.svn.sourceforge.net/bigdata/?rev=7502&view=rev Author: thompsonbry Date: 2013-10-31 13:12:26 +0000 (Thu, 31 Oct 2013) Log Message: ----------- Branch to work on #724 (sudden kills) - specifically the resynchronization of the write replication at the socket channel level. Added Paths: ----------- branches/PIPELINE_RESYNC/
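The branch goal named in this log message, resynchronizing write replication at the socket channel level, implies the receiver must stay byte-aligned with the stream even when it rejects part of a transfer. One common way to get that property is to length-prefix each payload so a rejected message can be drained exactly. The sketch below uses assumed names (FrameReader is hypothetical; the real classes are HASendService/HAReceiveService, whose actual wire format may differ):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/*
 * Sketch only: length-prefixed framing. Each frame is an 8-byte
 * quorum token, a 4-byte payload length, then the payload bytes.
 */
class FrameReader {

    /**
     * Read one frame. If the frame's token does not match, the payload
     * is drained (not returned) so that the next header begins at a
     * known offset and the stream remains parseable.
     */
    static byte[] readFrame(final InputStream in, final long expectedToken)
            throws IOException {
        final DataInputStream dis = new DataInputStream(in);
        final long token = dis.readLong();
        final int size = dis.readInt();
        if (token != expectedToken) {
            long remaining = size;
            while (remaining > 0) {
                final long skipped = dis.skip(remaining);
                if (skipped <= 0) {
                    // skip() may legally return 0; read a byte instead.
                    if (dis.read() < 0)
                        throw new IOException("EOF while draining frame");
                    remaining--;
                } else {
                    remaining -= skipped;
                }
            }
            return null; // frame rejected, but the stream is still in sync
        }
        final byte[] payload = new byte[size];
        dis.readFully(payload);
        return payload;
    }
}

Because every frame is consumed in full, a stale or rejected message never leaves the channel desynchronized, which is the property the resynchronization work on this branch is after.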