From: <mar...@us...> - 2013-12-13 10:45:03
|
Revision: 7640 http://bigdata.svn.sourceforge.net/bigdata/?rev=7640&view=rev Author: martyncutcher Date: 2013-12-13 10:44:56 +0000 (Fri, 13 Dec 2013) Log Message: ----------- Intermediate implementation of PipelineException and pipeline propagation leading to forceRemoveService invocation on leader Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java 2013-12-13 10:44:56 UTC (rev 7640) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.ha; + +import java.util.UUID; + +/** + * PipelineException is thrown from RMI calls to communicate + * the root cause of a pipeline problem. The caller is then able + * to take action: for example to remove the problem service + * from the quorum. + */ +public class PipelineException extends RuntimeException { + + /** + * Generated ID + */ + private static final long serialVersionUID = 8019938954269914574L; + + /** The UUID of the service that could not be reached. */ + private final UUID serviceId; + + public PipelineException(final UUID serviceId, final Throwable t) { + super(t); + + this.serviceId = serviceId; + } + + /** Return the UUID of the service that could not be reached. */ + public UUID getProblemServiceId() { + return serviceId; + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 09:07:46 UTC (rev 7639) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 10:44:56 UTC (rev 7640) @@ -1729,56 +1729,91 @@ } private void doRunWithLock() throws InterruptedException, - ExecutionException, IOException { + ExecutionException, IOException { - // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b, snd.getMarker()); + try { + // Get Future for send() outcome on local service. + final Future<Void> futSnd = sendService + .send(b, snd.getMarker()); - try { + try { - // Get Future for receive outcome on the remote service (RMI). - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + // Get Future for receive outcome on the remote service + // (RMI). + final Future<Void> futRec = downstream.service + .receiveAndReplicate(req, snd, msg); - try { + try { - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * Make sure leader's quorum token remains valid for ALL - * writes. - */ - member.assertLeader(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * Make sure leader's quorum token remains valid for + * ALL writes. + */ + member.assertLeader(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } + } finally { + // cancel the local Future. + futSnd.cancel(true/* mayInterruptIfRunning */); + } - } - + } catch (Throwable t) { + // check inner cause for downstream PipelineException + final PipelineException pe = (PipelineException) InnerCause + .getInnerCause(t, PipelineException.class); + final UUID problemService; + if (pe != null) { + // throw pe; // throw it upstream - already should have been + // handled + problemService = pe.getProblemServiceId(); + } else { + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + problemService = priorAndNext[1]; + } + + // determine next pipeline service id + log.warn("Problem with downstream service: " + problemService, + t); + + // Carry out remedial work directly - BAD + log.error("Really need to remove service " + problemService); + + try { + member.getActor().forceRemoveService(problemService); + } catch (Exception e) { + log.warn("Problem on node removal", e); + + throw new RuntimeException(e); + } + + throw new PipelineException(problemService, t); + + } + } } /** @@ -1996,69 +2031,81 @@ } @Override - public Void call() throws Exception { + public Void call() throws Exception { - // wrap the messages together. - final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, snd, msg); + // wrap the messages together. + final HAMessageWrapper wrappedMsg = new HAMessageWrapper(req, snd, + msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, - b); + // Get Future for send() outcome on local service. + final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, + b); - try { + try { + try { - // Get future for receive outcome on the remote - // service. - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + // Get future for receive outcome on the remote + // service. + final Future<Void> futRec = downstream.service + .receiveAndReplicate(req, snd, msg); - try { + try { - /* - * Await the Futures, but spend more time - * waiting on the local Future and only check - * the remote Future every second. Timeouts are - * ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * The token must remain valid, even if this service is - * not joined with the met quorum. If fact, services - * MUST replicate writes regardless of whether or not - * they are joined with the met quorum, but only while - * there is a met quorum. - */ - member.getQuorum().assertQuorum(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * The token must remain valid, even if this service + * is not joined with the met quorum. If fact, + * services MUST replicate writes regardless of + * whether or not they are joined with the met + * quorum, but only while there is a met quorum. + */ + member.getQuorum().assertQuorum(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec - .cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } + } finally { + // Is it possible that this cancel conflicts with throwing + // the PipelineException? + // cancel the local Future. + futSnd.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + // determine next pipeline service id + // FIXME: should this check for problem from further downstream for + // quorums with > 3 services? + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], + t); - // done - return null; - } + throw new PipelineException(priorAndNext[1], t); + } + // done + return null; + } + } /** Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 09:07:46 UTC (rev 7639) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 10:44:56 UTC (rev 7640) @@ -106,7 +106,7 @@ // FIXME: in the face of no implemented error propagation we can explicitly // tell the leader to remove the killed service! - startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); @@ -171,7 +171,7 @@ kill(startup.serverB); // FIXME: temporary call to explicitly remove the service prior to correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); + // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); @@ -317,8 +317,8 @@ // FIXME: temporary call to explicitly remove the service prior to // correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) - .get(); + // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) + // .get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverC }); @@ -396,8 +396,8 @@ // FIXME: temporary call to explicitly remove the service prior to // correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - .get(); + // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) + // .get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverB }); @@ -496,8 +496,8 @@ executorService.submit(ft); // FIXME RESYNC_PIPELINE: move into QuorumPipelineImpl. - startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - .get(); + // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) + // .get(); awaitPipeline(getZKSessionTimeout() + 5000, TimeUnit.MILLISECONDS, new HAGlue[] { startup.serverA, startup.serverB }); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |