|
From: <mar...@us...> - 2014-04-02 13:13:00
|
Revision: 8026
http://sourceforge.net/p/bigdata/code/8026
Author: martyncutcher
Date: 2014-04-02 13:12:56 +0000 (Wed, 02 Apr 2014)
Log Message:
-----------
Initial commit on creation of HA1_HA5 branch
Modified Paths:
--------------
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
Added Paths:
-----------
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy2.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA5JournalServer.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-D.properties
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-E.properties
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient1.config
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient5.config
branches/BIGDATA_MGC_HA1_HA5/branch-notes.txt
Removed Paths:
-------------
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-02 13:03:59 UTC (rev 8025)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -3428,8 +3428,8 @@
if (quorum == null)
return;
- if (!quorum.isHighlyAvailable())
- return;
+// if (!quorum.isHighlyAvailable())
+// return;
/**
* CRITICAL SECTION. We need obtain a distributed consensus for the
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-04-02 13:03:59 UTC (rev 8025)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -4574,7 +4574,7 @@
// }
- log.warn("Starting NSS");
+ log.warn("Starting NSS from " + jettyXml);
// Start the server.
jettyServer.start();
@@ -4658,8 +4658,9 @@
if (tmp == null)
throw new IllegalStateException("Server is not running");
- return tmp.getConnectors()[0].getLocalPort();
-
+ final int port = tmp.getConnectors()[0].getLocalPort();
+ haLog.warn("Returning NSSPort: " + port);
+ return port;
}
/**
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java 2014-04-02 13:03:59 UTC (rev 8025)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -1096,6 +1096,8 @@
+ haLogBytesOnDisk//
+ ", journalSize="
+ journalSize//
+ + ", thresholdPercentLogSize="
+ + thresholdPercentLogSize//
+ ", percentLogSize="
+ actualPercentLogSize//
+ "%, takeSnapshot=" + takeSnapshot //
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2014-04-02 13:03:59 UTC (rev 8025)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -133,7 +133,7 @@
* Implementation listens for the death of the child process and can be used
* to decide when the child process is no longer executing.
*/
- private static class ServiceListener implements IServiceListener {
+ public static class ServiceListener implements IServiceListener {
private volatile HAGlue haGlue;
private volatile ProcessHelper processHelper;
@@ -218,8 +218,12 @@
* The {@link Remote} interfaces for these services (if started and
* successfully discovered).
*/
- private HAGlue serverA = null, serverB = null, serverC = null;
+ protected HAGlue serverA = null;
+ protected HAGlue serverB = null;
+
+ protected HAGlue serverC = null;
+
/**
* {@link UUID}s for the {@link HAJournalServer}s.
*/
@@ -232,17 +236,20 @@
* @see <a href="http://trac.bigdata.com/ticket/730" > Allow configuration
* of embedded NSS jetty server using jetty-web.xml </a>
*/
- private final int A_JETTY_PORT = 8090, B_JETTY_PORT = A_JETTY_PORT + 1,
- C_JETTY_PORT = B_JETTY_PORT + 1;
+ protected final int A_JETTY_PORT = 8090;
+ protected final int B_JETTY_PORT = A_JETTY_PORT + 1;
+ protected final int C_JETTY_PORT = B_JETTY_PORT + 1;
/**
* These {@link IServiceListener}s are used to reliably detect that the
* corresponding process starts and (most importantly) that it is really
* dies once it has been shutdown or destroyed.
*/
- private ServiceListener serviceListenerA = null, serviceListenerB = null;
+ protected ServiceListener serviceListenerA = null;
- private ServiceListener serviceListenerC = null;
+ protected ServiceListener serviceListenerB = null;
+
+ protected ServiceListener serviceListenerC = null;
private LookupDiscoveryManager lookupDiscoveryManager = null;
@@ -1143,14 +1150,14 @@
}
- private void safeShutdown(final HAGlue haGlue, final File serviceDir,
+ void safeShutdown(final HAGlue haGlue, final File serviceDir,
final ServiceListener serviceListener) {
safeShutdown(haGlue, serviceDir, serviceListener, false/* now */);
}
- private void safeShutdown(final HAGlue haGlue, final File serviceDir,
+ protected void safeShutdown(final HAGlue haGlue, final File serviceDir,
final ServiceListener serviceListener, final boolean now) {
if (haGlue == null)
@@ -1368,6 +1375,10 @@
}
+ protected String getZKConfigFile() {
+ return "zkClient.config";
+ }
+
/**
* Return Zookeeper quorum that can be used to reflect (or act on) the
* distributed quorum state for the logical service.
@@ -1382,7 +1393,7 @@
KeeperException, IOException {
final Configuration config = ConfigurationProvider
- .getInstance(new String[] { SRC_PATH + "zkClient.config" });
+ .getInstance(new String[] { SRC_PATH + getZKConfigFile() });
zkClientConfig = new ZookeeperClientConfig(config);
@@ -1551,7 +1562,7 @@
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*/
- abstract private class StartServerTask implements Callable<HAGlue> {
+ public abstract class StartServerTask implements Callable<HAGlue> {
private final String name;
private final String configName;
Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java (rev 0)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -0,0 +1,466 @@
+package com.bigdata.journal.jini.ha;
+
+import java.io.File;
+import java.io.IOException;
+import java.rmi.Remote;
+import java.security.DigestException;
+import java.security.NoSuchAlgorithmException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.bigdata.ha.HAGlue;
+import com.bigdata.jini.start.IServiceListener;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownATask;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownBTask;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownCTask;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownTask;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.ServiceListener;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartATask;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartBTask;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartCTask;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartServerTask;
+import com.bigdata.quorum.AsynchronousQuorumCloseException;
+
+public class AbstractHA5JournalServerTestCase extends
+ AbstractHA3JournalServerTestCase {
+
+ /**
+ * The {@link Remote} interfaces for these services (if started and
+ * successfully discovered).
+ */
+ protected HAGlue serverD = null;
+
+ protected HAGlue serverE = null;
+
+ /**
+ * {@link UUID}s for the {@link HAJournalServer}s.
+ */
+ private UUID serverDId = UUID.randomUUID();
+ private UUID serverEId = UUID.randomUUID();
+
+ /**
+ * The HTTP ports at which the services will respond.
+ *
+ * @see <a href="http://trac.bigdata.com/ticket/730" > Allow configuration
+ * of embedded NSS jetty server using jetty-web.xml </a>
+ */
+ protected final int D_JETTY_PORT = C_JETTY_PORT + 1;
+ protected final int E_JETTY_PORT = D_JETTY_PORT + 1;
+
+ protected String getZKConfigFile() {
+ return "zkClient5.config"; // 5 stage pipeline
+ }
+
+ /**
+ * These {@link IServiceListener}s are used to reliably detect that the
+     * corresponding process starts and (most importantly) that it really
+     * dies once it has been shutdown or destroyed.
+ */
+ protected ServiceListener serviceListenerD = null, serviceListenerE = null;
+
+ protected File getServiceDirD() {
+ return new File(getTestDir(), "D");
+ }
+
+ protected File getServiceDirE() {
+ return new File(getTestDir(), "E");
+ }
+
+ protected File getHAJournalFileD() {
+ return new File(getServiceDirD(), "bigdata-ha.jnl");
+ }
+
+ protected File getHAJournalFileE() {
+ return new File(getServiceDirE(), "bigdata-ha.jnl");
+ }
+
+ protected File getHALogDirD() {
+ return new File(getServiceDirD(), "HALog");
+ }
+
+ protected File getHALogDirE() {
+ return new File(getServiceDirE(), "HALog");
+ }
+
+ /**
+     * Start A then B then C then D then E. As each service starts, this
+     * method waits for that service to appear in the pipeline in the proper
+     * position.
+     *
+     * @return The ordered array of services <code>[A, B, C, D, E]</code>
+ */
+ protected HAGlue[] startSequenceABCDE() throws Exception {
+
+ startA();
+ awaitPipeline(new HAGlue[] { serverA });
+
+ startB();
+ awaitPipeline(new HAGlue[] { serverA, serverB });
+
+ startC();
+ awaitPipeline(new HAGlue[] { serverA, serverB, serverC });
+
+ startD();
+ awaitPipeline(new HAGlue[] { serverA, serverB, serverC, serverD });
+
+ startE();
+ awaitPipeline(new HAGlue[] { serverA, serverB, serverC, serverD, serverE });
+
+ return new HAGlue[] { serverA, serverB, serverC, serverD, serverE };
+
+ }
+
+ /**
+     * Helper class for simultaneous/sequenced start of 5 HA services.
+ */
+ protected class ABCDE {
+
+ /**
+ * The services.
+ */
+ final HAGlue serverA, serverB, serverC, serverD, serverE;
+
+ /**
+         * Start of 5 HA services (this happens in the ctor).
+ *
+ * @param sequential
+ * True if the startup should be sequential or false
+ * if services should start concurrently.
+ * @throws Exception
+ */
+ public ABCDE(final boolean sequential)
+ throws Exception {
+
+            this(sequential, true/* newServiceStarts */);
+
+ }
+
+ /**
+         * Start of 5 HA services (this happens in the ctor).
+ *
+ * @param sequential
+ * True if the startup should be sequential or false if
+ * services should start concurrently.
+ * @param newServiceStarts
+ * When <code>true</code> the services are new, the database
+ * should be at <code>commitCounter:=0</code> and the
+ * constructor will check for the implicit create of the
+ * default KB.
+ * @throws Exception
+ */
+ public ABCDE(final boolean sequential, final boolean newServiceStarts)
+ throws Exception {
+
+ if (sequential) {
+
+ final HAGlue[] services = startSequenceABCDE();
+
+ serverA = services[0];
+
+ serverB = services[1];
+
+ serverC = services[2];
+
+ serverD = services[3];
+
+ serverE = services[4];
+
+ } else {
+
+ final List<Callable<HAGlue>> tasks = new LinkedList<Callable<HAGlue>>();
+
+ tasks.add(new StartATask(false/* restart */));
+ tasks.add(new StartBTask(false/* restart */));
+ tasks.add(new StartCTask(false/* restart */));
+ tasks.add(new StartDTask(false/* restart */));
+ tasks.add(new StartETask(false/* restart */));
+
+ // Start all servers in parallel. Wait up to a timeout.
+ final List<Future<HAGlue>> futures = executorService.invokeAll(
+ tasks, 30/* timeout */, TimeUnit.SECONDS);
+
+ serverA = futures.get(0).get();
+
+ serverB = futures.get(1).get();
+
+ serverC = futures.get(2).get();
+
+ serverD = futures.get(3).get();
+
+ serverE = futures.get(4).get();
+
+ }
+
+ // wait for the quorum to fully meet.
+ awaitFullyMetQuorum();
+
+ if(newServiceStarts) {
+ // wait for the initial commit point (KB create).
+ awaitCommitCounter(1L, serverA, serverB, serverC, serverD, serverE);
+ }
+
+ }
+
+ public void shutdownAll() throws InterruptedException,
+ ExecutionException {
+
+ shutdownAll(false/* now */);
+
+ }
+
+ public void shutdownAll(final boolean now) throws InterruptedException,
+ ExecutionException {
+
+ final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
+
+ tasks.add(new SafeShutdownATask());
+ tasks.add(new SafeShutdownBTask());
+ tasks.add(new SafeShutdownCTask());
+ tasks.add(new SafeShutdownDTask());
+ tasks.add(new SafeShutdownETask());
+
+ // Start all servers in parallel. Wait up to a timeout.
+ final List<Future<Void>> futures = executorService.invokeAll(
+ tasks, 30/* timeout */, TimeUnit.SECONDS);
+
+ futures.get(0).get();
+ futures.get(1).get();
+ futures.get(2).get();
+ futures.get(3).get();
+ futures.get(4).get();
+
+ }
+
+ public void assertDigestsEqual() throws NoSuchAlgorithmException, DigestException, IOException {
+ assertDigestsEquals(new HAGlue[] { serverA, serverB, serverC, serverD, serverE });
+ }
+
+ }
+
+ protected HAGlue startD() throws Exception {
+
+ return new StartDTask(false/* restart */).call();
+
+ }
+
+ protected HAGlue startE() throws Exception {
+
+ return new StartETask(false/* restart */).call();
+
+ }
+
+ protected HAGlue restartD() throws Exception {
+
+ return new StartDTask(true/* restart */).call();
+
+ }
+
+ protected HAGlue restartE() throws Exception {
+
+ return new StartETask(true/* restart */).call();
+
+ }
+
+ protected void shutdownD() throws IOException {
+ safeShutdown(serverD, getServiceDirD(), serviceListenerD, true);
+
+ serverD = null;
+ serviceListenerD = null;
+ }
+
+ protected void shutdownE() throws IOException {
+ safeShutdown(serverE, getServiceDirE(), serviceListenerE, true);
+
+ serverE = null;
+ serviceListenerE = null;
+ }
+
+ protected class StartDTask extends StartServerTask {
+
+ public StartDTask(final boolean restart) {
+
+ super("D", "HAJournal-D.config", serverDId, D_JETTY_PORT,
+ serviceListenerD = new ServiceListener(), restart);
+
+ }
+
+ @Override
+ public HAGlue call() throws Exception {
+
+ if (restart) {
+
+ safeShutdown(serverD, getServiceDirD(), serviceListenerD);
+
+ serverD = null;
+
+ }
+
+ return serverD = start();
+
+ }
+
+ }
+
+ protected class StartETask extends StartServerTask {
+
+ public StartETask(final boolean restart) {
+
+ super("E", "HAJournal-E.config", serverEId, E_JETTY_PORT,
+ serviceListenerE = new ServiceListener(), restart);
+
+ }
+
+ @Override
+ public HAGlue call() throws Exception {
+
+ if (restart) {
+
+ safeShutdown(serverE, getServiceDirE(), serviceListenerE);
+
+ serverE = null;
+
+ }
+
+ return serverE = start();
+
+ }
+
+ }
+
+ protected class SafeShutdownDTask extends SafeShutdownTask {
+
+ public SafeShutdownDTask() {
+ this(false/* now */);
+ }
+
+ public SafeShutdownDTask(final boolean now) {
+            super(serverD, getServiceDirD(), serviceListenerD, now);
+ }
+
+ }
+
+ protected class SafeShutdownETask extends SafeShutdownTask {
+
+ public SafeShutdownETask() {
+ this(false/* now */);
+ }
+
+ public SafeShutdownETask(final boolean now) {
+            super(serverE, getServiceDirE(), serviceListenerE, now);
+ }
+
+ }
+ public AbstractHA5JournalServerTestCase() {
+ }
+
+ public AbstractHA5JournalServerTestCase(final String name) {
+ super(name);
+ }
+
+ protected void destroyAll() throws AsynchronousQuorumCloseException,
+ InterruptedException, TimeoutException {
+ /**
+ * The most reliable tear down is in reverse pipeline order.
+ *
+ * This may not be necessary long term but for now we want to avoid
+ * destroying the leader first since it can lead to problems as
+ * followers attempt to reform
+ */
+ final HAGlue leader;
+ final File leaderServiceDir;
+ final ServiceListener leaderListener;
+ if (quorum.isQuorumMet()) {
+ final long token = quorum.awaitQuorum(awaitQuorumTimeout,
+ TimeUnit.MILLISECONDS);
+ /*
+ * Note: It is possible to resolve a proxy for a service that has
+ * been recently shutdown or destroyed. This is effectively a data
+ * race.
+ */
+ final HAGlue t = quorum.getClient().getLeader(token);
+ if (t.equals(serverA)) {
+ leader = t;
+ leaderServiceDir = getServiceDirA();
+ leaderListener = serviceListenerA;
+ } else if (t.equals(serverB)) {
+ leader = t;
+ leaderServiceDir = getServiceDirB();
+ leaderListener = serviceListenerB;
+ } else if (t.equals(serverC)) {
+ leader = t;
+ leaderServiceDir = getServiceDirC();
+ leaderListener = serviceListenerC;
+ } else if (t.equals(serverD)) {
+ leader = t;
+ leaderServiceDir = getServiceDirD();
+ leaderListener = serviceListenerD;
+ } else if (t.equals(serverE)) {
+ leader = t;
+ leaderServiceDir = getServiceDirE();
+ leaderListener = serviceListenerE;
+ } else {
+ if (serverA == null && serverB == null && serverC == null && serverD == null && serverE == null) {
+ /*
+ * There are no services running and nothing to shutdown. We
+ * probably resolved a stale proxy to the leader above.
+ */
+ return;
+ }
+ throw new IllegalStateException(
+ "Leader is none of A, B, or C: leader=" + t + ", A="
+ + serverA + ", B=" + serverB + ", C=" + serverC);
+ }
+ } else {
+ leader = null;
+ leaderServiceDir = null;
+ leaderListener = null;
+ }
+
+ if (leader == null || !leader.equals(serverA)) {
+ destroyA();
+ }
+
+ if (leader == null || !leader.equals(serverB)) {
+ destroyB();
+ }
+
+ if (leader == null || !leader.equals(serverC)) {
+ destroyC();
+ }
+
+ if (leader == null || !leader.equals(serverD)) {
+ destroyD();
+ }
+
+ if (leader == null || !leader.equals(serverE)) {
+ destroyE();
+ }
+
+ // Destroy leader last
+ if (leader != null) {
+ safeDestroy(leader, leaderServiceDir, leaderListener);
+
+ serverA = serverB = serverC = serverD = serverE = null;
+ serviceListenerA = serviceListenerC = serviceListenerB = serviceListenerD = serviceListenerE = null;
+ }
+
+ }
+
+ protected void destroyD() {
+ safeDestroy(serverD, getServiceDirD(), serviceListenerD);
+ serverD = null;
+ serviceListenerD = null;
+ }
+
+ protected void destroyE() {
+ safeDestroy(serverE, getServiceDirE(), serviceListenerE);
+ serverE = null;
+ serviceListenerE = null;
+ }
+
+}
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-04-02 13:03:59 UTC (rev 8025)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -440,8 +440,17 @@
* Wait for the service to report that it is ready as a leader or
* follower.
*/
- haGlue.awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS);
+ return awaitNSSAndHAReady(haGlue, awaitQuorumTimeout, TimeUnit.MILLISECONDS);
+ }
+
+    protected HAStatusEnum awaitNSSAndHAReady(final HAGlue haGlue, long timeout, TimeUnit unit)
+            throws Exception {
/*
+         * Wait for the service to report that it is ready as a leader or
+         * follower.
+         */
+        haGlue.awaitHAReady(timeout, unit);
+ /*
* Wait for the NSS to report the status of the service (this verifies
* that the NSS interface is running).
*/
@@ -525,7 +534,9 @@
protected String getNanoSparqlServerURL(final HAGlue haGlue)
throws IOException {
- return "http://localhost:" + haGlue.getNSSPort();
+ final int port = haGlue.getNSSPort();
+
+ return "http://localhost:" + port;
}
@@ -541,7 +552,7 @@
final String sparqlEndpointURL = getNanoSparqlServerURL(haGlue)
+ "/sparql";
-
+
// Client for talking to the NSS.
final HttpClient httpClient = new DefaultHttpClient(ccm);
@@ -1248,4 +1259,35 @@
}
+ /**
+ * The effective name for this test as used to name the directories in which
+ * we store things.
+ *
+ * TODO If there are method name collisions across the different test
+ * classes then the test suite name can be added to this. Also, if there are
+ * file naming problems, then this value can be munged before it is
+ * returned.
+ */
+ private final String effectiveTestFileName = getClass().getSimpleName()
+ + "." + getName();
+
+ /**
+ * The directory that is the parent of each {@link HAJournalServer}'s
+ * individual service directory.
+ */
+ protected File getTestDir() {
+ return new File(TGT_PATH, getEffectiveTestFileName());
+ }
+
+ /**
+ * The effective name for this test as used to name the directories in which
+ * we store things.
+ */
+ protected String getEffectiveTestFileName() {
+
+ return effectiveTestFileName;
+
+ }
+
+
}
Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config (rev 0)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config 2014-04-02 13:12:56 UTC (rev 8026)
@@ -0,0 +1,288 @@
+import net.jini.jeri.BasicILFactory;
+import net.jini.jeri.BasicJeriExporter;
+import net.jini.jeri.tcp.TcpServerEndpoint;
+
+import net.jini.discovery.LookupDiscovery;
+import net.jini.core.discovery.LookupLocator;
+import net.jini.core.entry.Entry;
+import net.jini.lookup.entry.Name;
+import net.jini.lookup.entry.Comment;
+import net.jini.lookup.entry.Address;
+import net.jini.lookup.entry.Location;
+import net.jini.lookup.entry.ServiceInfo;
+import net.jini.core.lookup.ServiceTemplate;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+import com.bigdata.util.NV;
+import com.bigdata.util.config.NicUtil;
+import com.bigdata.journal.Options;
+import com.bigdata.journal.BufferMode;
+import com.bigdata.journal.jini.ha.HAJournal;
+import com.bigdata.jini.lookup.entry.*;
+import com.bigdata.service.IBigdataClient;
+import com.bigdata.service.AbstractTransactionService;
+import com.bigdata.service.jini.*;
+import com.bigdata.service.jini.lookup.DataServiceFilter;
+import com.bigdata.service.jini.master.ServicesTemplate;
+import com.bigdata.jini.start.config.*;
+import com.bigdata.jini.util.ConfigMath;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+
+// imports for various options.
+import com.bigdata.btree.IndexMetadata;
+import com.bigdata.btree.keys.KeyBuilder;
+import com.bigdata.rdf.sail.BigdataSail;
+import com.bigdata.rdf.spo.SPORelation;
+import com.bigdata.rdf.spo.SPOKeyOrder;
+import com.bigdata.rdf.lexicon.LexiconRelation;
+import com.bigdata.rdf.lexicon.LexiconKeyOrder;
+import com.bigdata.rawstore.Bytes;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeUnit.*;
+
+/*
+ * This is a sample configuration file for a highly available Journal. A
+ * version of this file must be available to each HAJournalServer in the
+ * pipeline.
+ */
+
+/*
+ * Globals.
+ */
+bigdata {
+
+ private static fedname = "benchmark";
+
+ // NanoSparqlServer (http) port.
+ private static nssPort = ConfigMath.add(8090,3);
+
+ // write replication pipeline port (listener).
+ private static haPort = ConfigMath.add(9090,3);
+
+ // The #of services in the write pipeline.
+ private static replicationFactor = 5;
+
+ // The logical service identifier shared by all members of the quorum.
+ private static logicalServiceId = System.getProperty("test.logicalServiceId","CI-HAJournal-1");
+
+ // The service directory.
+ // Note: Overridden by environment property when deployed.
+ private static serviceDir = new File(System.getProperty("test.serviceDir",ConfigMath.getAbsolutePath(new File(new File(fedname,logicalServiceId),"D"))));
+ //new File(new File(fedname,logicalServiceId),"D");
+
+ // journal data directory.
+ private static dataDir = serviceDir;
+
+ // one federation, multicast discovery.
+ //static private groups = LookupDiscovery.ALL_GROUPS;
+
+ // unicast discovery or multiple setups, MUST specify groups.
+ static private groups = new String[]{bigdata.fedname};
+
+ /**
+ * One or more unicast URIs of the form <code>jini://host/</code>
+ * or <code>jini://host:port/</code> (no default).
+ *
+ * This MAY be an empty array if you want to use multicast
+ * discovery <strong>and</strong> you have specified the groups as
+ * LookupDiscovery.ALL_GROUPS (a <code>null</code>).
+ */
+ static private locators = new LookupLocator[] {
+
+ // runs jini on the localhost using unicast locators.
+ new LookupLocator("jini://localhost/")
+
+ };
+
+ /**
+ * A common point to set the Zookeeper client's requested
+ * sessionTimeout and the jini lease timeout. The default lease
+ * renewal period for jini is 5 minutes while for zookeeper it is
+ * more like 5 seconds. This puts the two systems onto a similar
+ * timeout period so that a disconnected client is more likely to
+ * be noticed in roughly the same period of time for either
+ * system. A value larger than the zookeeper default helps to
+ * prevent client disconnects under sustained heavy load.
+ *
+ * If you use a short lease timeout (LT 20s), then you need to override
+ * properties properties for the net.jini.lease.LeaseRenewalManager
+ * or it will run in a tight loop (it's default roundTripTime is 10s
+ * and it schedules lease renewals proactively.)
+ */
+
+ // jini
+ static private leaseTimeout = ConfigMath.s2ms(20);
+
+ // zookeeper
+ static private sessionTimeout = (int)ConfigMath.s2ms(20);
+
+ /*
+ * Configuration for default KB.
+ */
+
+ private static namespace = "kb";
+
+ private static kb = new NV[] {
+
+ /* Setup for QUADS mode without the full text index. */
+
+ new NV(BigdataSail.Options.TRUTH_MAINTENANCE, "false" ),
+ new NV(BigdataSail.Options.QUADS, "true"),
+ new NV(BigdataSail.Options.STATEMENT_IDENTIFIERS, "false"),
+ new NV(BigdataSail.Options.TEXT_INDEX, "false"),
+ new NV(BigdataSail.Options.AXIOMS_CLASS,"com.bigdata.rdf.axioms.NoAxioms"),
+ new NV(BigdataSail.Options.QUERY_TIME_EXPANDER, "false"),
+
+ // Bump up the branching factor for the lexicon indices on the named kb.
+ // com.bigdata.namespace.kb.lex.com.bigdata.btree.BTree.branchingFactor=400
+ new NV(com.bigdata.config.Configuration.getOverrideProperty
+ ( namespace + "." + LexiconRelation.NAME_LEXICON_RELATION,
+ IndexMetadata.Options.BTREE_BRANCHING_FACTOR
+ ), "400"),
+
+ // Bump up the branching factor for the statement indices on the named kb.
+ // com.bigdata.namespace.kb.spo.com.bigdata.btree.BTree.branchingFactor=1024
+ new NV(com.bigdata.config.Configuration.getOverrideProperty
+ ( namespace + "." + SPORelation.NAME_SPO_RELATION,
+ IndexMetadata.Options.BTREE_BRANCHING_FACTOR
+ ), "1024"),
+ };
+
+}
+
+/*
+ * Zookeeper client configuration.
+ */
+org.apache.zookeeper.ZooKeeper {
+
+ /* Root znode for the federation instance. */
+ zroot = "/" + bigdata.fedname;
+
+ /* A comma separated list of host:port pairs, where the port is
+ * the CLIENT port for the zookeeper server instance.
+ */
+ // standalone.
+ servers = "localhost:2081";
+
+ /* Session timeout (optional). */
+ sessionTimeout = bigdata.sessionTimeout;
+
+ /*
+ * ACL for the zookeeper nodes created by the bigdata federation.
+ *
+ * Note: zookeeper ACLs are not transmitted over secure channels
+ * and are placed into plain text Configuration files by the
+ * ServicesManagerServer.
+ */
+ acl = new ACL[] {
+
+ new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone"))
+
+ };
+}
+
+/*
+ * You should not have to edit below this line.
+ */
+
+/*
+ * Jini client configuration.
+ */
+com.bigdata.service.jini.JiniClient {
+
+ groups = bigdata.groups;
+
+ locators = bigdata.locators;
+
+ entries = new Entry[] {
+
+ // Optional metadata entries.
+ new Name("D"),
+
+ // Note: Used to assign the ServiceID to the service.
+ new ServiceUUID(UUID.fromString(System.getProperty("test.serviceId")))
+
+ };
+
+}
+
+net.jini.lookup.JoinManager {
+
+ maxLeaseDuration = bigdata.leaseTimeout;
+
+}
+
+/*
+ * Server configuration options.
+ */
+com.bigdata.journal.jini.ha.HAJournalServer {
+
+ args = new String[] {
+ "-showversion",
+ "-Djava.security.policy=policy.all",
+ "-Dlog4j.configuration=file:log4j-D.properties",
+ "-Djava.util.logging.config.file=logging-D.properties",
+ "-server",
+ "-Xmx1G",
+ "-ea",
+ "-Xdebug","-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1053"
+ };
+
+ serviceDir = bigdata.serviceDir;
+
+ // Default policy.
+ restorePolicy = new com.bigdata.journal.jini.ha.DefaultRestorePolicy();
+
+ // Suppress automatic snapshots.
+ snapshotPolicy = new com.bigdata.journal.jini.ha.NoSnapshotPolicy();
+
+ logicalServiceId = bigdata.logicalServiceId;
+
+ writePipelineAddr = new InetSocketAddress("localhost",bigdata.haPort);
+
+ /*
+ writePipelineAddr = new InetSocketAddress(//
+ InetAddress.getByName(//
+ NicUtil.getIpAddress("default.nic", "default",
+ false// loopbackOk
+ )), //
+ bigdata.haPort
+ );
+ */
+
+ replicationFactor = bigdata.replicationFactor;
+
+ // Use the overridden version of the HAJournal by default so we get the
+ // HAGlueTest API for every test.
+ HAJournalClass = "com.bigdata.journal.jini.ha.HAJournalTest";
+
+}
+
+/*
+ * Journal configuration.
+ */
+com.bigdata.journal.jini.ha.HAJournal {
+
+ properties = (NV[]) ConfigMath.concat(new NV[] {
+
+ new NV(Options.FILE,
+ ConfigMath.getAbsolutePath(new File(bigdata.dataDir,"bigdata-ha.jnl"))),
+
+ new NV(Options.BUFFER_MODE,""+BufferMode.DiskRW),
+
+ new NV(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY,"4000"),
+
+ new NV(IndexMetadata.Options.BTREE_BRANCHING_FACTOR,"128"),
+
+ new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"),
+
+ }, bigdata.kb);
+
+}
Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config (rev 0)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config 2014-04-02 13:12:56 UTC (rev 8026)
@@ -0,0 +1,288 @@
+import net.jini.jeri.BasicILFactory;
+import net.jini.jeri.BasicJeriExporter;
+import net.jini.jeri.tcp.TcpServerEndpoint;
+
+import net.jini.discovery.LookupDiscovery;
+import net.jini.core.discovery.LookupLocator;
+import net.jini.core.entry.Entry;
+import net.jini.lookup.entry.Name;
+import net.jini.lookup.entry.Comment;
+import net.jini.lookup.entry.Address;
+import net.jini.lookup.entry.Location;
+import net.jini.lookup.entry.ServiceInfo;
+import net.jini.core.lookup.ServiceTemplate;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+import com.bigdata.util.NV;
+import com.bigdata.util.config.NicUtil;
+import com.bigdata.journal.Options;
+import com.bigdata.journal.BufferMode;
+import com.bigdata.journal.jini.ha.HAJournal;
+import com.bigdata.jini.lookup.entry.*;
+import com.bigdata.service.IBigdataClient;
+import com.bigdata.service.AbstractTransactionService;
+import com.bigdata.service.jini.*;
+import com.bigdata.service.jini.lookup.DataServiceFilter;
+import com.bigdata.service.jini.master.ServicesTemplate;
+import com.bigdata.jini.start.config.*;
+import com.bigdata.jini.util.ConfigMath;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+
+// imports for various options.
+import com.bigdata.btree.IndexMetadata;
+import com.bigdata.btree.keys.KeyBuilder;
+import com.bigdata.rdf.sail.BigdataSail;
+import com.bigdata.rdf.spo.SPORelation;
+import com.bigdata.rdf.spo.SPOKeyOrder;
+import com.bigdata.rdf.lexicon.LexiconRelation;
+import com.bigdata.rdf.lexicon.LexiconKeyOrder;
+import com.bigdata.rawstore.Bytes;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeUnit.*;
+
+/*
+ * This is a sample configuration file for a highly available Journal. A
+ * version of this file must be available to each HAJournalServer in the
+ * pipeline.
+ */
+
+/*
+ * Globals.
+ */
+bigdata {
+
+ private static fedname = "benchmark";
+
+ // NanoSparqlServer (http) port.
+ private static nssPort = ConfigMath.add(8090,4);
+
+ // write replication pipeline port (listener).
+ private static haPort = ConfigMath.add(9090,4);
+
+ // The #of services in the write pipeline.
+ private static replicationFactor = 5;
+
+ // The logical service identifier shared by all members of the quorum.
+ private static logicalServiceId = System.getProperty("test.logicalServiceId","CI-HAJournal-1");
+
+ // The service directory.
+ // Note: Overridden by environment property when deployed.
+ private static serviceDir = new File(System.getProperty("test.serviceDir",ConfigMath.getAbsolutePath(new File(new File(fedname,logicalServiceId),"E"))));
+ //new File(new File(fedname,logicalServiceId),"E");
+
+ // journal data directory.
+ private static dataDir = serviceDir;
+
+ // one federation, multicast discovery.
+ //static private groups = LookupDiscovery.ALL_GROUPS;
+
+ // unicast discovery or multiple setups, MUST specify groups.
+ static private groups = new String[]{bigdata.fedname};
+
+ /**
+ * One or more unicast URIs of the form <code>jini://host/</code>
+ * or <code>jini://host:port/</code> (no default).
+ *
+ * This MAY be an empty array if you want to use multicast
+ * discovery <strong>and</strong> you have specified the groups as
+ * LookupDiscovery.ALL_GROUPS (a <code>null</code>).
+ */
+ static private locators = new LookupLocator[] {
+
+ // runs jini on the localhost using unicast locators.
+ new LookupLocator("jini://localhost/")
+
+ };
+
+ /**
+ * A common point to set the Zookeeper client's requested
+ * sessionTimeout and the jini lease timeout. The default lease
+ * renewal period for jini is 5 minutes while for zookeeper it is
+ * more like 5 seconds. This puts the two systems onto a similar
+ * timeout period so that a disconnected client is more likely to
+ * be noticed in roughly the same period of time for either
+ * system. A value larger than the zookeeper default helps to
+ * prevent client disconnects under sustained heavy load.
+ *
+ * If you use a short lease timeout (LT 20s), then you need to override
+ * properties for the net.jini.lease.LeaseRenewalManager
+ * or it will run in a tight loop (it's default roundTripTime is 10s
+ * and it schedules lease renewals proactively.)
+ */
+
+ // jini
+ static private leaseTimeout = ConfigMath.s2ms(20);
+
+ // zookeeper
+ static private sessionTimeout = (int)ConfigMath.s2ms(20);
+
+ /*
+ * Configuration for default KB.
+ */
+
+ private static namespace = "kb";
+
+ private static kb = new NV[] {
+
+ /* Setup for QUADS mode without the full text index. */
+
+ new NV(BigdataSail.Options.TRUTH_MAINTENANCE, "false" ),
+ new NV(BigdataSail.Options.QUADS, "true"),
+ new NV(BigdataSail.Options.STATEMENT_IDENTIFIERS, "false"),
+ new NV(BigdataSail.Options.TEXT_INDEX, "false"),
+ new NV(BigdataSail.Options.AXIOMS_CLASS,"com.bigdata.rdf.axioms.NoAxioms"),
+ new NV(BigdataSail.Options.QUERY_TIME_EXPANDER, "false"),
+
+ // Bump up the branching factor for the lexicon indices on the named kb.
+ // com.bigdata.namespace.kb.lex.com.bigdata.btree.BTree.branchingFactor=400
+ new NV(com.bigdata.config.Configuration.getOverrideProperty
+ ( namespace + "." + LexiconRelation.NAME_LEXICON_RELATION,
+ IndexMetadata.Options.BTREE_BRANCHING_FACTOR
+ ), "400"),
+
+ // Bump up the branching factor for the statement indices on the named kb.
+ // com.bigdata.namespace.kb.spo.com.bigdata.btree.BTree.branchingFactor=1024
+ new NV(com.bigdata.config.Configuration.getOverrideProperty
+ ( namespace + "." + SPORelation.NAME_SPO_RELATION,
+ IndexMetadata.Options.BTREE_BRANCHING_FACTOR
+ ), "1024"),
+ };
+
+}
+
+/*
+ * Zookeeper client configuration.
+ */
+org.apache.zookeeper.ZooKeeper {
+
+ /* Root znode for the federation instance. */
+ zroot = "/" + bigdata.fedname;
+
+ /* A comma separated list of host:port pairs, where the port is
+ * the CLIENT port for the zookeeper server instance.
+ */
+ // standalone.
+ servers = "localhost:2081";
+
+ /* Session timeout (optional). */
+ sessionTimeout = bigdata.sessionTimeout;
+
+ /*
+ * ACL for the zookeeper nodes created by the bigdata federation.
+ *
+ * Note: zookeeper ACLs are not transmitted over secure channels
+ * and are placed into plain text Configuration files by the
+ * ServicesManagerServer.
+ */
+ acl = new ACL[] {
+
+ new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone"))
+
+ };
+}
+
+/*
+ * You should not have to edit below this line.
+ */
+
+/*
+ * Jini client configuration.
+ */
+com.bigdata.service.jini.JiniClient {
+
+ groups = bigdata.groups;
+
+ locators = bigdata.locators;
+
+ entries = new Entry[] {
+
+ // Optional metadata entries.
+ new Name("E"),
+
+ // Note: Used to assign the ServiceID to the service.
+ new ServiceUUID(UUID.fromString(System.getProperty("test.serviceId")))
+
+ };
+
+}
+
+net.jini.lookup.JoinManager {
+
+ maxLeaseDuration = bigdata.leaseTimeout;
+
+}
+
+/*
+ * Server configuration options.
+ */
+com.bigdata.journal.jini.ha.HAJournalServer {
+
+ args = new String[] {
+ "-showversion",
+ "-Djava.security.policy=policy.all",
+ "-Dlog4j.configuration=file:log4j-E.properties",
+ "-Djava.util.logging.config.file=logging-E.properties",
+ "-server",
+ "-Xmx1G",
+ "-ea",
+ "-Xdebug","-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1054"
+ };
+
+ serviceDir = bigdata.serviceDir;
+
+ // Default policy.
+ restorePolicy = new com.bigdata.journal.jini.ha.DefaultRestorePolicy();
+
+ // Suppress automatic snapshots.
+ snapshotPolicy = new com.bigdata.journal.jini.ha.NoSnapshotPolicy();
+
+ logicalServiceId = bigdata.logicalServiceId;
+
+ writePipelineAddr = new InetSocketAddress("localhost",bigdata.haPort);
+
+ /*
+ writePipelineAddr = new InetSocketAddress(//
+ InetAddress.getByName(//
+ NicUtil.getIpAddress("default.nic", "default",
+ false// loopbackOk
+ )), //
+ bigdata.haPort
+ );
+ */
+
+ replicationFactor = bigdata.replicationFactor;
+
+ // Use the overridden version of the HAJournal by default so we get the
+ // HAGlueTest API for every test.
+ HAJournalClass = "com.bigdata.journal.jini.ha.HAJournalTest";
+
+}
+
+/*
+ * Journal configuration.
+ */
+com.bigdata.journal.jini.ha.HAJournal {
+
+ properties = (NV[]) ConfigMath.concat(new NV[] {
+
+ new NV(Options.FILE,
+ ConfigMath.getAbsolutePath(new File(bigdata.dataDir,"bigdata-ha.jnl"))),
+
+ new NV(Options.BUFFER_MODE,""+BufferMode.DiskRW),
+
+ new NV(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY,"4000"),
+
+ new NV(IndexMetadata.Options.BTREE_BRANCHING_FACTOR,"128"),
+
+ new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"),
+
+ }, bigdata.kb);
+
+}
Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java (rev 0)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -0,0 +1,114 @@
+package com.bigdata.journal.jini.ha;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.bigdata.ha.HAGlue;
+import com.bigdata.ha.HAStatusEnum;
+
+import net.jini.config.Configuration;
+
+
+public class TestHA1JournalServer extends AbstractHA3JournalServerTestCase {
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Note: This overrides some {@link Configuration} values for the
+ * {@link HAJournalServer} in order to establish conditions suitable for
+ * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}.
+ */
+ @Override
+ protected String[] getOverrides() {
+
+ return new String[]{
+// "com.bigdata.journal.HAJournal.properties=" +TestHA3JournalServer.getTestHAJournalProperties(com.bigdata.journal.HAJournal.properties),
+ "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)",
+ "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()",
+// "com.bigdata.journal.jini.ha.HAJournalServer.HAJournalClass=\""+HAJournalTest.class.getName()+"\"",
+ "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true",
+ "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1",
+ };
+
+ }
+
+ protected String getZKConfigFile() {
+ return "zkClient1.config"; // 1 stage pipeline
+ }
+
+ public TestHA1JournalServer() {
+ }
+
+ public TestHA1JournalServer(String name) {
+ super(name);
+ }
+
+ public void testStartA() throws Exception {
+ doStartA();
+ }
+
+ protected void doStartA() throws Exception {
+
+ try {
+ quorum.awaitQuorum(awaitQuorumTimeout,
+ TimeUnit.MILLISECONDS);
+
+ fail("HA1 requires quorum of 1!");
+ } catch (TimeoutException te) {
+ // expected
+ }
+
+ // Start 1 service.
+ final HAGlue serverA = startA();
+
+ // this should succeed
+ final long token = quorum.awaitQuorum(awaitQuorumTimeout,
+ TimeUnit.MILLISECONDS);
+
+ assertEquals(token, awaitFullyMetQuorum());
+
+ final HAGlue leader = quorum.getClient().getLeader(token);
+
+ assertEquals(serverA, leader);
+ }
+
+ public void testSimpleTransaction() throws Exception {
+ doStartA();
+
+ serverA.awaitHAReady(2, TimeUnit.SECONDS);
+
+ /*
+ * Awaiting HAReady is not sufficient since the service may still
+ * be writing the initial transaction.
+ *
+ * So it seems that the problem is not so much with HA1 as rather the
+ * status of a new journal being ready too soon to process an NSS
+ * request
+ */
+
+ awaitCommitCounter(1, new HAGlue[] { serverA});
+
+ // Thread.sleep(100);
+
+ // serverA.
+
+ log.warn("Calling SimpleTransaction");
+ simpleTransaction();
+
+ awaitCommitCounter(2, new HAGlue[] { serverA});
+ }
+
+ public void testMultiTransaction() throws Exception {
+ doStartA();
+
+ awaitCommitCounter(1, new HAGlue[] { serverA});
+ // Thread.sleep(1000);
+
+ final int NTRANS = 10;
+ for (int t = 0; t < NTRANS; t++) {
+ simpleTransaction();
+ }
+
+ awaitCommitCounter(NTRANS+1, new HAGlue[] { serverA});
+}
+}
Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java (rev 0)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-02 13:12:56 UTC (rev 8026)
@@ -0,0 +1,530 @@
+package com.bigdata.journal.jini.ha;
+
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.bigdata.ha.HAGlue;
+import com.bigdata.ha.HAStatusEnum;
+import com.bigdata.ha.msg.HARootBlockRequest;
+import com.bigdata.ha.msg.HASnapshotRequest;
+import com.bigdata.ha.msg.IHASnapshotResponse;
+import com.bigdata.journal.IRootBlockView;
+import com.bigdata.journal.Journal;
+import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.LargeLoadTask;
+import com.bigdata.rdf.sail.webapp.client.RemoteRepository;
+
+import net.jini.config.Configuration;
+
+public class TestHA1SnapshotPolicy extends AbstractHA3BackupTestCase {
+
+ public TestHA1SnapshotPolicy() {
+ }
+
+ public TestHA1SnapshotPolicy(String name) {
+ super(name);
+ }
+
+ protected String getZKConfigFile() {
+ return "zkClient1.config"; // 1 stage pipeline
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Note: This overrides some {@link Configuration} values for the
+ * {@link HAJournalServer} in order to establish conditions suitable for
+ * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}.
+ */
+ @Override
+ protected String[] getOverrides() {
+
+ /*
+ * We need to set the time at which the DefaultSnapshotPolicy runs to
+ * some point in the Future in order to avoid test failures due to
+ * violated assumptions when the policy ends up self-triggering (based
+ * on the specified run time) during a CI run.
+ */
+ final String neverRun = getNeverRunSnapshotTime();
+
+ /*
+ * For HA1, must have onlineDisasterRecovery to ensure logs are maintained
+ */
+ return new String[]{
+ "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)",
+ "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.DefaultSnapshotPolicy("+neverRun+",0)",
+ // "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()",
+ "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true",
+ "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1",
+ };
+
+ }
+
+ /**
+ * Start a service. The quorum meets. Take a snapshot. Verify that the
+ * snapshot appears within a reasonable period of time and that it is for
+ * <code>commitCounter:=1</code> (just the KB create). Verify that the
+ * digest of the snapshot agrees with the digest of the journal.
+ */
+ public void testA_snapshot() throws Exception {
+
+ // Start 1 service.
+ final HAGlue serverA = startA();
+
+ // Wait for a quorum meet.
+ final long token = quorum.awaitQuorum(awaitQuorumTimeout,
+ TimeUnit.MILLISECONDS);
+
+ // Await initial commit point (KB create).
+ awaitCommitCounter(1L, serverA);
+
+ final HAGlue leader = quorum.getClient().getLeader(token);
+ assertEquals(serverA, leader); // A is the leader.
+ {
+
+ // Verify quorum is still valid.
+ quorum.assertQuorum(token);
+
+ // Verify quorum is at the expected commit point.
+ assertEquals(
+ 1L,
+ leader.getRootBlock(
+ new HARootBlockRequest(null/* storeUUID */))
+ .getRootBlock().getCommitCounter());
+
+ // Snapshot directory is empty.
+ assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER));
+
+ final Future<IHASnapshotResponse> ft = leader
+ .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */));
+
+ // wait for the snapshot.
+ try {
+ ft.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException ex) {
+ ft.cancel(true/* mayInterruptIfRunning */);
+ throw ex;
+ }
+
+ final IRootBlockView snapshotRB = ft.get().getRootBlock();
+
+ final long commitCounter = 1L;
+
+ // Verify snapshot is for the expected commit point.
+ assertEquals(commitCounter, snapshotRB.getCommitCounter());
+
+ // Snapshot directory contains the desired filename.
+ assertExpectedSnapshots(getSnapshotDirA(),
+ new long[] { commitCounter });
+
+ // Verify digest of snapshot agrees with digest of journal.
+ assertSnapshotDigestEquals(leader, commitCounter);
+
+ }
+
+ }
+ /**
+ * Start service. The quorum meets. Take a snapshot. Verify that the
+ * snapshot appears within a reasonable period of time and that it is for
+ * <code>commitCounter:=1</code> (just the KB create). Request a second
+ * snapshot for the same commit point and verify that a <code>null</code> is
+ * returned since we already have a snapshot for that commit point.
+ */
+ public void testA_snapshot_await_snapshot_null() throws Exception {
+
+        // Start 1 service.
+ final HAGlue serverA = startA();
+
+ // Wait for a quorum meet.
+ final long token = quorum.awaitQuorum(awaitQuorumTimeout,
+ TimeUnit.MILLISECONDS);
+
+ // Await initial commit point (KB create).
+ awaitCommitCounter(1L, serverA);
+
+ final HAGlue leader = quorum.getClient().getLeader(token);
+
+ {
+
+ // Verify quorum is still valid.
+ quorum.assertQuorum(token);
+
+ // Verify quorum is at the expected commit point.
+ assertEquals(
+ 1L,
+ leader.getRootBlock(
+ new HARootBlockRequest(null/* storeUUID */))
+ .getRootBlock().getCommitCounter());
+
+ // Snapshot directory is empty.
+ assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER));
+
+ final Future<IHASnapshotResponse> ft = leader
+ .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */));
+
+ // wait for the snapshot.
+ try {
+ ft.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException ex) {
+ ft.cancel(true/* mayInterruptIfRunning */);
+ throw ex;
+ }
+
+ final IRootBlockView snapshotRB = ft.get().getRootBlock();
+
+ final long commitCounter = 1L;
+
+ // Verify snapshot is for the expected commit point.
+ assertEquals(commitCounter, snapshotRB.getCommitCounter());
+
+ // Snapshot directory contains the expected snapshot(s).
+ assertExpectedSnapshots(getSnapshotDirA(),
+ new long[] { commitCounter });
+
+ // Verify digest of snapshot agrees with digest of journal.
+ assertSnapshotDigestEquals(leader, commitCounter);
+
+ }
+
+ /*
+ * Verify 2nd request returns null since snapshot exists for that
+ * commit point.
+ */
+ {
+
+ // Verify quorum is still at the expected commit point.
+ assertEquals(
+ 1L,
+ leader.getRootBlock(
+ new HARootBlockRequest(null/* storeUUID */))
+ .getRootBlock().getCommitCounter());
+
+ // request another snapshot.
+ final Future<IHASnapshotResponse> ft = leader
+ .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */));
+
+ if (ft != null) {
+
+ ft.cancel(true/* mayInteruptIfRunning */);
+
+ fail("Expecting null since snapshot exists for current commit point.");
+
+ }
+
+ }
+
+ }
+
+ /**
+ * Test ability to request a snapshot using an HTTP GET
+ * <code>.../status?snapshot</code>.
+ *
+ * TODO Variant where the percentLogSize parameter is also expressed and
+ * verify that the semantics of that argument are obeyed. Use this to verify
+ * that the server will not take snapshot if size on disk of HALog files
+ * since the last snapshot is LT some percentage.
+ */
+ public void testA_snapshot_HTTP_GET() throws Exception {
+
+        // Start 1 service.
+ final HAGlue serverA = startA();
+
+ // Wait for a quorum meet.
+ final long token = quorum.awaitQuorum(awaitQuorumTimeout,
+ TimeUnit.MILLISECONDS);
+
+ // Await initial commit point (KB create).
+ awaitCommitCounter(1L, serverA);
+
+ final HAGlue leader = quorum.getClient().getLeader(token);
+
+ {
+
+ // Verify quorum is still valid.
+ quorum.assertQuorum(token);
+
+ // Verify quorum is at the expected commit point.
+ assertEquals(
+ 1L,
+ leader.getRootBlock(
+ new HARootBlockRequest(null/* storeUUID */))
+ .getRootBlock().getCommitCounter());
+
+ // Snapshot directory is empty.
+ assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER));
+
+ doSnapshotRequest(leader);
+
+ /*
+ * Get the Future. Should still be there, but if not then will be
+             * null (in which case the snapshot is already done).
+ */
+ f...
[truncated message content] |