From: <tho...@us...> - 2014-01-10 13:17:45
|
Revision: 7757 http://bigdata.svn.sourceforge.net/bigdata/?rev=7757&view=rev Author: thompsonbry Date: 2014-01-10 13:17:37 +0000 (Fri, 10 Jan 2014) Log Message: ----------- Updated javadoc on the receive and replicate and robust send tasks in QuorumPipelineImpl. Renamed the local and remote futures as futLoc and futRmt. The local future is either the HASendService or the HAReceiveService (with an inner HASendService). The remote future is the RMI to the HAPipeline interface for receiveAndReplicate() or receive(). Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-09 23:03:43 UTC (rev 7756) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-10 13:17:37 UTC (rev 7757) @@ -1919,15 +1919,15 @@ ExecutionException, IOException { // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b, snd.getMarker()); + final Future<Void> futLoc = sendService.send(b, snd.getMarker()); try { try { // Get Future for receive outcome on the remote service // (RMI). - final Future<Void> futRec; + final Future<Void> futRmt; try { - futRec = downstream.service.receiveAndReplicate(req, + futRmt = downstream.service.receiveAndReplicate(req, snd, msg); } catch (IOException ex) { // RMI error. throw new ImmediateDownstreamReplicationException(ex); @@ -1945,20 +1945,28 @@ * both Futures are done. Interrupts are not trapped, so * an interrupt will still exit the loop. * - * It appears that it is possible for futSnd to be blocked - * and not generate an error. If we do not exit the loop - * and check the futRec future in this case then we coul loop - * continuously. This does rather beg the question of - * whether we should only be checking futRec at this stage. + * It appears that it is possible for futSnd to be + * blocked and not generate an error. If we do not exit + * the loop and check the futRec future in this case + * then we coul loop continuously. This does rather beg + * the question of whether we should only be checking + * futRec at this stage. + * + * Note: [futRmt] is currently a ThickFuture to avoid + * historical problems with DGC and is already done by + * the time the RMI returns that ThickFuture to us. + * Therefore the loop below can be commented out. All we + * are really doing is waiting on futSnd and verifying + * that the [token] remains valid. */ - while (!futSnd.isDone() || !futRec.isDone()) { + while ((!futLoc.isDone() || !futRmt.isDone())) { /* * Make sure leader's quorum token remains valid for * ALL writes. */ member.assertLeader(token); try { - futSnd.get(500L, TimeUnit.MILLISECONDS); + futLoc.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -1966,18 +1974,18 @@ * if not done. */ try { - futRec.get(500L, TimeUnit.MILLISECONDS); + futRmt.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. } finally { - futRec.cancel(true/* mayInterruptIfRunning */); + futRmt.cancel(true/* mayInterruptIfRunning */); } /* * Note: Both futures are DONE at this point. */ } try { - futRec.get(500L, TimeUnit.MILLISECONDS); + futRmt.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -1985,37 +1993,40 @@ * if not done. */ try { - futSnd.get(500L, TimeUnit.MILLISECONDS); + futLoc.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. } finally { - futSnd.cancel(true/* mayInterruptIfRunning */); + futLoc.cancel(true/* mayInterruptIfRunning */); } /* * Note: Both futures are DONE at this point. */ } + /* + * Note: Both futures are DONE at this point. + */ } /* - * Note: We want to check the remote Future for the downstream - * service first in order to accurately report the - * service that was the source of a pipeline replication - * problem. + * Note: We want to check the remote Future for the + * downstream service first in order to accurately + * report the service that was the source of a pipeline + * replication problem. */ - futRec.get(); - futSnd.get(); + futRmt.get(); + futLoc.get(); } finally { - if (!futRec.isDone()) { + if (!futRmt.isDone()) { // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); + futRmt.cancel(true/* mayInterruptIfRunning */); } } } finally { // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); + futLoc.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { launderPipelineException(true/* isLeader */, token, member, outerClass, t); @@ -2530,7 +2541,7 @@ req, snd, msg); // Get Future for receive() outcome on local service. - final Future<Void> futRec = receiveService.receiveData(wrappedMsg, + final Future<Void> futLoc = receiveService.receiveData(wrappedMsg, b); try { @@ -2538,9 +2549,9 @@ // Get Future for receive outcome on the remote service // (RMI). - final Future<Void> futRep; + final Future<Void> futRmt; try { - futRep = downstream.service.receiveAndReplicate(req, + futRmt = downstream.service.receiveAndReplicate(req, snd, msg); } catch (IOException ex) { // RMI error. throw new ImmediateDownstreamReplicationException(ex); @@ -2558,10 +2569,14 @@ * both Futures are done. Interrupts are not trapped, so * an interrupt will still exit the loop. * - * TODO: check the comparative logic with this and robustReplicate - * to confirm the equivalence of checking the different futures. + * Note: [futRmt] is currently a ThickFuture to avoid + * historical problems with DGC and is already done by + * the time the RMI returns that ThickFuture to us. + * Therefore the loop below can be commented out. All we + * are really doing is waiting on futSnd and verifying + * that the [token] remains valid. */ - while (!futRec.isDone() || !futRep.isDone()) { + while ((!futLoc.isDone() || !futRmt.isDone())) { /* * The token must remain valid, even if this service * is not joined with the met quorum. If fact, @@ -2571,7 +2586,7 @@ */ member.getQuorum().assertQuorum(token); try { - futRec.get(500L, TimeUnit.MILLISECONDS); + futLoc.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -2579,18 +2594,18 @@ * if not done. */ try { - futRep.get(500L, TimeUnit.MILLISECONDS); + futRmt.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. } finally { - futRep.cancel(true/* mayInterruptIfRunning */); + futRmt.cancel(true/* mayInterruptIfRunning */); } /* * Note: Both futures are DONE at this point. */ } try { - futRep.get(500L, TimeUnit.MILLISECONDS); + futRmt.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -2598,38 +2613,40 @@ * if not done. */ try { - futRec.get(500L, TimeUnit.MILLISECONDS); + futLoc.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. } finally { - futRec.cancel(true/* mayInterruptIfRunning */); + futLoc.cancel(true/* mayInterruptIfRunning */); } /* * Note: Both futures are DONE at this point. */ } + /* + * Note: Both futures are DONE at this point. + */ } /* - * Note: Both futures are DONE (or not - check condition above) at this point. However, - * we want to check the remote Future for the downstream - * service first in order to accurately report the - * service that was the source of a pipeline replication - * problem. + * Note: We want to check the remote Future for the + * downstream service first in order to accurately + * report the service that was the source of a pipeline + * replication problem. */ - futRec.get(); - futRep.get(); + futLoc.get(); + futRmt.get(); } finally { - if (!futRep.isDone()) { + if (!futRmt.isDone()) { // cancel remote Future unless done. - futRep.cancel(true/* mayInterruptIfRunning */); + futRmt.cancel(true/* mayInterruptIfRunning */); } } } finally { // cancel the local Future. - futRec.cancel(true/* mayInterruptIfRunning */); + futLoc.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { launderPipelineException(false/* isLeader */, token, member, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |