From: <tho...@us...> - 2013-12-17 18:00:50
|
Revision: 7668 http://bigdata.svn.sourceforge.net/bigdata/?rev=7668&view=rev Author: thompsonbry Date: 2013-12-17 18:00:44 +0000 (Tue, 17 Dec 2013) Log Message: ----------- Sync to martyn. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 17:27:53 UTC (rev 7667) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 18:00:44 UTC (rev 7668) @@ -1237,7 +1237,7 @@ // FIXME Versus using the inner handler? // innerEventHandler.pipelineUpstreamChange(); // innerEventHandler.pipelineChange(oldDownStreamId, newDownStreamId); - + log.warn("Will reset pipeline"); if (receiveService != null) { receiveService.changeUpStream(); @@ -1755,7 +1755,7 @@ final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, final HASendService sendService, - final QuorumPipelineImpl outerClass, final Lock sendLock) { + final QuorumPipelineImpl<S> outerClass, final Lock sendLock) { this.member = member; this.token = token; @@ -1873,7 +1873,7 @@ */ static private <S extends HAPipelineGlue> void launderPipelineException( final boolean isLeader, final long token, - final QuorumMember<S> member, final QuorumPipelineImpl outerClass, + final QuorumMember<S> member, final QuorumPipelineImpl<S> outerClass, final Throwable t) { log.warn("isLeader=" + isLeader + ", t=" + t, t); @@ -1904,23 +1904,29 @@ member.getServiceId()); if (isLeader) { + + // The problem service (iff identified). + UUID problemServiceId = null; try { - + /* * If we can identify the problem service, then force it out of * the pipeline. 
It can re-enter the pipeline once it * transitions through its ERROR state. */ + if (directCause != null) { - member.getActor().forceRemoveService(priorAndNext[1]); + problemServiceId = priorAndNext[1]; + + member.getActor().forceRemoveService(problemServiceId); } else if (remoteCause != null) { - final UUID problemService = remoteCause.getProblemServiceId(); + problemServiceId = remoteCause.getProblemServiceId(); - member.getActor().forceRemoveService(problemService); + member.getActor().forceRemoveService(problemServiceId); } else { @@ -1928,28 +1934,46 @@ } - /** - * Reset the pipeline on each service (including the leader). If - * replication fails, the socket connections both upstream and - * downstream of the point of failure can be left in an - * indeterminate state with partially buffered data. In order to - * bring the pipeline back into a known state (without forcing a - * quorum break) we message each service in the pipeline to - * reset its HAReceiveService (including the inner - * HASendService. The next message and payload relayed from the - * leader will cause new socket connections to be established. - * - * @see <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" - * > HA Wire Pulling and Sudden Kills </a> - */ - outerClass.resetPipeline(token); + } catch (Throwable e) { + + // Log and continue. + log.error("Problem on node removal", e); - } catch (Exception e) { + if(InnerCause.isInnerCause(t, InterruptedException.class)) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + } - log.error("Problem on node removal", e); + } - throw new RuntimeException(e); + /** + * Reset the pipeline on each service (including the leader). If + * replication fails, the socket connections both upstream and + * downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. 
In order to + * bring the pipeline back into a known state (without forcing a + * quorum break) we message each service in the pipeline to reset + * its HAReceiveService (including the inner HASendService). The next + * message and payload relayed from the leader will cause new socket + * connections to be established. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA Wire Pulling and Sudden Kills </a> + */ + try { + + outerClass.resetPipeline(token, problemServiceId); + + } catch (Throwable e) { + + // Log and continue. + log.error("Problem on reset pipeline", e); + + if(InnerCause.isInnerCause(t, InterruptedException.class)) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + } } @@ -1980,9 +2004,10 @@ * * @param token * The quorum token on the leader. - * @param member + * @param problemServiceId + * The problem service in the write pipeline (if known). */ - private void resetPipeline(final long token) { + private void resetPipeline(final long token, final UUID problemServiceId) { log.error("Leader will reset pipeline: " + token); @@ -2019,7 +2044,7 @@ */ final Future<IHAPipelineResetResponse> rf = member .getExecutor().submit( - new PipelineResetMessageTask(member, + new PipelineResetMessageTask<S>(member, serviceId, msg)); // add to list of futures we will check. @@ -2067,10 +2092,6 @@ /* * If there were any errors, then throw an exception listing them. - * - * TODO But only throw an exception for the joined services. - * Non-joined services, we just long an error (or simply do not tell - * them to do an abort()). */ if (!causes.isEmpty()) { // Throw exception back to the leader. 
Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 17:27:53 UTC (rev 7667) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 18:00:44 UTC (rev 7668) @@ -1117,7 +1117,7 @@ public Future<IHAPipelineResetResponse> resetPipeline( final IHAPipelineResetRequest req) throws IOException { - checkMethod("resetPipeline", new Class[] {}); + checkMethod("resetPipeline", new Class[] {IHAPipelineResetRequest.class}); return super.resetPipeline(req); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |