From: <tho...@us...> - 2013-12-11 16:17:55
|
Revision: 7634 http://bigdata.svn.sourceforge.net/bigdata/?rev=7634&view=rev Author: thompsonbry Date: 2013-12-11 16:17:47 +0000 (Wed, 11 Dec 2013) Log Message: ----------- Modified HAReceiveService to always drain down the payload for the current message if there is a problem with re-replicating the payload to the downstream service. Added code to expose the HAGlue implementation object - this is the object that gets exported. This makes it possible to reach into the HAGlueTestImpl object in some unit tests. See #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -983,6 +983,19 @@ private void doReceiveAndReplicate(final Client client) throws Exception { + /** + * The first cause if downstream replication fails. We make a note + * of this first cause, continue to drain the payload, and then + * rethrow the first cause once the payload has been fully drained. + * This is necessary to ensure that the socket channel does not have + * partial data remaining from an undrained payload. 
+ * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA wire pulling and sure kill testing </a> + */ + Throwable downstreamFirstCause = null; + /* * We should now have parameters ready in the WriteMessage and can * begin transferring data from the stream to the writeCache. @@ -1085,65 +1098,22 @@ callback.incReceive(message, reads, rdlen, rem); } - /* - * Now forward the most recent transfer bytes downstream - * - * @todo Since the downstream writes are against a blocking - * mode channel, the receiver on this node runs in sync with - * the receiver on the downstream node. In fact, those - * processes could be decoupled with a bit more effort and - * are only required to synchronize by the end of each - * received payload. - * - * Note: [addrNext] is final. If the downstream address is - * changed, then the ReadTask is interrupted using its - * Future and the WriteCacheService will handle the error by - * retransmitting the current cache block. - * - * The rdlen is checked for non zero to avoid an - * IllegalArgumentException. - * - * Note: loop since addrNext might change asynchronously. - */ - while(true) { - if (rdlen != 0 && addrNextRef.get() != null) { - if (log.isTraceEnabled()) - log.trace("Incremental send of " + rdlen + " bytes"); - final ByteBuffer out = localBuffer.asReadOnlyBuffer(); - out.position(localBuffer.position() - rdlen); - out.limit(localBuffer.position()); - synchronized (sendService) { - /* - * Note: Code block is synchronized on [downstream] - * to make the decision to start the HASendService - * that relays to [addrNext] atomic. The - * HASendService uses [synchronized] for its public - * methods so we can coordinate this lock with its - * synchronization API. - */ - if (!sendService.isRunning()) { - /* - * Prepare send service for incremental - * transfers to the specified address. - */ - // Check for termination. - client.checkFirstCause(); - // Note: use then current addrNext! 
- sendService.start(addrNextRef.get()); - continue; - } + if (downstreamFirstCause == null) { + try { + forwardReceivedBytes(client, rdlen); + } catch (ExecutionException ex) { + log.error( + "Downstream replication failure" + + ": will drain payload and then rethrow exception: rootCause=" + + ex, ex); + downstreamFirstCause = ex; } - // Check for termination. - client.checkFirstCause(); - // Send and await Future. - sendService.send(out).get(); - } - break; } - } - } // while( rem > 0 ) + } // while(itr.hasNext()) + } // while( rem > 0 && !EOS ) + if (localBuffer.position() != message.getSize()) throw new IOException("Receive length error: localBuffer.pos=" + localBuffer.position() + ", message.size=" @@ -1164,12 +1134,109 @@ + ", actual=" + (int) chk.getValue()); } + if (downstreamFirstCause != null) { + + /** + * Replication to the downstream service failed. The payload has + * been fully drained. This ensures that we do not leave part of + * the payload in the upstream socket channel. We now wrap and + * rethrow the root cause of the downstream failure. The leader + * will handle this by forcing the removal of the downstream + * service and then re-replicating the payload. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA wire pulling and sure kill testing </a> + */ + + throw new RuntimeException( + "Downstream replication failure: msg=" + message + + ", ex=" + downstreamFirstCause, + downstreamFirstCause); + + } + if (callback != null) { + + /* + * The message was received and (if there is a downstream + * service) successfully replicated to the downstream service. + * We now invoke the callback to give this service an + * opportunity to handle the message and the fully received + * payload. + */ + callback.callback(message, localBuffer); + } } // call() + /** + * Now forward the most recent transfer bytes downstream. + * <p> + * + * Note: [addrNext] is final. 
If the downstream address is changed, then + * the {@link ReadTask} is interrupted using its {@link Future} and the + * WriteCacheService on the leader will handle the error by + * retransmitting the current cache block. + * + * The rdlen is checked for non zero to avoid an + * IllegalArgumentException. + * + * Note: loop since addrNext might change asynchronously. + * + * @throws ExecutionException + * @throws InterruptedException + * + * @todo Since the downstream writes are against a blocking mode + * channel, the receiver on this node runs in sync with the + * receiver on the downstream node. In fact, those processes could + * be decoupled with a bit more effort and are only required to + * synchronize by the end of each received payload. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > + * HA wire pulling and sure kill testing </a> + */ + private void forwardReceivedBytes(final Client client, final int rdlen) + throws InterruptedException, ExecutionException { + while (true) { + if (rdlen != 0 && addrNextRef.get() != null) { + if (log.isTraceEnabled()) + log.trace("Incremental send of " + rdlen + " bytes"); + final ByteBuffer out = localBuffer.asReadOnlyBuffer(); + out.position(localBuffer.position() - rdlen); + out.limit(localBuffer.position()); + synchronized (sendService) { + /* + * Note: Code block is synchronized on [downstream] to + * make the decision to start the HASendService that + * relays to [addrNext] atomic. The HASendService uses + * [synchronized] for its public methods so we can + * coordinate this lock with its synchronization API. + */ + if (!sendService.isRunning()) { + /* + * Prepare send service for incremental transfers to + * the specified address. + */ + // Check for termination. + client.checkFirstCause(); + // Note: use then current addrNext! + sendService.start(addrNextRef.get()); + continue; + } + } + // Check for termination. + client.checkFirstCause(); + // Send and await Future. 
+ sendService.send(out).get(); + } + break; // break out of the inner while loop. + } // while(true) + + } + // private void ack(final Client client) throws IOException { // // if (log.isTraceEnabled()) Modified: branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -311,12 +311,12 @@ /** * The service implementation object. */ - protected Remote impl; + private Remote impl; /** * The exported proxy for the service implementation object. */ - protected Remote proxy; + private Remote proxy; /** * The name of the host on which the server is running. @@ -349,6 +349,8 @@ /** * The exported proxy for the service implementation object. + * + * @see #getRemoteImpl() */ public Remote getProxy() { @@ -357,6 +359,17 @@ } /** + * The service implementation object. + * + * @see #getProxy() + */ + public Remote getRemoteImpl() { + + return impl; + + } + + /** * Return the assigned {@link ServiceID}. If this is a new service and the * {@link ServiceUUID} was not specified in the {@link Configuration} then * the {@link ServiceID} will be <code>null</code> until it has been Modified: branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -1949,12 +1949,13 @@ * @return an object that implements whatever administration interfaces * are appropriate for the particular service. 
*/ + @Override public Object getAdmin() throws RemoteException { if (log.isInfoEnabled()) log.info("serviceID=" + server.getServiceID()); - return server.proxy; + return server.getProxy(); } Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import net.jini.config.Configuration; import net.jini.config.ConfigurationException; @@ -89,8 +90,8 @@ import com.bigdata.journal.ITx; import com.bigdata.journal.StoreState; import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService; +import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService.IHAProgressListener; import com.bigdata.journal.jini.ha.HAJournalServer.RunStateEnum; -import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService.IHAProgressListener; import com.bigdata.quorum.AsynchronousQuorumCloseException; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.zk.ZKQuorumImpl; @@ -125,6 +126,16 @@ return new HAGlueTestImpl(serviceId); } + + /** + * The service implementation object that gets exported (not the proxy, but + * the thing that gets exported as the {@link HAGlueTest} proxy). 
+ */ + public HAGlueTestImpl getRemoteImpl() { + + return (HAGlueTestImpl) getHAJournalServer().getRemoteImpl(); + + } /** * Utility accessible for HAGlueTest methods and public static for @@ -328,7 +339,21 @@ * Supports consistency checking between HA services */ public StoreState getStoreState() throws IOException; - + + /** + * Gets and clears a root cause that was set on the remote service. This + * is used to inspect the root cause when an RMI method is interrupted + * in the local JVM. Since the RMI was interrupted, we can not observe + * the outcome or root cause of the associated failure on the remote + * service. However, test glue can explicitly set that root cause such + * that it can then be reported using this method. + */ + public Throwable getAndClearLastRootCause() throws IOException; + + /** + * Variant that does not clear out the last root cause. + */ + public Throwable getLastRootCause() throws IOException; } /** @@ -437,7 +462,7 @@ * * @see HAJournal.HAGlueService */ - private class HAGlueTestImpl extends HAJournal.HAGlueService + class HAGlueTestImpl extends HAJournal.HAGlueService implements HAGlue, HAGlueTest, RemoteDestroyAdmin { /** @@ -479,7 +504,7 @@ false); private final AtomicLong nextTimestamp = new AtomicLong(-1L); - + private HAGlueTestImpl(final UUID serviceId) { super(serviceId); @@ -1350,7 +1375,45 @@ .set(listener); } + + @Override + public Throwable getAndClearLastRootCause() throws IOException { + + final Throwable t = lastRootCause.getAndSet(null/* newValue */); + + if (t != null) + log.warn("lastRootCause: " + t, t); + + return t; + + } + @Override + public Throwable getLastRootCause() throws IOException { + + final Throwable t = lastRootCause.get(); + + if (t != null) + log.warn("lastRootCause: " + t, t); + + return t; + + } + public void setLastRootCause(final Throwable t) { + + if (log.isInfoEnabled()) + log.info("Setting lastRootCause: " + t); + + lastRootCause.set(t); + + } + + /** + * @see 
HAGlueTest#getAndClearLastRootCause() + */ + private AtomicReference<Throwable> lastRootCause = new AtomicReference<Throwable>(); + + } // class HAGlueTestImpl /** Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -47,149 +47,148 @@ */ public class TestHA3JustKills extends AbstractHA3JournalServerTestCase { - - /** - * {@inheritDoc} - * <p> - * Note: This overrides some {@link Configuration} values for the - * {@link HAJournalServer} in order to establish conditions suitable for - * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. - */ - @Override - protected String[] getOverrides() { - - return new String[]{ + /** + * {@inheritDoc} + * <p> + * Note: This overrides some {@link Configuration} values for the + * {@link HAJournalServer} in order to establish conditions suitable for + * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. 
+ */ + @Override + protected String[] getOverrides() { + + return new String[]{ // "com.bigdata.journal.HAJournal.properties=" +TestHA3JournalServer.getTestHAJournalProperties(com.bigdata.journal.HAJournal.properties), - "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", - "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", + "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", + "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", // "com.bigdata.journal.jini.ha.HAJournalServer.HAJournalClass=\""+HAJournalTest.class.getName()+"\"", - "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", - }; - - } - - public TestHA3JustKills() { - } + "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + }; + + } + + public TestHA3JustKills() { + } - public TestHA3JustKills(String name) { - super(name); - } + public TestHA3JustKills(String name) { + super(name); + } - /** - * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start - * a long running LOAD. While the LOAD is running, sure kill C (the last - * follower). Verify that the LOAD completes successfully with the remaining - * services (A+B). - */ - public void testABC_LiveLoadRemainsMet_kill_C() throws Exception { + /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill C (the last + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+B). 
+ */ + public void testABC_LiveLoadRemainsMet_kill_C() throws Exception { - // enforce join order - final ABC startup = new ABC(true /*sequential*/); - - final long token = awaitFullyMetQuorum(); - - // start concurrent task loads that continue until fully met - final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( - token)); + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); - executorService.submit(ft); + executorService.submit(ft); - // allow load head start - Thread.sleep(300/* ms */); + // allow load head start + Thread.sleep(300/* ms */); - // Verify load is still running. - assertFalse(ft.isDone()); - - // Dump Zookeeper - log.warn("ZOOKEEPER\n" + dumpZoo()); - - kill(startup.serverC); - - // FIXME: in the face of no implemented error propagation we can explicitly - // tell the leader to remove the killed service! - startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverC); + + // FIXME: in the face of no implemented error propagation we can explicitly + // tell the leader to remove the killed service! 
+ startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); - awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); + awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); - awaitMembers(new HAGlue[] {startup.serverA, startup.serverB}); - awaitJoined(new HAGlue[] {startup.serverA, startup.serverB}); + awaitMembers(new HAGlue[] {startup.serverA, startup.serverB}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverB}); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); - - // Await LOAD, but with a timeout. - ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); - } - - public void testStressABC_LiveLoadRemainsMet_kill_C() throws Exception { - for (int i = 0; i < 5; i++) { - try { - testABC_LiveLoadRemainsMet_kill_C(); - } catch (Throwable t) { - fail("Run " + i, t); - } finally { - Thread.sleep(1000); - destroyAll(); - } - } - } + } + + public void _testStressABC_LiveLoadRemainsMet_kill_C() throws Exception { + for (int i = 0; i < 5; i++) { + try { + testABC_LiveLoadRemainsMet_kill_C(); + } catch (Throwable t) { + fail("Run " + i, t); + } finally { + Thread.sleep(1000); + destroyAll(); + } + } + } - /** - * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start - * a long running LOAD. 
While the LOAD is running, sure kill B (the first - * follower). Verify that the LOAD completes successfully with the remaining - * services (A+C), after the leader re-orders the pipeline. - */ - public void testABC_LiveLoadRemainsMet_kill_B() throws Exception { + /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill B (the first + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+C), after the leader re-orders the pipeline. + */ + public void testABC_LiveLoadRemainsMet_kill_B() throws Exception { - // enforce join order - final ABC startup = new ABC(true /*sequential*/); - - final long token = awaitFullyMetQuorum(); - - // start concurrent task loads that continue until fully met - final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( - token)); + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); - executorService.submit(ft); + executorService.submit(ft); - // allow load head start - Thread.sleep(1000/* ms */); + // allow load head start + Thread.sleep(1000/* ms */); - // Verify load is still running. - assertFalse(ft.isDone()); - - // Dump Zookeeper - log.warn("ZOOKEEPER\n" + dumpZoo()); - - kill(startup.serverB); - - // FIXME: temporary call to explicitly remove the service prior to correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); + // Verify load is still running. 
+ assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverB); + + // FIXME: temporary call to explicitly remove the service prior to correct protocol + startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); - awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); - - // also check members and joined - awaitMembers(new HAGlue[] {startup.serverA, startup.serverC}); - awaitJoined(new HAGlue[] {startup.serverA, startup.serverC}); + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // also check members and joined + awaitMembers(new HAGlue[] {startup.serverA, startup.serverC}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverC}); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); - - // Await LOAD, but with a timeout. - ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); - } + } /** * Base class for sure kill of a process when write replication reaches a @@ -243,8 +242,13 @@ log.error("msg=" + msg + ", nreads=" + nreads + ", rdlen=" + rdlen + ", rem=" + rem); - // Note: This is the *opening* root block counter. - if (msg.getCommitCounter() == 1L && nreads > 1) { + /* + * Note: This is the *opening* root block counter. The process will + * be killed as soon as it has received the first chunk of data for + * the payload of the first replicated write cache block. 
+ */ + + if (msg.getCommitCounter() == 1L) { sureKill(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |