From: <mar...@us...> - 2014-04-02 13:13:00
Revision: 8026 http://sourceforge.net/p/bigdata/code/8026 Author: martyncutcher Date: 2014-04-02 13:12:56 +0000 (Wed, 02 Apr 2014) Log Message: ----------- Initial commit on creation of HA1_HA5 branch Modified Paths: -------------- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java Added Paths: ----------- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy2.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA5JournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-D.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-E.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient1.config branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient5.config branches/BIGDATA_MGC_HA1_HA5/branch-notes.txt Removed Paths: ------------- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -3428,8 +3428,8 @@ if (quorum == null) return; - if (!quorum.isHighlyAvailable()) - return; +// if (!quorum.isHighlyAvailable()) +// return; /** * CRITICAL SECTION. 
We need to obtain a distributed consensus for the Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -4574,7 +4574,7 @@ // } - log.warn("Starting NSS"); + log.warn("Starting NSS from " + jettyXml); // Start the server. jettyServer.start(); @@ -4658,8 +4658,9 @@ if (tmp == null) throw new IllegalStateException("Server is not running"); - return tmp.getConnectors()[0].getLocalPort(); - + final int port = tmp.getConnectors()[0].getLocalPort(); + haLog.warn("Returning NSSPort: " + port); + return port; } /** Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -1096,6 +1096,8 @@ + haLogBytesOnDisk// + ", journalSize=" + journalSize// + + ", thresholdPercentLogSize=" + + thresholdPercentLogSize// + ", percentLogSize=" + actualPercentLogSize// + "%, takeSnapshot=" + takeSnapshot // Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -133,7 +133,7 @@ * Implementation listens for the death of the child process and can be used * to decide when the child process is no longer executing. */ - private static class ServiceListener implements IServiceListener { + public static class ServiceListener implements IServiceListener { private volatile HAGlue haGlue; private volatile ProcessHelper processHelper; @@ -218,8 +218,12 @@ * The {@link Remote} interfaces for these services (if started and * successfully discovered). */ - private HAGlue serverA = null, serverB = null, serverC = null; + protected HAGlue serverA = null; + protected HAGlue serverB = null; + + protected HAGlue serverC = null; + /** * {@link UUID}s for the {@link HAJournalServer}s. */ @@ -232,17 +236,20 @@ * @see <a href="http://trac.bigdata.com/ticket/730" > Allow configuration * of embedded NSS jetty server using jetty-web.xml </a> */ - private final int A_JETTY_PORT = 8090, B_JETTY_PORT = A_JETTY_PORT + 1, - C_JETTY_PORT = B_JETTY_PORT + 1; + protected final int A_JETTY_PORT = 8090; + protected final int B_JETTY_PORT = A_JETTY_PORT + 1; + protected final int C_JETTY_PORT = B_JETTY_PORT + 1; /** * These {@link IServiceListener}s are used to reliably detect that the * corresponding process starts and (most importantly) that it really * dies once it has been shut down or destroyed.
*/ - private ServiceListener serviceListenerA = null, serviceListenerB = null; + protected ServiceListener serviceListenerA = null; - private ServiceListener serviceListenerC = null; + protected ServiceListener serviceListenerB = null; + + protected ServiceListener serviceListenerC = null; private LookupDiscoveryManager lookupDiscoveryManager = null; @@ -1143,14 +1150,14 @@ } - private void safeShutdown(final HAGlue haGlue, final File serviceDir, + void safeShutdown(final HAGlue haGlue, final File serviceDir, final ServiceListener serviceListener) { safeShutdown(haGlue, serviceDir, serviceListener, false/* now */); } - private void safeShutdown(final HAGlue haGlue, final File serviceDir, + protected void safeShutdown(final HAGlue haGlue, final File serviceDir, final ServiceListener serviceListener, final boolean now) { if (haGlue == null) @@ -1368,6 +1375,10 @@ } + protected String getZKConfigFile() { + return "zkClient.config"; + } + /** * Return Zookeeper quorum that can be used to reflect (or act on) the * distributed quorum state for the logical service. @@ -1382,7 +1393,7 @@ KeeperException, IOException { final Configuration config = ConfigurationProvider - .getInstance(new String[] { SRC_PATH + "zkClient.config" }); + .getInstance(new String[] { SRC_PATH + getZKConfigFile() }); zkClientConfig = new ZookeeperClientConfig(config); @@ -1551,7 +1562,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ - abstract private class StartServerTask implements Callable<HAGlue> { + public abstract class StartServerTask implements Callable<HAGlue> { private final String name; private final String configName; Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,466 @@ +package com.bigdata.journal.jini.ha; + +import java.io.File; +import java.io.IOException; +import java.rmi.Remote; +import java.security.DigestException; +import java.security.NoSuchAlgorithmException; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.bigdata.ha.HAGlue; +import com.bigdata.jini.start.IServiceListener; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownATask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownBTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownCTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.ServiceListener; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartATask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartBTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartCTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartServerTask; +import com.bigdata.quorum.AsynchronousQuorumCloseException; + +public 
class AbstractHA5JournalServerTestCase extends + AbstractHA3JournalServerTestCase { + + /** + * The {@link Remote} interfaces for these services (if started and + * successfully discovered). + */ + protected HAGlue serverD = null; + + protected HAGlue serverE = null; + + /** + * {@link UUID}s for the {@link HAJournalServer}s. + */ + private UUID serverDId = UUID.randomUUID(); + private UUID serverEId = UUID.randomUUID(); + + /** + * The HTTP ports at which the services will respond. + * + * @see <a href="http://trac.bigdata.com/ticket/730" > Allow configuration + * of embedded NSS jetty server using jetty-web.xml </a> + */ + protected final int D_JETTY_PORT = C_JETTY_PORT + 1; + protected final int E_JETTY_PORT = D_JETTY_PORT + 1; + + protected String getZKConfigFile() { + return "zkClient5.config"; // 5 stage pipeline + } + + /** + * These {@link IServiceListener}s are used to reliably detect that the + * corresponding process starts and (most importantly) that it really + * dies once it has been shut down or destroyed. + */ + protected ServiceListener serviceListenerD = null, serviceListenerE = null; + + protected File getServiceDirD() { + return new File(getTestDir(), "D"); + } + + protected File getServiceDirE() { + return new File(getTestDir(), "E"); + } + + protected File getHAJournalFileD() { + return new File(getServiceDirD(), "bigdata-ha.jnl"); + } + + protected File getHAJournalFileE() { + return new File(getServiceDirE(), "bigdata-ha.jnl"); + } + + protected File getHALogDirD() { + return new File(getServiceDirD(), "HALog"); + } + + protected File getHALogDirE() { + return new File(getServiceDirE(), "HALog"); + } + + /** + * Start A then B then C then D then E. As each service starts, this method + * waits for that service to appear in the pipeline in the proper position. + * + * @return The ordered array of services <code>[A, B, C, D, E]</code> + */ + protected HAGlue[] startSequenceABCDE() throws Exception { + + startA(); + awaitPipeline(new HAGlue[] { serverA }); + + startB(); + awaitPipeline(new HAGlue[] { serverA, serverB }); + + startC(); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC }); + + startD(); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC, serverD }); + + startE(); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC, serverD, serverE }); + + return new HAGlue[] { serverA, serverB, serverC, serverD, serverE }; + + } + + /** + * Helper class for simultaneous/sequenced start of 5 HA services. + */ + protected class ABCDE { + + /** + * The services. + */ + final HAGlue serverA, serverB, serverC, serverD, serverE; + + /** + * Start of 5 HA services (this happens in the ctor). + * + * @param sequential + * True if the startup should be sequential or false + * if services should start concurrently. + * @throws Exception + */ + public ABCDE(final boolean sequential) + throws Exception { + + this(sequential, true/* newServiceStarts */); + + } + + /** + * Start of 5 HA services (this happens in the ctor). + * + * @param sequential + * True if the startup should be sequential or false if + * services should start concurrently. + * @param newServiceStarts + * When <code>true</code> the services are new, the database + * should be at <code>commitCounter:=0</code> and the + * constructor will check for the implicit create of the + * default KB.
+ * @throws Exception + */ + public ABCDE(final boolean sequential, final boolean newServiceStarts) + throws Exception { + + if (sequential) { + + final HAGlue[] services = startSequenceABCDE(); + + serverA = services[0]; + + serverB = services[1]; + + serverC = services[2]; + + serverD = services[3]; + + serverE = services[4]; + + } else { + + final List<Callable<HAGlue>> tasks = new LinkedList<Callable<HAGlue>>(); + + tasks.add(new StartATask(false/* restart */)); + tasks.add(new StartBTask(false/* restart */)); + tasks.add(new StartCTask(false/* restart */)); + tasks.add(new StartDTask(false/* restart */)); + tasks.add(new StartETask(false/* restart */)); + + // Start all servers in parallel. Wait up to a timeout. + final List<Future<HAGlue>> futures = executorService.invokeAll( + tasks, 30/* timeout */, TimeUnit.SECONDS); + + serverA = futures.get(0).get(); + + serverB = futures.get(1).get(); + + serverC = futures.get(2).get(); + + serverD = futures.get(3).get(); + + serverE = futures.get(4).get(); + + } + + // wait for the quorum to fully meet. + awaitFullyMetQuorum(); + + if(newServiceStarts) { + // wait for the initial commit point (KB create). + awaitCommitCounter(1L, serverA, serverB, serverC, serverD, serverE); + } + + } + + public void shutdownAll() throws InterruptedException, + ExecutionException { + + shutdownAll(false/* now */); + + } + + public void shutdownAll(final boolean now) throws InterruptedException, + ExecutionException { + + final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>(); + + tasks.add(new SafeShutdownATask()); + tasks.add(new SafeShutdownBTask()); + tasks.add(new SafeShutdownCTask()); + tasks.add(new SafeShutdownDTask()); + tasks.add(new SafeShutdownETask()); + + // Shut down all servers in parallel. Wait up to a timeout.
+ final List<Future<Void>> futures = executorService.invokeAll( + tasks, 30/* timeout */, TimeUnit.SECONDS); + + futures.get(0).get(); + futures.get(1).get(); + futures.get(2).get(); + futures.get(3).get(); + futures.get(4).get(); + + } + + public void assertDigestsEqual() throws NoSuchAlgorithmException, DigestException, IOException { + assertDigestsEquals(new HAGlue[] { serverA, serverB, serverC, serverD, serverE }); + } + + } + + protected HAGlue startD() throws Exception { + + return new StartDTask(false/* restart */).call(); + + } + + protected HAGlue startE() throws Exception { + + return new StartETask(false/* restart */).call(); + + } + + protected HAGlue restartD() throws Exception { + + return new StartDTask(true/* restart */).call(); + + } + + protected HAGlue restartE() throws Exception { + + return new StartETask(true/* restart */).call(); + + } + + protected void shutdownD() throws IOException { + safeShutdown(serverD, getServiceDirD(), serviceListenerD, true); + + serverD = null; + serviceListenerD = null; + } + + protected void shutdownE() throws IOException { + safeShutdown(serverE, getServiceDirE(), serviceListenerE, true); + + serverE = null; + serviceListenerE = null; + } + + protected class StartDTask extends StartServerTask { + + public StartDTask(final boolean restart) { + + super("D", "HAJournal-D.config", serverDId, D_JETTY_PORT, + serviceListenerD = new ServiceListener(), restart); + + } + + @Override + public HAGlue call() throws Exception { + + if (restart) { + + safeShutdown(serverD, getServiceDirD(), serviceListenerD); + + serverD = null; + + } + + return serverD = start(); + + } + + } + + protected class StartETask extends StartServerTask { + + public StartETask(final boolean restart) { + + super("E", "HAJournal-E.config", serverEId, E_JETTY_PORT, + serviceListenerE = new ServiceListener(), restart); + + } + + @Override + public HAGlue call() throws Exception { + + if (restart) { + + safeShutdown(serverE, getServiceDirE(), serviceListenerE); + + serverE = null; + + } + + return serverE = start(); + + } + + } + + protected class SafeShutdownDTask extends SafeShutdownTask { + + public SafeShutdownDTask() { + this(false/* now */); + } + + public SafeShutdownDTask(final boolean now) { + super(serverD, getServiceDirD(), serviceListenerD, now); + } + + } + + protected class SafeShutdownETask extends SafeShutdownTask { + + public SafeShutdownETask() { + this(false/* now */); + } + + public SafeShutdownETask(final boolean now) { + super(serverE, getServiceDirE(), serviceListenerE, now); + } + + } + public AbstractHA5JournalServerTestCase() { + } + + public AbstractHA5JournalServerTestCase(final String name) { + super(name); + } + + protected void destroyAll() throws AsynchronousQuorumCloseException, + InterruptedException, TimeoutException { + /** + * The most reliable tear down is in reverse pipeline order. + * + * This may not be necessary long term but for now we want to avoid + * destroying the leader first since it can lead to problems as + * followers attempt to reform. + */ + final HAGlue leader; + final File leaderServiceDir; + final ServiceListener leaderListener; + if (quorum.isQuorumMet()) { + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + /* + * Note: It is possible to resolve a proxy for a service that has + * been recently shut down or destroyed. This is effectively a data + * race.
+ */ + final HAGlue t = quorum.getClient().getLeader(token); + if (t.equals(serverA)) { + leader = t; + leaderServiceDir = getServiceDirA(); + leaderListener = serviceListenerA; + } else if (t.equals(serverB)) { + leader = t; + leaderServiceDir = getServiceDirB(); + leaderListener = serviceListenerB; + } else if (t.equals(serverC)) { + leader = t; + leaderServiceDir = getServiceDirC(); + leaderListener = serviceListenerC; + } else if (t.equals(serverD)) { + leader = t; + leaderServiceDir = getServiceDirD(); + leaderListener = serviceListenerD; + } else if (t.equals(serverE)) { + leader = t; + leaderServiceDir = getServiceDirE(); + leaderListener = serviceListenerE; + } else { + if (serverA == null && serverB == null && serverC == null && serverD == null && serverE == null) { + /* + * There are no services running and nothing to shut down. We + * probably resolved a stale proxy to the leader above. + */ + return; + } + throw new IllegalStateException( + "Leader is none of A, B, C, D, or E: leader=" + t + ", A=" + + serverA + ", B=" + serverB + ", C=" + serverC + ", D=" + + serverD + ", E=" + serverE); + } + } else { + leader = null; + leaderServiceDir = null; + leaderListener = null; + } + + if (leader == null || !leader.equals(serverA)) { + destroyA(); + } + + if (leader == null || !leader.equals(serverB)) { + destroyB(); + } + + if (leader == null || !leader.equals(serverC)) { + destroyC(); + } + + if (leader == null || !leader.equals(serverD)) { + destroyD(); + } + + if (leader == null || !leader.equals(serverE)) { + destroyE(); + } + + // Destroy leader last + if (leader != null) { + safeDestroy(leader, leaderServiceDir, leaderListener); + + serverA = serverB = serverC = serverD = serverE = null; + serviceListenerA = serviceListenerC = serviceListenerB = serviceListenerD = serviceListenerE = null; + } + + } + + protected void destroyD() { + safeDestroy(serverD, getServiceDirD(), serviceListenerD); + serverD = null; + serviceListenerD = null; + } + + protected void destroyE() { + safeDestroy(serverE, getServiceDirE(), serviceListenerE); + serverE = null; + serviceListenerE = null; + } + +} Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -440,8 +440,17 @@ * Wait for the service to report that it is ready as a leader or * follower. */ - haGlue.awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); + return awaitNSSAndHAReady(haGlue, awaitQuorumTimeout, TimeUnit.MILLISECONDS); + } + + protected HAStatusEnum awaitNSSAndHAReady(final HAGlue haGlue, long timeout, TimeUnit unit) + throws Exception { /* + * Wait for the service to report that it is ready as a leader or + * follower. + */ + haGlue.awaitHAReady(timeout, unit); + /* + * Wait for the NSS to report the status of the service (this verifies + * that the NSS interface is running.
*/ @@ -525,7 +534,9 @@ protected String getNanoSparqlServerURL(final HAGlue haGlue) throws IOException { - return "http://localhost:" + haGlue.getNSSPort(); + final int port = haGlue.getNSSPort(); + + return "http://localhost:" + port; } @@ -541,7 +552,7 @@ final String sparqlEndpointURL = getNanoSparqlServerURL(haGlue) + "/sparql"; - + // Client for talking to the NSS. final HttpClient httpClient = new DefaultHttpClient(ccm); @@ -1248,4 +1259,35 @@ } + /** + * The effective name for this test as used to name the directories in which + * we store things. + * + * TODO If there are method name collisions across the different test + * classes then the test suite name can be added to this. Also, if there are + * file naming problems, then this value can be munged before it is + * returned. + */ + private final String effectiveTestFileName = getClass().getSimpleName() + + "." + getName(); + + /** + * The directory that is the parent of each {@link HAJournalServer}'s + * individual service directory. + */ + protected File getTestDir() { + return new File(TGT_PATH, getEffectiveTestFileName()); + } + + /** + * The effective name for this test as used to name the directories in which + * we store things. + */ + protected String getEffectiveTestFileName() { + + return effectiveTestFileName; + + } + + } Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,288 @@ +import net.jini.jeri.BasicILFactory; +import net.jini.jeri.BasicJeriExporter; +import net.jini.jeri.tcp.TcpServerEndpoint; + +import net.jini.discovery.LookupDiscovery; +import net.jini.core.discovery.LookupLocator; +import net.jini.core.entry.Entry; +import net.jini.lookup.entry.Name; +import net.jini.lookup.entry.Comment; +import net.jini.lookup.entry.Address; +import net.jini.lookup.entry.Location; +import net.jini.lookup.entry.ServiceInfo; +import net.jini.core.lookup.ServiceTemplate; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.UUID; + +import com.bigdata.util.NV; +import com.bigdata.util.config.NicUtil; +import com.bigdata.journal.Options; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.jini.ha.HAJournal; +import com.bigdata.jini.lookup.entry.*; +import com.bigdata.service.IBigdataClient; +import com.bigdata.service.AbstractTransactionService; +import com.bigdata.service.jini.*; +import com.bigdata.service.jini.lookup.DataServiceFilter; +import com.bigdata.service.jini.master.ServicesTemplate; +import com.bigdata.jini.start.config.*; +import com.bigdata.jini.util.ConfigMath; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + +// imports for various options. 
+import com.bigdata.btree.IndexMetadata; +import com.bigdata.btree.keys.KeyBuilder; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.spo.SPORelation; +import com.bigdata.rdf.spo.SPOKeyOrder; +import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.lexicon.LexiconKeyOrder; +import com.bigdata.rawstore.Bytes; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeUnit.*; + +/* + * This is a sample configuration file for a highly available Journal. A + * version of this file must be available to each HAJournalServer in the + * pipeline. + */ + +/* + * Globals. + */ +bigdata { + + private static fedname = "benchmark"; + + // NanoSparqlServer (http) port. + private static nssPort = ConfigMath.add(8090,3); + + // write replication pipeline port (listener). + private static haPort = ConfigMath.add(9090,3); + + // The #of services in the write pipeline. + private static replicationFactor = 5; + + // The logical service identifier shared by all members of the quorum. + private static logicalServiceId = System.getProperty("test.logicalServiceId","CI-HAJournal-1"); + + // The service directory. + // Note: Overridden by environment property when deployed. + private static serviceDir = new File(System.getProperty("test.serviceDir",ConfigMath.getAbsolutePath(new File(new File(fedname,logicalServiceId),"D")))); + //new File(new File(fedname,logicalServiceId),"D"); + + // journal data directory. + private static dataDir = serviceDir; + + // one federation, multicast discovery. + //static private groups = LookupDiscovery.ALL_GROUPS; + + // unicast discovery or multiple setups, MUST specify groups. + static private groups = new String[]{bigdata.fedname}; + + /** + * One or more unicast URIs of the form <code>jini://host/</code> + * or <code>jini://host:port/</code> (no default). + * + * This MAY be an empty array if you want to use multicast + * discovery <strong>and</strong> you have specified the groups as + * LookupDiscovery.ALL_GROUPS (a <code>null</code>). + */ + static private locators = new LookupLocator[] { + + // runs jini on the localhost using unicast locators. + new LookupLocator("jini://localhost/") + + }; + + /** + * A common point to set the Zookeeper client's requested + * sessionTimeout and the jini lease timeout. The default lease + * renewal period for jini is 5 minutes while for zookeeper it is + * more like 5 seconds. This puts the two systems onto a similar + * timeout period so that a disconnected client is more likely to + * be noticed in roughly the same period of time for either + * system. A value larger than the zookeeper default helps to + * prevent client disconnects under sustained heavy load. + * + * If you use a short lease timeout (LT 20s), then you need to override + * properties for the net.jini.lease.LeaseRenewalManager + * or it will run in a tight loop (its default roundTripTime is 10s + * and it schedules lease renewals proactively.) + */ + + // jini + static private leaseTimeout = ConfigMath.s2ms(20); + + // zookeeper + static private sessionTimeout = (int)ConfigMath.s2ms(20); + + /* + * Configuration for default KB. + */ + + private static namespace = "kb"; + + private static kb = new NV[] { + + /* Setup for QUADS mode without the full text index.
*/ + + new NV(BigdataSail.Options.TRUTH_MAINTENANCE, "false" ), + new NV(BigdataSail.Options.QUADS, "true"), + new NV(BigdataSail.Options.STATEMENT_IDENTIFIERS, "false"), + new NV(BigdataSail.Options.TEXT_INDEX, "false"), + new NV(BigdataSail.Options.AXIOMS_CLASS,"com.bigdata.rdf.axioms.NoAxioms"), + new NV(BigdataSail.Options.QUERY_TIME_EXPANDER, "false"), + + // Bump up the branching factor for the lexicon indices on the named kb. + // com.bigdata.namespace.kb.lex.com.bigdata.btree.BTree.branchingFactor=400 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + LexiconRelation.NAME_LEXICON_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "400"), + + // Bump up the branching factor for the statement indices on the named kb. + // com.bigdata.namespace.kb.spo.com.bigdata.btree.BTree.branchingFactor=1024 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + SPORelation.NAME_SPO_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "1024"), + }; + +} + +/* + * Zookeeper client configuration. + */ +org.apache.zookeeper.ZooKeeper { + + /* Root znode for the federation instance. */ + zroot = "/" + bigdata.fedname; + + /* A comma separated list of host:port pairs, where the port is + * the CLIENT port for the zookeeper server instance. + */ + // standalone. + servers = "localhost:2081"; + + /* Session timeout (optional). */ + sessionTimeout = bigdata.sessionTimeout; + + /* + * ACL for the zookeeper nodes created by the bigdata federation. + * + * Note: zookeeper ACLs are not transmitted over secure channels + * and are placed into plain text Configuration files by the + * ServicesManagerServer. + */ + acl = new ACL[] { + + new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) + + }; +} + +/* + * You should not have to edit below this line. + */ + +/* + * Jini client configuration. + */ +com.bigdata.service.jini.JiniClient { + + groups = bigdata.groups; + + locators = bigdata.locators; + + entries = new Entry[] { + + // Optional metadata entries. + new Name("D"), + + // Note: Used to assign the ServiceID to the service. + new ServiceUUID(UUID.fromString(System.getProperty("test.serviceId"))) + + }; + +} + +net.jini.lookup.JoinManager { + + maxLeaseDuration = bigdata.leaseTimeout; + +} + +/* + * Server configuration options. + */ +com.bigdata.journal.jini.ha.HAJournalServer { + + args = new String[] { + "-showversion", + "-Djava.security.policy=policy.all", + "-Dlog4j.configuration=file:log4j-D.properties", + "-Djava.util.logging.config.file=logging-D.properties", + "-server", + "-Xmx1G", + "-ea", + "-Xdebug","-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1053" + }; + + serviceDir = bigdata.serviceDir; + + // Default policy. + restorePolicy = new com.bigdata.journal.jini.ha.DefaultRestorePolicy(); + + // Suppress automatic snapshots. + snapshotPolicy = new com.bigdata.journal.jini.ha.NoSnapshotPolicy(); + + logicalServiceId = bigdata.logicalServiceId; + + writePipelineAddr = new InetSocketAddress("localhost",bigdata.haPort); + + /* + writePipelineAddr = new InetSocketAddress(// + InetAddress.getByName(// + NicUtil.getIpAddress("default.nic", "default", + false// loopbackOk + )), // + bigdata.haPort + ); + */ + + replicationFactor = bigdata.replicationFactor; + + // Use the overridden version of the HAJournal by default so we get the + // HAGlueTest API for every test. + HAJournalClass = "com.bigdata.journal.jini.ha.HAJournalTest"; + +} + +/* + * Journal configuration. 
+ */ +com.bigdata.journal.jini.ha.HAJournal { + + properties = (NV[]) ConfigMath.concat(new NV[] { + + new NV(Options.FILE, + ConfigMath.getAbsolutePath(new File(bigdata.dataDir,"bigdata-ha.jnl"))), + + new NV(Options.BUFFER_MODE,""+BufferMode.DiskRW), + + new NV(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY,"4000"), + + new NV(IndexMetadata.Options.BTREE_BRANCHING_FACTOR,"128"), + + new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + + }, bigdata.kb); + +} Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,288 @@ +import net.jini.jeri.BasicILFactory; +import net.jini.jeri.BasicJeriExporter; +import net.jini.jeri.tcp.TcpServerEndpoint; + +import net.jini.discovery.LookupDiscovery; +import net.jini.core.discovery.LookupLocator; +import net.jini.core.entry.Entry; +import net.jini.lookup.entry.Name; +import net.jini.lookup.entry.Comment; +import net.jini.lookup.entry.Address; +import net.jini.lookup.entry.Location; +import net.jini.lookup.entry.ServiceInfo; +import net.jini.core.lookup.ServiceTemplate; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.UUID; + +import com.bigdata.util.NV; +import com.bigdata.util.config.NicUtil; +import com.bigdata.journal.Options; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.jini.ha.HAJournal; +import com.bigdata.jini.lookup.entry.*; +import com.bigdata.service.IBigdataClient; +import com.bigdata.service.AbstractTransactionService; +import com.bigdata.service.jini.*; +import com.bigdata.service.jini.lookup.DataServiceFilter; +import com.bigdata.service.jini.master.ServicesTemplate; +import com.bigdata.jini.start.config.*; +import com.bigdata.jini.util.ConfigMath; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + +// imports for various options. +import com.bigdata.btree.IndexMetadata; +import com.bigdata.btree.keys.KeyBuilder; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.spo.SPORelation; +import com.bigdata.rdf.spo.SPOKeyOrder; +import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.lexicon.LexiconKeyOrder; +import com.bigdata.rawstore.Bytes; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeUnit.*; + +/* + * This is a sample configuration file for a highly available Journal. A + * version of this file must be available to each HAJournalServer in the + * pipeline. + */ + +/* + * Globals. + */ +bigdata { + + private static fedname = "benchmark"; + + // NanoSparqlServer (http) port. + private static nssPort = ConfigMath.add(8090,4); + + // write replication pipeline port (listener). + private static haPort = ConfigMath.add(9090,4); + + // The #of services in the write pipeline. + private static replicationFactor = 5; + + // The logical service identifier shared by all members of the quorum. + private static logicalServiceId = System.getProperty("test.logicalServiceId","CI-HAJournal-1"); + + // The service directory. + // Note: Overridden by environment property when deployed. 
+ private static serviceDir = new File(System.getProperty("test.serviceDir",ConfigMath.getAbsolutePath(new File(new File(fedname,logicalServiceId),"E")))); + //new File(new File(fedname,logicalServiceId),"E"); + + // journal data directory. + private static dataDir = serviceDir; + + // one federation, multicast discovery. + //static private groups = LookupDiscovery.ALL_GROUPS; + + // unicast discovery or multiple setups, MUST specify groups. + static private groups = new String[]{bigdata.fedname}; + + /** + * One or more unicast URIs of the form <code>jini://host/</code> + * or <code>jini://host:port/</code> (no default). + * + * This MAY be an empty array if you want to use multicast + * discovery <strong>and</strong> you have specified the groups as + * LookupDiscovery.ALL_GROUPS (a <code>null</code>). + */ + static private locators = new LookupLocator[] { + + // runs jini on the localhost using unicast locators. + new LookupLocator("jini://localhost/") + + }; + + /** + * A common point to set the Zookeeper client's requested + * sessionTimeout and the jini lease timeout. The default lease + * renewal period for jini is 5 minutes while for zookeeper it is + * more like 5 seconds. This puts the two systems onto a similar + * timeout period so that a disconnected client is more likely to + * be noticed in roughly the same period of time for either + * system. A value larger than the zookeeper default helps to + * prevent client disconnects under sustained heavy load. + * + * If you use a short lease timeout (LT 20s), then you need to override + * properties properties for the net.jini.lease.LeaseRenewalManager + * or it will run in a tight loop (it's default roundTripTime is 10s + * and it schedules lease renewals proactively.) + */ + + // jini + static private leaseTimeout = ConfigMath.s2ms(20); + + // zookeeper + static private sessionTimeout = (int)ConfigMath.s2ms(20); + + /* + * Configuration for default KB. + */ + + private static namespace = "kb"; + + private static kb = new NV[] { + + /* Setup for QUADS mode without the full text index. */ + + new NV(BigdataSail.Options.TRUTH_MAINTENANCE, "false" ), + new NV(BigdataSail.Options.QUADS, "true"), + new NV(BigdataSail.Options.STATEMENT_IDENTIFIERS, "false"), + new NV(BigdataSail.Options.TEXT_INDEX, "false"), + new NV(BigdataSail.Options.AXIOMS_CLASS,"com.bigdata.rdf.axioms.NoAxioms"), + new NV(BigdataSail.Options.QUERY_TIME_EXPANDER, "false"), + + // Bump up the branching factor for the lexicon indices on the named kb. + // com.bigdata.namespace.kb.lex.com.bigdata.btree.BTree.branchingFactor=400 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + LexiconRelation.NAME_LEXICON_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "400"), + + // Bump up the branching factor for the statement indices on the named kb. + // com.bigdata.namespace.kb.spo.com.bigdata.btree.BTree.branchingFactor=1024 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + SPORelation.NAME_SPO_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "1024"), + }; + +} + +/* + * Zookeeper client configuration. + */ +org.apache.zookeeper.ZooKeeper { + + /* Root znode for the federation instance. */ + zroot = "/" + bigdata.fedname; + + /* A comma separated list of host:port pairs, where the port is + * the CLIENT port for the zookeeper server instance. + */ + // standalone. + servers = "localhost:2081"; + + /* Session timeout (optional). 
*/ + sessionTimeout = bigdata.sessionTimeout; + + /* + * ACL for the zookeeper nodes created by the bigdata federation. + * + * Note: zookeeper ACLs are not transmitted over secure channels + * and are placed into plain text Configuration files by the + * ServicesManagerServer. + */ + acl = new ACL[] { + + new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) + + }; +} + +/* + * You should not have to edit below this line. + */ + +/* + * Jini client configuration. + */ +com.bigdata.service.jini.JiniClient { + + groups = bigdata.groups; + + locators = bigdata.locators; + + entries = new Entry[] { + + // Optional metadata entries. + new Name("E"), + + // Note: Used to assign the ServiceID to the service. + new ServiceUUID(UUID.fromString(System.getProperty("test.serviceId"))) + + }; + +} + +net.jini.lookup.JoinManager { + + maxLeaseDuration = bigdata.leaseTimeout; + +} + +/* + * Server configuration options. + */ +com.bigdata.journal.jini.ha.HAJournalServer { + + args = new String[] { + "-showversion", + "-Djava.security.policy=policy.all", + "-Dlog4j.configuration=file:log4j-E.properties", + "-Djava.util.logging.config.file=logging-E.properties", + "-server", + "-Xmx1G", + "-ea", + "-Xdebug","-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1054" + }; + + serviceDir = bigdata.serviceDir; + + // Default policy. + restorePolicy = new com.bigdata.journal.jini.ha.DefaultRestorePolicy(); + + // Suppress automatic snapshots. + snapshotPolicy = new com.bigdata.journal.jini.ha.NoSnapshotPolicy(); + + logicalServiceId = bigdata.logicalServiceId; + + writePipelineAddr = new InetSocketAddress("localhost",bigdata.haPort); + + /* + writePipelineAddr = new InetSocketAddress(// + InetAddress.getByName(// + NicUtil.getIpAddress("default.nic", "default", + false// loopbackOk + )), // + bigdata.haPort + ); + */ + + replicationFactor = bigdata.replicationFactor; + + // Use the overridden version of the HAJournal by default so we get the + // HAGlueTest API for every test. + HAJournalClass = "com.bigdata.journal.jini.ha.HAJournalTest"; + +} + +/* + * Journal configuration. 
+ */ +com.bigdata.journal.jini.ha.HAJournal { + + properties = (NV[]) ConfigMath.concat(new NV[] { + + new NV(Options.FILE, + ConfigMath.getAbsolutePath(new File(bigdata.dataDir,"bigdata-ha.jnl"))), + + new NV(Options.BUFFER_MODE,""+BufferMode.DiskRW), + + new NV(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY,"4000"), + + new NV(IndexMetadata.Options.BTREE_BRANCHING_FACTOR,"128"), + + new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + + }, bigdata.kb); + +} Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,114 @@ +package com.bigdata.journal.jini.ha; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.HAStatusEnum; + +import net.jini.config.Configuration; + + +public class TestHA1JournalServer extends AbstractHA3JournalServerTestCase { + + /** + * {@inheritDoc} + * <p> + * Note: This overrides some {@link Configuration} values for the + * {@link HAJournalServer} in order to establish conditions suitable for + * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. + */ + @Override + protected String[] getOverrides() { + + return new String[]{ +// "com.bigdata.journal.HAJournal.properties=" +TestHA3JournalServer.getTestHAJournalProperties(com.bigdata.journal.HAJournal.properties), + "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", + "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", +// "com.bigdata.journal.jini.ha.HAJournalServer.HAJournalClass=\""+HAJournalTest.class.getName()+"\"", + "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1", + }; + + } + + protected String getZKConfigFile() { + return "zkClient1.config"; // 1 stage pipeline + } + + public TestHA1JournalServer() { + } + + public TestHA1JournalServer(String name) { + super(name); + } + + public void testStartA() throws Exception { + doStartA(); + } + + protected void doStartA() throws Exception { + + try { + quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + fail("HA1 requires quorum of 1!"); + } catch (TimeoutException te) { + // expected + } + + // Start 1 service. + final HAGlue serverA = startA(); + + // this should succeed + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + assertEquals(token, awaitFullyMetQuorum()); + + final HAGlue leader = quorum.getClient().getLeader(token); + + assertEquals(serverA, leader); + } + + public void testSimpleTransaction() throws Exception { + doStartA(); + + serverA.awaitHAReady(2, TimeUnit.SECONDS); + + /* + * Awaiting HAReady is not sufficient since the service may still be + * writing the initial transaction. + * + * So it seems that the problem is not so much with HA1 but rather with + * the status of a new journal being ready too soon to process an NSS + * request. + */ + + awaitCommitCounter(1, new HAGlue[] { serverA}); + + // Thread.sleep(100); + + // serverA.
+ + log.warn("Calling SimpleTransaction"); + simpleTransaction(); + + awaitCommitCounter(2, new HAGlue[] { serverA}); + } + + public void testMultiTransaction() throws Exception { + doStartA(); + + awaitCommitCounter(1, new HAGlue[] { serverA}); + // Thread.sleep(1000); + + final int NTRANS = 10; + for (int t = 0; t < NTRANS; t++) { + simpleTransaction(); + } + + awaitCommitCounter(NTRANS+1, new HAGlue[] { serverA}); +} +} Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,530 @@ +package com.bigdata.journal.jini.ha; + +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.msg.HARootBlockRequest; +import com.bigdata.ha.msg.HASnapshotRequest; +import com.bigdata.ha.msg.IHASnapshotResponse; +import com.bigdata.journal.IRootBlockView; +import com.bigdata.journal.Journal; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.LargeLoadTask; +import com.bigdata.rdf.sail.webapp.client.RemoteRepository; + +import net.jini.config.Configuration; + +public class TestHA1SnapshotPolicy extends AbstractHA3BackupTestCase { + + public TestHA1SnapshotPolicy() { + } + + public TestHA1SnapshotPolicy(String name) { + super(name); + } + + protected String getZKConfigFile() { + return "zkClient1.config"; // 1 stage pipeline + } + + /** + * {@inheritDoc} + * <p> + * Note: This overrides some {@link Configuration} values for the + * {@link HAJournalServer} in order to establish conditions suitable for + * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. + */ + @Override + protected String[] getOverrides() { + + /* + * We need to set the time at which the DefaultSnapshotPolicy runs to + * some point in the Future in order to avoid test failures due to + * violated assumptions when the policy runs up self-triggering (based + * on the specified run time) during a CI run. + */ + final String neverRun = getNeverRunSnapshotTime(); + + /* + * For HA1, must have onlineDisasterRecovery to ensure logs are maintained + */ + return new String[]{ + "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", + "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.DefaultSnapshotPolicy("+neverRun+",0)", + // "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", + "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1", + }; + + } + + /** + * Start a service. The quorum meets. Take a snapshot. Verify that the + * snapshot appears within a reasonable period of time and that it is for + * <code>commitCounter:=1</code> (just the KB create). Verify that the + * digest of the snapshot agrees with the digest of the journal. + */ + public void testA_snapshot() throws Exception { + + // Start 1 service. 
+ final HAGlue serverA = startA(); + + // Wait for a quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + // Await initial commit point (KB create). + awaitCommitCounter(1L, serverA); + + final HAGlue leader = quorum.getClient().getLeader(token); + assertEquals(serverA, leader); // A is the leader. + { + + // Verify quorum is still valid. + quorum.assertQuorum(token); + + // Verify quorum is at the expected commit point. + assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // Snapshot directory is empty. + assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER)); + + final Future<IHASnapshotResponse> ft = leader + .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */)); + + // wait for the snapshot. + try { + ft.get(5, TimeUnit.SECONDS); + } catch (TimeoutException ex) { + ft.cancel(true/* mayInterruptIfRunning */); + throw ex; + } + + final IRootBlockView snapshotRB = ft.get().getRootBlock(); + + final long commitCounter = 1L; + + // Verify snapshot is for the expected commit point. + assertEquals(commitCounter, snapshotRB.getCommitCounter()); + + // Snapshot directory contains the desired filename. + assertExpectedSnapshots(getSnapshotDirA(), + new long[] { commitCounter }); + + // Verify digest of snapshot agrees with digest of journal. + assertSnapshotDigestEquals(leader, commitCounter); + + } + + } + /** + * Start a service. The quorum meets. Take a snapshot. Verify that the + * snapshot appears within a reasonable period of time and that it is for + * <code>commitCounter:=1</code> (just the KB create). Request a second + * snapshot for the same commit point and verify that a <code>null</code> is + * returned since we already have a snapshot for that commit point. + */ + public void testA_snapshot_await_snapshot_null() throws Exception { + + // Start 1 service. + final HAGlue serverA = startA(); + + // Wait for a quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + // Await initial commit point (KB create). + awaitCommitCounter(1L, serverA); + + final HAGlue leader = quorum.getClient().getLeader(token); + + { + + // Verify quorum is still valid. + quorum.assertQuorum(token); + + // Verify quorum is at the expected commit point. + assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // Snapshot directory is empty. + assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER)); + + final Future<IHASnapshotResponse> ft = leader + .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */)); + + // wait for the snapshot. + try { + ft.get(5, TimeUnit.SECONDS); + } catch (TimeoutException ex) { + ft.cancel(true/* mayInterruptIfRunning */); + throw ex; + } + + final IRootBlockView snapshotRB = ft.get().getRootBlock(); + + final long commitCounter = 1L; + + // Verify snapshot is for the expected commit point. + assertEquals(commitCounter, snapshotRB.getCommitCounter()); + + // Snapshot directory contains the expected snapshot(s). + assertExpectedSnapshots(getSnapshotDirA(), + new long[] { commitCounter }); + + // Verify digest of snapshot agrees with digest of journal. + assertSnapshotDigestEquals(leader, commitCounter); + + } + + /* + * Verify 2nd request returns null since snapshot exists for that + * commit point. + */ + { + + // Verify quorum is still at the expected commit point.
+ assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // request another snapshot. + final Future<IHASnapshotResponse> ft = leader + .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */)); + + if (ft != null) { + + ft.cancel(true/* mayInterruptIfRunning */); + + fail("Expecting null since snapshot exists for current commit point."); + + } + + } + + } + + /** + * Test ability to request a snapshot using an HTTP GET + * <code>.../status?snapshot</code>. + * + * TODO Variant where the percentLogSize parameter is also specified; + * verify that the semantics of that argument are obeyed. Use this to verify + * that the server will not take a snapshot if the size on disk of HALog files + * since the last snapshot is LT some percentage. + */ + public void testA_snapshot_HTTP_GET() throws Exception { + + // Start 1 service. + final HAGlue serverA = startA(); + + // Wait for a quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + // Await initial commit point (KB create). + awaitCommitCounter(1L, serverA); + + final HAGlue leader = quorum.getClient().getLeader(token); + + { + + // Verify quorum is still valid. + quorum.assertQuorum(token); + + // Verify quorum is at the expected commit point. + assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // Snapshot directory is empty. + assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER)); + + doSnapshotRequest(leader); + + /* + * Get the Future. Should still be there, but if not then will be + * null (in which case the snapshot is already done). + */ + f... [truncated message content]
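For readers following the new HA5 test scaffolding above, here is a minimal sketch of how a 5-member test might drive the new ABCDE helper. It relies only on members shown in this commit (the ABCDE ctor, simpleTransaction(), awaitCommitCounter(), assertDigestsEqual(), shutdownAll(), and the serverA..serverE fields); the class name and test method below are hypothetical and are not part of revision 8026.

public class ExampleHA5Test extends AbstractHA5JournalServerTestCase {

    public void test_ABCDE_simpleTransaction() throws Exception {

        // Start all 5 services in pipeline order; the ctor waits for the
        // fully met quorum and the initial KB create (commitCounter := 1).
        final ABCDE services = new ABCDE(true/* sequential */);

        // Run a single write transaction against the quorum leader.
        simpleTransaction();

        // All 5 services should advance to commitCounter := 2.
        awaitCommitCounter(2L, serverA, serverB, serverC, serverD, serverE);

        // The journals on all 5 services should have identical digests.
        services.assertDigestsEqual();

        services.shutdownAll();

    }

}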