From: <tho...@us...> - 2013-05-28 15:07:56
|
Revision: 7166 http://bigdata.svn.sourceforge.net/bigdata/?rev=7166&view=rev Author: thompsonbry Date: 2013-05-28 15:07:45 +0000 (Tue, 28 May 2013) Log Message: ----------- - Bug fix to recent commit (r7162) where I broke RESTORE. - Significant changes to handleReplicatedWrite() and enterErrorState() in order to address the failure to identify in a timely fashion live writes that violate the expectation of a joined service and to ensure that a joined service whose expectations are violated for a live write will: (a) do a serviceLeave(); (b) call setQuorumToken() on the journal with the then current quorum token in order to clear the haReadyToken and haStatus fields; (c) disable the open HALog. These changes to handleReplicatedWrite are green for all HA CI tests *except* the "overrides" test suite. I am committing this to sync to Martyn while I work on those overrides. Revision Links: -------------- http://bigdata.svn.sourceforge.net/bigdata/?rev=7162&view=rev Modified Paths: -------------- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-05-28 15:04:14 UTC (rev 7165) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-05-28 15:07:45 UTC (rev 7166) @@ -1146,6 +1146,39 @@ void enterErrorState() { + /* + * Do synchronous service leave. + */ + serviceLeave(); + + /* + * Update the haReadyToken and haStatus regardless of whether the + * quorum token has changed since this service is no longer joined + * with a met quorum. 
+ */ + journal.setQuorumToken(getQuorum().token()); + + logLock.lock(); + try { + if (journal.getHALogNexus().isHALogOpen()) { + /* + * Note: Closing the HALog is necessary for us to be able to + * re-enter SeekConsensus without violating a pre-condition + * for that run state. + */ + try { + journal.getHALogNexus().disableHALog(); + } catch (IOException e) { + log.error(e, e); + } + } + } finally { + logLock.unlock(); + } + + /* + * Transition into the error state. + */ enterRunState(new ErrorTask()); } @@ -1566,26 +1599,26 @@ * yet. */ // server.haGlueService.bounceZookeeperConnection(); - /* - * Note: Try moving to doRejectedCommit() so this will be - * synchronous. - */ - logLock.lock(); - try { - if (journal.getHALogNexus().isHALogOpen()) { - /* - * Note: Closing the HALog is necessary for us to be - * able to re-enter SeekConsensus without violating a - * pre-condition for that run state. - */ - journal.getHALogNexus().disableHALog(); - } - } finally { - logLock.unlock(); - } +// /* +// * Note: Try moving to doRejectedCommit() so this will be +// * synchronous. +// */ +// logLock.lock(); +// try { +// if (journal.getHALogNexus().isHALogOpen()) { +// /* +// * Note: Closing the HALog is necessary for us to be +// * able to re-enter SeekConsensus without violating a +// * pre-condition for that run state. +// */ +// journal.getHALogNexus().disableHALog(); +// } +// } finally { +// logLock.unlock(); +// } - // Force a service leave. - getQuorum().getActor().serviceLeave(); +// // Force a service leave. +// getQuorum().getActor().serviceLeave(); // /* // * Set token. 
Journal will notice that it is no longer @@ -1909,11 +1942,13 @@ final long commitCounter = journal.getRootBlockView() .getCommitCounter(); - final IHALogReader r = journal.getHALogNexus().getReader( - commitCounter + 1); + IHALogReader r = null; try { + r = journal.getHALogNexus() + .getReader(commitCounter + 1); + if (r.isEmpty()) { /* @@ -1956,7 +1991,11 @@ } finally { - r.close(); + if (r != null) { + + r.close(); + + } } @@ -2791,6 +2830,7 @@ protected void handleReplicatedWrite(final IHASyncRequest req, final IHAWriteMessage msg, final ByteBuffer data) throws Exception { + if (req == null //&& journal.getQuorumToken() == Quorum.NO_QUORUM && journal.getRootBlockView().getCommitCounter() == 0L && (msg.getUUID() != null && !journal.getUUID().equals(msg.getUUID()))) { @@ -2816,6 +2856,7 @@ */ return; } + pipelineSetup(); logLock.lock(); @@ -2829,9 +2870,9 @@ // Save off reference to most recent *live* message. journal.getHALogNexus().lastLiveHAWriteMessage = msg; - } + } else - if (req != null && req instanceof IHARebuildRequest) { + if (/*req != null &&*/ req instanceof IHARebuildRequest) { /* * This message and payload are part of a ground up service @@ -2906,37 +2947,86 @@ handleResyncMessage((IHALogRequest) req, msg, data); - } else if (req == null // Note: MUST be a live message! - && journal.getRootBlockView().getCommitCounter() == msg - .getCommitCounter() - && isJoinedMember(msg.getQuorumToken())) { + return; + } else if (req != null) { + /* - * We are not resynchronizing this service. This is a - * message for the current write set. The service is joined - * with the quorum. + * A historical message that is being ignored on this node. */ + + dropMessage(req, msg, data); - // write on the log and the local store. - acceptHAWriteMessage(msg, data); + return; + + } else { - } else { + assert req == null; // Note: MUST be a live message! 
+ + if (!isJoinedMember(msg.getQuorumToken())) { + + /* + * If we are not joined, we can not do anything with a + * live write. + */ + + dropMessage(req, msg, data); + + return; + + } + + try { + + /* + * We are not resynchronizing this service. + * + * The service is joined with the quorum. + * + * The message SHOULD be for the current commit counter + * and the expected next write cache block sequence. If + * it is not, then we will enter error handling logic + * below. + */ + + // write on the log and the local store. + acceptHAWriteMessage(msg, data); + + return; + + } catch(Throwable t) { + if (InnerCause.isInnerCause(t, + InterruptedException.class)) { + // propagate interrupt + Thread.currentThread().interrupt(); + return; + } + /* + * Error handler. + * + * Live write is not for expected commit counter and + * write cache block sequence. + */ + log.error(t, t); + try { + enterErrorState(); + } catch (RuntimeException e) { + // log and ignore. + log.error(e, e); + } + // rethrow exception. + throw new RuntimeException(t); + } - if (log.isInfoEnabled()) - log.info("Ignoring message: " + msg); - - /* - * Drop the pipeline message. - * - * Note: There are two cases here. - * - * (A) It is a historical message that is being ignored on - * this node; - * - * (B) It is a live message, but this node is not caught up - * and therefore can not log the message yet. - */ - +// /* +// * Drop the pipeline message. +// * +// * Note: It is a live message, but this node is not caught +// * up and therefore can not log the message yet. +// */ +// +// dropMessage(req, msg, data); + } } finally { @@ -2947,6 +3037,14 @@ } + private void dropMessage(final IHASyncRequest req, + final IHAWriteMessage msg, final ByteBuffer data) { + + if (log.isInfoEnabled()) + log.info("Ignoring message: req=" + req + ", msg=" + msg); + + } + /** * Adjust the size on the disk of the local store to that given in the * message. 
@@ -3217,12 +3315,21 @@ private void acceptHAWriteMessage(final IHAWriteMessage msg, final ByteBuffer data) throws IOException, InterruptedException { - if (msg.getCommitCounter() != journal.getHALogNexus() - .getCommitCounter()) { + // Note: Caller must be holding the logLock! + + final long expectedCommitCounter = journal.getHALogNexus() + .getCommitCounter(); - throw new AssertionError(); + final long expectedBlockSequence = journal.getHALogNexus() + .getSequence(); - } + if (msg.getCommitCounter() != expectedCommitCounter) + throw new IllegalStateException("expectedCommitCounter=" + + expectedCommitCounter+ ", but msg=" + msg); + + if (msg.getSequence() != expectedBlockSequence) + throw new IllegalStateException("expectedBlockSequence=" + + expectedBlockSequence + ", but msg=" + msg); /* * Log the message and write cache block. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |