This list is closed, nobody may subscribe to it.
From: <tho...@us...> - 2013-12-18 17:28:35
Revision: 7676 http://bigdata.svn.sourceforge.net/bigdata/?rev=7676&view=rev
Author: thompsonbry
Date: 2013-12-18 17:28:28 +0000 (Wed, 18 Dec 2013)

Log Message:
-----------
updated release notes

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt	2013-12-18 16:41:11 UTC (rev 7675)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt	2013-12-18 17:28:28 UTC (rev 7676)
@@ -38,8 +38,9 @@
 Road map [3]:
-- RDF Graph Mining API ([12]).
-- Reification Done Right ([11]).
+- RDF Graph Mining API [12];
+- Reification Done Right [11];
+- Column-wise indexing;
 - Runtime Query Optimizer for Analytic Query mode;
 - Performance optimization for scale-out clusters; and
 - Simplified deployment, configuration, and administration for scale-out clusters.
@@ -56,8 +57,20 @@
 - http://sourceforge.net/apps/trac/bigdata/ticket/639 (Remove triple-buffering in RWStore)
 - http://sourceforge.net/apps/trac/bigdata/ticket/645 (HA backup)
 - http://sourceforge.net/apps/trac/bigdata/ticket/646 (River not compatible with newer 1.6.0 and 1.7.0 JVMs)
+- http://sourceforge.net/apps/trac/bigdata/ticket/648 (Add a custom function to use full text index for filtering.)
+- http://sourceforge.net/apps/trac/bigdata/ticket/651 (RWS test failure)
 - http://sourceforge.net/apps/trac/bigdata/ticket/652 (Compress write cache blocks for replication and in HALogs)
+- http://sourceforge.net/apps/trac/bigdata/ticket/662 (Latency on followers during commit on leader)
+- http://sourceforge.net/apps/trac/bigdata/ticket/663 (Issue with OPTIONAL blocks)
+- http://sourceforge.net/apps/trac/bigdata/ticket/664 (RWStore needs post-commit protocol)
+- http://sourceforge.net/apps/trac/bigdata/ticket/665 (HA3 LOAD non-responsive with node failure)
+- http://sourceforge.net/apps/trac/bigdata/ticket/666 (Occasional CI deadlock in HALogWriter testConcurrentRWWriterReader)
+- http://sourceforge.net/apps/trac/bigdata/ticket/670 (Accumulating HALog files cause latency for HA commit)
+- http://sourceforge.net/apps/trac/bigdata/ticket/671 (Query on follower fails during UPDATE on leader)
+- http://sourceforge.net/apps/trac/bigdata/ticket/673 (DGC in release time consensus protocol causes native thread leak in HAJournalServer at each commit)
 - http://sourceforge.net/apps/trac/bigdata/ticket/674 (WCS write cache compaction causes errors in RWS postHACommit())
+- http://sourceforge.net/apps/trac/bigdata/ticket/676 (Bad patterns for timeout computations)
+- http://sourceforge.net/apps/trac/bigdata/ticket/677 (HA deadlock under UPDATE + QUERY)
 - http://sourceforge.net/apps/trac/bigdata/ticket/678 (DGC Thread and Open File Leaks: sendHALogForWriteSet())
 - http://sourceforge.net/apps/trac/bigdata/ticket/679 (HAJournalServer can not restart due to logically empty log file)
 - http://sourceforge.net/apps/trac/bigdata/ticket/681 (HAJournalServer deadlock: pipelineRemove() and getLeaderId())
@@ -65,37 +78,52 @@
 - http://sourceforge.net/apps/trac/bigdata/ticket/686 (Consensus protocol does not detect clock skew correctly)
 - http://sourceforge.net/apps/trac/bigdata/ticket/687 (HAJournalServer Cache not populated)
 - http://sourceforge.net/apps/trac/bigdata/ticket/689 (Missing URL encoding in RemoteRepositoryManager)
+- http://sourceforge.net/apps/trac/bigdata/ticket/690 (Error when using the alias "a" instead of rdf:type for a multipart insert)
 - http://sourceforge.net/apps/trac/bigdata/ticket/691 (Failed to re-interrupt thread in HAJournalServer)
+- http://sourceforge.net/apps/trac/bigdata/ticket/692 (Failed to re-interrupt thread)
 - http://sourceforge.net/apps/trac/bigdata/ticket/693 (OneOrMorePath SPARQL property path expression ignored)
 - http://sourceforge.net/apps/trac/bigdata/ticket/694 (Transparently cancel update/query in RemoteRepository)
 - http://sourceforge.net/apps/trac/bigdata/ticket/695 (HAJournalServer reports "follower" but is in SeekConsensus and is not participating in commits.)
-- http://sourceforge.net/apps/trac/bigdata/ticket/696 (Incorrect HttpEntity consuming in RemoteRepositoryManager)
 - http://sourceforge.net/apps/trac/bigdata/ticket/701 (Problems in BackgroundTupleResult)
+- http://sourceforge.net/apps/trac/bigdata/ticket/702 (InvocationTargetException on /namespace call)
 - http://sourceforge.net/apps/trac/bigdata/ticket/704 (ask does not return json)
 - http://sourceforge.net/apps/trac/bigdata/ticket/705 (Race between QueryEngine.putIfAbsent() and shutdownNow())
 - http://sourceforge.net/apps/trac/bigdata/ticket/706 (MultiSourceSequentialCloseableIterator.nextSource() can throw NPE)
 - http://sourceforge.net/apps/trac/bigdata/ticket/707 (BlockingBuffer.close() does not unblock threads)
-- http://sourceforge.net/apps/trac/bigdata/ticket/708 (BIND heisenbug)
+- http://sourceforge.net/apps/trac/bigdata/ticket/708 (BIND heisenbug - race condition on select query with BIND)
 - http://sourceforge.net/apps/trac/bigdata/ticket/711 (sparql protocol: mime type application/sparql-query)
 - http://sourceforge.net/apps/trac/bigdata/ticket/712 (SELECT ?x { OPTIONAL { ?x eg:doesNotExist eg:doesNotExist } } incorrect)
 - http://sourceforge.net/apps/trac/bigdata/ticket/715 (Interrupt of thread submitting a query for evaluation does not always terminate the AbstractRunningQuery)
 - http://sourceforge.net/apps/trac/bigdata/ticket/716 (Verify that IRunningQuery instances (and nested queries) are correctly cancelled when interrupted)
+- http://sourceforge.net/apps/trac/bigdata/ticket/718 (HAJournalServer needs to handle ZK client connection loss)
 - http://sourceforge.net/apps/trac/bigdata/ticket/720 (HA3 simultaneous service start failure)
 - http://sourceforge.net/apps/trac/bigdata/ticket/723 (HA asynchronous tasks must be canceled when invariants are changed)
+- http://sourceforge.net/apps/trac/bigdata/ticket/725 (FILTER EXISTS in subselect)
 - http://sourceforge.net/apps/trac/bigdata/ticket/726 (Logically empty HALog for committed transaction)
+- http://sourceforge.net/apps/trac/bigdata/ticket/727 (DELETE/INSERT fails with OPTIONAL non-matching WHERE)
 - http://sourceforge.net/apps/trac/bigdata/ticket/728 (Refactor to create HAClient)
+- http://sourceforge.net/apps/trac/bigdata/ticket/729 (ant bundleJar not working)
 - http://sourceforge.net/apps/trac/bigdata/ticket/731 (CBD and Update leads to 500 status code)
 - http://sourceforge.net/apps/trac/bigdata/ticket/732 (describe statement limit does not work)
+- http://sourceforge.net/apps/trac/bigdata/ticket/733 (Range optimizer not optimizing Slice service)
 - http://sourceforge.net/apps/trac/bigdata/ticket/734 (two property paths interfere)
 - http://sourceforge.net/apps/trac/bigdata/ticket/736 (MIN() malfunction)
+- http://sourceforge.net/apps/trac/bigdata/ticket/737 (class cast exception)
+- http://sourceforge.net/apps/trac/bigdata/ticket/739 (Inconsistent treatment of bind and optional property path)
+- http://sourceforge.net/apps/trac/bigdata/ticket/741 (ctc-striterators should build as independent top-level project (Apache2))
 - http://sourceforge.net/apps/trac/bigdata/ticket/743 (AbstractTripleStore.destroy() does not filter for correct prefix)
+- http://sourceforge.net/apps/trac/bigdata/ticket/746 (Assertion error)
+- http://sourceforge.net/apps/trac/bigdata/ticket/747 (BOUND bug)
+- http://sourceforge.net/apps/trac/bigdata/ticket/748 (incorrect join with subselect renaming vars)
 - http://sourceforge.net/apps/trac/bigdata/ticket/754 (Failure to setup SERVICE hook and changeLog for Unisolated and Read/Write connections)
 - http://sourceforge.net/apps/trac/bigdata/ticket/755 (Concurrent QuorumActors can interfere leading to failure to progress)
-- http://sourceforge.net/apps/trac/bigdata/ticket/718 (HAJournalServer needs to handle ZK client connection loss)
+- http://sourceforge.net/apps/trac/bigdata/ticket/756 (order by and group_concat)
 - http://sourceforge.net/apps/trac/bigdata/ticket/760 (Code review on 2-phase commit protocol)
 - http://sourceforge.net/apps/trac/bigdata/ticket/764 (RESYNC failure (HA))
+- http://sourceforge.net/apps/trac/bigdata/ticket/770 (alpp ordering)
 - http://sourceforge.net/apps/trac/bigdata/ticket/772 (Query timeout only checked at operator start/stop.)
-- http://sourceforge.net/apps/trac/bigdata/ticket/777 (ConcurrentModificationException in ASTComplexOptionalOptimizer)
+- http://sourceforge.net/apps/trac/bigdata/ticket/776 (Closed as duplicate of #490)
+- http://sourceforge.net/apps/trac/bigdata/ticket/778 (HA Leader fail results in transient problem with allocations on other services)
 - http://sourceforge.net/apps/trac/bigdata/ticket/783 (Operator Alerts (HA))

 1.2.4:

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
From: <tho...@us...> - 2013-12-18 16:41:17
Revision: 7675 http://bigdata.svn.sourceforge.net/bigdata/?rev=7675&view=rev
Author: thompsonbry
Date: 2013-12-18 16:41:11 +0000 (Wed, 18 Dec 2013)

Log Message:
-----------
Disabling some known bad tests for the 1.3.0 release. These are linked to BigdataStatics.runKnownBadTests

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3ChangeLeader.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3ChangeLeader.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3ChangeLeader.java	2013-12-18 15:21:37 UTC (rev 7674)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3ChangeLeader.java	2013-12-18 16:41:11 UTC (rev 7675)
@@ -92,7 +92,12 @@
     }

     public void testStartABC_KillLeader_RandomTrans() throws Exception {
-        if(BigdataStatics.runKnownBadTests)return;
+        if (!BigdataStatics.runKnownBadTests) {
+            /*
+             * FIXME Test disabled for the 1.3.0 release.
+             */
+            return;
+        }
         fail("Test disabled pending reconcilation of socket ticket");
         final Random r = new Random();
         final int ntrans = r.nextInt(900);

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-12-18 15:21:37 UTC (rev 7674)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-12-18 16:41:11 UTC (rev 7675)
@@ -35,6 +35,7 @@

 import org.apache.zookeeper.ZooKeeper;

+import com.bigdata.BigdataStatics;
 import com.bigdata.ha.HACommitGlue;
 import com.bigdata.ha.HAGlue;
 import com.bigdata.ha.HAStatusEnum;
@@ -1614,12 +1615,16 @@
      * service) will accept new transactions. There could be a race here between
      * a fully met quorum and new transactions, but eventually all services will
      * be joined with the met quorum and at the same commit point.
-     *
-     * FIXME This test fails. Figure out why.
      */
     public void testABC_stopZookeeper_failA_startZookeeper_quorumMeetsAgainOnNewToken()
             throws Exception {
-
+        if (!BigdataStatics.runKnownBadTests) {
+            /*
+             * FIXME Test disabled for the 1.3.0 release. This test fails.
+             * Figure out why.
+             */
+            return;
+        }
         final HAGlue serverA = startA();
         final HAGlue serverB = startB();
         final HAGlue serverC = startC();
@@ -1735,11 +1740,19 @@
      * transaction. This should push A into an error state. When we restart
      * zookeeper the quorum should break and then reform (in some arbitrary)
      * order. Services should be at the last recorded commit point. New
-     * transactions should be accepted. FIXME Test fails. Figure out why.
+     * transactions should be accepted.
      */
     public void testAB_stopZookeeper_failB_startZookeeper_quorumBreaksThenMeets()
             throws Exception {
+        if (!BigdataStatics.runKnownBadTests) {
+            /*
+             * FIXME Test disabled for the 1.3.0 release. This test fails.
+             * Figure out why.
+             */
+            return;
+        }
+
         final HAGlue serverA = startA();
         final HAGlue serverB = startB();
From: <tho...@us...> - 2013-12-18 15:21:45
Revision: 7674 http://bigdata.svn.sourceforge.net/bigdata/?rev=7674&view=rev
Author: thompsonbry
Date: 2013-12-18 15:21:37 +0000 (Wed, 18 Dec 2013)

Log Message:
-----------
Added logic in the resetPipeline implementation to await a pipeline change (up to a timeout) such that the problem service is no longer a neighbor of a given service. See #724.

Modified Paths:
--------------
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java	2013-12-18 15:03:44 UTC (rev 7673)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java	2013-12-18 15:21:37 UTC (rev 7674)
@@ -23,6 +23,8 @@
  */
 package com.bigdata.ha;

+import java.util.UUID;
+
 public class HAPipelineResetRequest implements IHAPipelineResetRequest {

     /**
@@ -31,11 +33,16 @@
     private static final long serialVersionUID = 1L;

     private long token;
-
-    public HAPipelineResetRequest(final long token) {
+    private UUID problemServiceId;
+    private long timeoutNanos;
+
+    public HAPipelineResetRequest(final long token,
+            final UUID problemServiceId, final long timeoutNanos) {
         this.token = token;
+        this.problemServiceId = problemServiceId;
+        this.timeoutNanos = timeoutNanos;
     }
-
+
     @Override
     public long token() {
         return token;
@@ -43,7 +50,18 @@

     @Override
     public String toString() {
-        return super.toString() + "{token=" + token + "}";
+        return super.toString() + "{token=" + token + ", problemServiceId="
+                + problemServiceId + ", timeoutNanos=" + timeoutNanos + "}";
     }
+
+    @Override
+    public UUID getProblemServiceId() {
+        return problemServiceId;
+    }
+
+    @Override
+    public long getTimeoutNanos() {
+        return timeoutNanos;
+    }
 }

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java	2013-12-18 15:03:44 UTC (rev 7673)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java	2013-12-18 15:21:37 UTC (rev 7674)
@@ -23,6 +23,8 @@
  */
 package com.bigdata.ha;

+import java.util.UUID;
+
 import com.bigdata.ha.msg.IHAMessage;

 /**
@@ -36,5 +38,21 @@
      * The quorum token in effect on the leader when this request was generated.
      */
     long token();
+
+    /**
+     * The {@link UUID} of the service that the leader has forced from the
+     * pipeline
+     *
+     * @return The {@link UUID} of the problem service -or- <code>null</code> if
+     *         the leader did not identify a problem service.
+     */
+    UUID getProblemServiceId();
+
+    /**
+     * How long to await the state where the problem service is no longer part
+     * of the write pipeline for a service that is upstream or downstream of the
+     * problem service.
+     */
+    long getTimeoutNanos();
+
 }

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java	2013-12-18 15:03:44 UTC (rev 7673)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java	2013-12-18 15:21:37 UTC (rev 7674)
@@ -46,7 +46,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;

 import org.apache.log4j.Logger;
@@ -229,6 +229,12 @@
      */
     private final ReentrantLock lock = new ReentrantLock();

+    /**
+     * Condition signalled when the write replication pipeline has been changed
+     * (either the upstream and/or downstream service was changed).
+     */
+    private final Condition pipelineChanged = lock.newCondition();
+
     /** send service (iff this is the leader). */
     private HASendService sendService;
@@ -595,6 +601,10 @@
         super.pipelineChange(oldDownStreamId, newDownStreamId);
         lock.lock();
         try {
+            if (oldDownStreamId == newDownStreamId) {
+                // Nothing to do (both null or same UUID reference).
+                return;
+            }
             if (oldDownStreamId != null && newDownStreamId != null
                     && oldDownStreamId.equals(newDownStreamId)) {
                 /*
@@ -604,8 +614,10 @@
                 return;
             }
             // The address of the next service in the pipeline.
-            final InetSocketAddress addrNext = newDownStreamId == null ? null
-                    : getAddrNext(newDownStreamId);
+//            final InetSocketAddress addrNext = getAddrNext(newDownStreamId);
+            final PipelineState<S> nextState = getAddrNext(newDownStreamId);
+            final InetSocketAddress addrNext = nextState == null ? null
+                    : nextState.addr;
             if (log.isInfoEnabled())
                 log.info("oldDownStreamId=" + oldDownStreamId
                         + ",newDownStreamId=" + newDownStreamId
@@ -636,7 +648,10 @@
                 receiveService.changeDownStream(addrNext);
             }
             // populate and/or clear the cache.
-            cachePipelineState(newDownStreamId);
+            pipelineStateRef.set(nextState);
+//            cachePipelineState(newDownStreamId);
+            // Signal pipeline change.
+            pipelineChanged.signalAll();
             if (log.isDebugEnabled())
                 log.debug("pipelineChange - done.");
         } finally {
@@ -657,6 +672,8 @@
                 if (log.isInfoEnabled())
                     log.info("receiveService=" + receiveService);
                 receiveService.changeUpStream();
+                // Signal pipeline change.
+                pipelineChanged.signalAll();
             }
         } finally {
             lock.unlock();
@@ -688,7 +705,7 @@
      *
      * @return It's {@link InetSocketAddress}
      */
-    private InetSocketAddress getAddrNext(final UUID downStreamId) {
+    private PipelineState<S> getAddrNext(final UUID downStreamId) {

         if (downStreamId == null)
             return null;
@@ -697,9 +714,10 @@
         try {

-            final InetSocketAddress addrNext = service.getWritePipelineAddr();
+            final InetSocketAddress addrNext = service
+                    .getWritePipelineAddr();

-            return addrNext;
+            return new PipelineState<S>(service, addrNext);

         } catch (IOException e) {
@@ -757,50 +775,52 @@
             }
             // clear cache.
             pipelineStateRef.set(null);
+            // Signal pipeline change.
+            pipelineChanged.signalAll();
         } finally {
             lock.unlock();
         }
     }

-    /**
-     * Populate or clear the {@link #pipelineState} cache.
-     * <p>
-     * Note: The only times we need to populate the {@link #pipelineState} are
-     * in response to a {@link #pipelineChange(UUID, UUID)} event or in response
-     * to message a {@link #pipelineElectedLeader()} event.
-     *
-     * @param downStreamId
-     *            The downstream service {@link UUID}.
-     */
-    private void cachePipelineState(final UUID downStreamId) {
-
-        if (downStreamId == null) {
-
-            pipelineStateRef.set(null);
-
-            return;
-
-        }
-
-        final S nextService = member.getService(downStreamId);
-
-        final PipelineState<S> pipelineState = new PipelineState<S>();
-
-        try {
-
-            pipelineState.addr = nextService.getWritePipelineAddr();
-
-        } catch (IOException e) {
-
-            throw new RuntimeException(e);
-
-        }
-
-        pipelineState.service = nextService;
-
-        pipelineStateRef.set(pipelineState);
-
-    }
+//    /**
+//     * Populate or clear the {@link #pipelineState} cache.
+//     * <p>
+//     * Note: The only times we need to populate the {@link #pipelineState} are
+//     * in response to a {@link #pipelineChange(UUID, UUID)} event or in response
+//     * to message a {@link #pipelineElectedLeader()} event.
+//     *
+//     * @param downStreamId
+//     *            The downstream service {@link UUID}.
+//     */
+//    private void cachePipelineState(final UUID downStreamId) {
+//
+//        if (downStreamId == null) {
+//
+//            pipelineStateRef.set(null);
+//
+//            return;
+//
+//        }
+//
+//        final S nextService = member.getService(downStreamId);
+//
+//        final PipelineState<S> pipelineState = new PipelineState<S>();
+//
+//        try {
+//
+//            pipelineState.addr = nextService.getWritePipelineAddr();
+//
+//        } catch (IOException e) {
+//
+//            throw new RuntimeException(e);
+//
+//        }
+//
+//        pipelineState.service = nextService;
+//
+//        pipelineStateRef.set(pipelineState);
+//
+//    }

 /**
  * Setup the send service.
@@ -825,15 +845,20 @@
              * handle downstreamId != null conditionally.
              */
             final UUID downstreamId = member.getDownstreamServiceId();
-            if (downstreamId != null) {
-                // The address of the next service in the pipeline.
-                final InetSocketAddress addrNext = member.getService(
-                        downstreamId).getWritePipelineAddr();
+            // The address of the next service in the pipeline.
+            final PipelineState<S> nextState = getAddrNext(downstreamId);
+            if (nextState != null) {
+//                // The address of the next service in the pipeline.
+//                final InetSocketAddress addrNext = member.getService(
+//                        downstreamId).getWritePipelineAddr();
                 // Start the send service.
-                sendService.start(addrNext);
+                sendService.start(nextState.addr);
             }
             // populate and/or clear the cache.
-            cachePipelineState(downstreamId);
+            pipelineStateRef.set(nextState);
+//            cachePipelineState(downstreamId);
+            // Signal pipeline change.
+            pipelineChanged.signalAll();
         } catch (Throwable t) {
             try {
                 tearDown();
@@ -865,11 +890,16 @@
             final InetSocketAddress addrSelf = member.getService()
                     .getWritePipelineAddr();
             // Address of the downstream service (if any).
-            final InetSocketAddress addrNext = downstreamId == null ? null
-                    : member.getService(downstreamId).getWritePipelineAddr();
+//            final InetSocketAddress addrNext = downstreamId == null ? null
+//                    : member.getService(downstreamId).getWritePipelineAddr();
+//            final InetSocketAddress addrNext = getAddrNext(downstreamId);
+            final PipelineState<S> nextServiceState = getAddrNext(downstreamId);
+            final InetSocketAddress addrNext = nextServiceState == null ? null
+                    : nextServiceState.addr;
             // Setup the receive service.
-            receiveService = new HAReceiveService<HAMessageWrapper>(addrSelf,
-                    addrNext, new IHAReceiveCallback<HAMessageWrapper>() {
+            receiveService = new HAReceiveService<HAMessageWrapper>(
+                    addrSelf, addrNext,
+                    new IHAReceiveCallback<HAMessageWrapper>() {
                         @Override
                         public void callback(final HAMessageWrapper msg,
                                 final ByteBuffer data) throws Exception {
@@ -892,6 +922,8 @@
             // Start the receive service - will not return until service is
             // running
             receiveService.start();
+            // Signal pipeline change.
+            pipelineChanged.signalAll();
         } catch (Throwable t) {
             /*
              * Always tear down if there was a setup problem to avoid leaking
@@ -1251,21 +1283,92 @@

         }

+        /**
+         * If there is an identified problem service and that service is either
+         * our upstream or downstream service, then we need to wait until we
+         * observe a pipeline change event such that it is no longer our
+         * upstream or downstream service. Otherwise we can go ahead and reset
+         * our pipeline.
+         *
+         * TODO What if this service is the problem service?
+         */
+        private boolean isProblemServiceOurNeighbor() {
+
+            final UUID psid = req.getProblemServiceId();
+
+            if (psid == null) {
+
+                return false;
+
+            }
+
+            final UUID[] priorAndNext = member.getQuorum()
+                    .getPipelinePriorAndNext(member.getServiceId());
+
+            if (psid.equals(priorAndNext[0]))
+                return true;
+
+            if (psid.equals(priorAndNext[1]))
+                return true;
+
+            return false;
+
+        }
+
         private IHAPipelineResetResponse doRunWithLock() throws Exception {

             log.warn("Will reset pipeline: req=" + req);

-            // tear down send and/or receive services.
-            innerEventHandler.tearDown();
+            final long begin = System.nanoTime();
+            final long timeout = req.getTimeoutNanos();
+            long remaining = timeout;

-            // The current pipeline order.
-            final UUID[] pipelineOrder = member.getQuorum().getPipeline();
-            // The index of this service in the pipeline order.
-            final int index = getIndex(serviceId, pipelineOrder);
-            if (index == 0) {
-                innerEventHandler.setUpSendService();
-            } else if (index > 0) {
-                innerEventHandler.setUpReceiveService();
+            if (isProblemServiceOurNeighbor()) {
+
+                log.warn("Problem service is our neighbor.");
+
+                do {
+
+                    pipelineChanged.await(remaining, TimeUnit.NANOSECONDS);
+
+                    // remaining = timeout - elapsed
+                    remaining = timeout - (System.nanoTime() - begin);
+
+                } while (isProblemServiceOurNeighbor() && remaining > 0);
+
+                if (isProblemServiceOurNeighbor()) {
+
+                    /*
+                     * Timeout elapsed.
+                     *
+                     * Note: This could be a false timeout, e.g., the problem
+                     * service left and re-entered and is still our neighbor.
+                     * However, the leader will just log and ignore the problem.
+                     * If the pipeline is working again, then all is good. If
+                     * not, then it will force out the problem service and reset
+                     * the pipeline again.
+                     */
+                    throw new TimeoutException();
+
+                }
+
+            } else {
+
+                log.warn("Problem service is not our neighbor.");
+
+                // tear down send and/or receive services.
+                innerEventHandler.tearDown();
+
+                // The current pipeline order.
+                final UUID[] pipelineOrder = member.getQuorum().getPipeline();
+                // The index of this service in the pipeline order.
+                final int index = getIndex(serviceId, pipelineOrder);
+                if (index == 0) {
+                    innerEventHandler.setUpSendService();
+                } else if (index > 0) {
+                    innerEventHandler.setUpReceiveService();
+                }
+
             }

             return new HAPipelineResetResponse();
@@ -1835,7 +1938,12 @@
                     /*
                      * Await the Futures, but spend more time waiting on the
                      * local Future and only check the remote Future every
-                     * second. Timeouts are ignored during this loop.
+                     * second. Timeouts are ignored during this loop - they
+                     * are used to let us wait longer on the local Future
+                     * than on the remote Future. ExecutionExceptions are
+                     * also ignored. We want to continue this loop until
+                     * both Futures are done. Interrupts are not trapped, so
+                     * an interrupt will still exit the loop.
                      */
                     while (!futSnd.isDone() || !futRec.isDone()) {
                         /*
@@ -1846,16 +1954,24 @@
                         try {
                             futSnd.get(1L, TimeUnit.SECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                         try {
                             futRec.get(10L, TimeUnit.MILLISECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                     }
-                    // Note: Both futures are DONE at this point!
+                    /*
+                     * Note: Both futures are DONE at this point. However,
+                     * we want to check the remote Future for the downstream
+                     * service first in order to accurately report the
+                     * service that was the source of a pipeline replication
+                     * problem.
+                     */
+                    futRec.get();
                     futSnd.get();
-                    futRec.get();
                 } finally {
                     if (!futRec.isDone()) {
@@ -1989,8 +2105,8 @@

             } catch (Throwable e) {

-                // Log and continue.
-                log.error("Problem on reset pipeline", e);
+                // Log and continue. Details are logged by resetPipeline().
+                log.warn("Problem(s) on reset pipeline: " + e);

                 if(InnerCause.isInnerCause(e, InterruptedException.class)) {
                     // Propagate interrupt.
@@ -2046,7 +2162,9 @@

             member.assertLeader(token);

-            final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token);
+            // TODO Configure timeout on HAJournalServer.
+            final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token,
+                    problemServiceId, TimeUnit.MILLISECONDS.toNanos(5000));

             /*
              * To minimize latency, we first submit the futures for the other
@@ -2400,7 +2518,12 @@
                     /*
                      * Await the Futures, but spend more time waiting on the
                      * local Future and only check the remote Future every
-                     * second. Timeouts are ignored during this loop.
+                     * second. Timeouts are ignored during this loop - they
+                     * are used to let us wait longer on the local Future
+                     * than on the remote Future. ExecutionExceptions are
+                     * also ignored. We want to continue this loop until
+                     * both Futures are done. Interrupts are not trapped, so
+                     * an interrupt will still exit the loop.
                      */
                     while (!futRec.isDone() || !futRep.isDone()) {
                         /*
@@ -2414,17 +2537,25 @@
                         try {
                             futRec.get(1L, TimeUnit.SECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                         try {
                             futRep.get(10L, TimeUnit.MILLISECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                     }
-                    // Note: Both futures are DONE at this point!
+                    /*
+                     * Note: Both futures are DONE at this point. However,
+                     * we want to check the remote Future for the downstream
+                     * service first in order to accurately report the
+                     * service that was the source of a pipeline replication
+                     * problem.
+                     */
                     futRec.get();
                     futRep.get();
-
                 } finally {
                     if (!futRep.isDone()) {
                         // cancel remote Future unless done.
@@ -2522,10 +2653,17 @@
          */
         public S service;

+        /** Deserialization constructor. */
+        @SuppressWarnings("unused")
         public PipelineState() {

         }

+        public PipelineState(final S service, final InetSocketAddress addr) {
+            this.service = service;
+            this.addr = addr;
+        }
+
         @SuppressWarnings("unchecked")
         public void readExternal(final ObjectInput in) throws IOException,
                 ClassNotFoundException {
From: <tho...@us...> - 2013-12-18 15:03:51
|
Revision: 7673 http://bigdata.svn.sourceforge.net/bigdata/?rev=7673&view=rev Author: thompsonbry Date: 2013-12-18 15:03:44 +0000 (Wed, 18 Dec 2013) Log Message: ----------- removed double increment of nwritten in HASendService Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-17 19:24:45 UTC (rev 7672) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-18 15:03:44 UTC (rev 7673) @@ -691,7 +691,13 @@ */ final int nbytes; - if (false || log.isDebugEnabled()) { // FIXME add debug latency + if (false || log.isDebugEnabled()) { + /* + * Debug only code. This breaks down the payload into + * small packets and adds some latency between them as + * well. This models what is otherwise a less common, + * but more stressful, pattern. + */ final int limit = data.limit(); if (data.position() < (limit - 50000)) { data.limit(data.position() + 50000); @@ -712,21 +718,11 @@ nwritten += nbytes; } - nwritten += nbytes; - if (log.isTraceEnabled()) log.trace("Sent " + nbytes + " bytes with " + nwritten + " of out " + remaining + " written so far"); } - - /* - * The ACK by the receiver divides the HASend requests into - * distinct operations. Without this handshaking, the next - * available payload would be on the way as soon as the last - * byte of the current payload was written. - */ -// awaitAck(socketChannel); } finally { @@ -747,145 +743,6 @@ } -// /** -// * -// * @param socketChannel -// * @throws IOException -// */ -// private void awaitAck(final SocketChannel socketChannel) -// throws IOException { -// -// log.debug("Awaiting (N)ACK"); -// -// // FIXME Optimize. 
-// final ByteBuffer b = ByteBuffer.wrap(new byte[] { -1 }); -// -// while (socketChannel.isOpen()) { -// -// final int nread = socketChannel.read(b); -// -// if (nread == 1) { -// -// final byte ret = b.array()[0]; -// -// if (ret == ACK) { -// -// // Received ACK. -// log.debug("ACK"); -// return; -// -// } -// -// log.error("NACK"); -// return; -// -// } -// -// throw new IOException("Expecting ACK, not " + nread + " bytes"); -// -// } -// -// // channel is closed. -// throw new AsynchronousCloseException(); -// -//// /* -//// * We should now have parameters ready in the WriteMessage and can -//// * begin transferring data from the stream to the writeCache. -//// */ -//// final long begin = System.currentTimeMillis(); -//// long mark = begin; -//// -//// // #of bytes remaining (to be received). -//// int rem = b.remaining(); -//// -//// // End of stream flag. -//// boolean EOS = false; -//// -//// // for debug retain number of low level reads -//// int reads = 0; -//// -//// while (rem > 0 && !EOS) { -//// -//// // block up to the timeout. -//// final int nkeys = client.clientSelector.select(10000/* ms */); -//// -//// if (nkeys == 0) { -//// -//// /* -//// * Nothing available. -//// */ -//// -//// // time since last mark. -//// final long now = System.currentTimeMillis(); -//// final long elapsed = now - mark; -//// -//// if (elapsed > 10000) { -//// // Issue warning if we have been blocked for a while. -//// log.warn("Blocked: awaiting " + rem + " out of " -//// + message.getSize() + " bytes."); -//// mark = now;// reset mark. -//// } -//// -//// if (!client.client.isOpen() -//// || !client.clientSelector.isOpen()) { -//// -//// /* -//// * The channel has been closed. The request must be -//// * failed. TODO Or set EOF:=true? -//// * -//// * Note: The [callback] is NOT notified. 
The service -//// * that issued the RMI request to this service to -//// * receive the payload over the HAReceivedService will -//// * see this exception thrown back across the RMI -//// * request. -//// * -//// * @see HAReceiveService.receiveData(). -//// */ -//// -//// throw new AsynchronousCloseException(); -//// -//// } -//// -//// // no keys. nothing to read. -//// continue; -//// -//// } -//// -//// final Set<SelectionKey> keys = client.clientSelector -//// .selectedKeys(); -//// -//// final Iterator<SelectionKey> iter = keys.iterator(); -//// -//// while (iter.hasNext()) { -//// -//// iter.next(); -//// iter.remove(); -//// -//// final int rdlen = client.client.read(b); -//// -//// if (log.isTraceEnabled()) -//// log.trace("Read " + rdlen + " bytes of " -//// + (rdlen > 0 ? rem - rdlen : rem) -//// + " bytes remaining."); -//// -//// if (rdlen > 0) { -//// reads++; -//// } -//// -//// if (rdlen == -1) { -//// // The stream is closed? -//// EOS = true; -//// break; -//// } -//// -//// rem -= rdlen; -//// -//// } -//// -//// } // while( rem > 0 ) -// -// } - } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
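The r7673 change above removes a duplicated `nwritten += nbytes;` statement from the send loop in HASendService. As a minimal standalone sketch (hypothetical class and method names, not the bigdata code), the point is that an incremental write loop must advance its byte counter exactly once per write, or progress reporting — and anything that terminates on it — is silently doubled:

```java
import java.nio.ByteBuffer;

public class SendLoopDemo {

    /** Drains the buffer in fixed-size chunks, returning the total bytes "written". */
    static long drain(final ByteBuffer data, final int chunkSize) {
        long nwritten = 0;
        while (data.hasRemaining()) {
            // Simulate a partial socket write of at most chunkSize bytes.
            final int nbytes = Math.min(chunkSize, data.remaining());
            data.position(data.position() + nbytes);
            nwritten += nbytes; // advance the counter exactly once per write
            // A second "nwritten += nbytes;" here would double-count every
            // chunk -- the pattern of the bug removed in r7673.
        }
        return nwritten;
    }

    public static void main(String[] args) {
        final ByteBuffer b = ByteBuffer.allocate(120_000);
        final long total = drain(b, 50_000); // same 50k chunking as the debug path
        if (total != 120_000)
            throw new AssertionError("expected 120000, got " + total);
        System.out.println("drained " + total + " bytes");
    }
}
```

With the duplicate increment present, `drain` would report 240,000 bytes for a 120,000-byte buffer, which is why the trace line ("… of out … written so far") was misleading before the fix.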
From: <tho...@us...> - 2013-12-17 19:24:51
Revision: 7672 http://bigdata.svn.sourceforge.net/bigdata/?rev=7672&view=rev Author: thompsonbry Date: 2013-12-17 19:24:45 +0000 (Tue, 17 Dec 2013) Log Message: ----------- The resetPipeline() code now takes the outer lock and then tears down the send/receive services and restarts them. This does not clear up the problem, especially when the HASendService is sending 50k packets. I tried replacing the sendLock with a semaphore to get more confidence that we were not yielding the lock and allowing more than one send at a time. This does not change anything. We need to drill down further on the behavior when the send/receive services are torn down and then set up again, and on how this interacts with the timing of the forceRemoveService() and the re-replication of the message and payload from the leader. I have not yet drilled down on the logs in any depth for this version of the code. I expect that there is an interaction between the timing of those different activities such that we get a repeated series of resetPipeline() requests rather than actually recovering a pipeline in a known good order and a clean-slate condition. See #724. 
Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 19:11:24 UTC (rev 7671) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 19:24:45 UTC (rev 7672) @@ -41,6 +41,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -594,6 +595,14 @@ super.pipelineChange(oldDownStreamId, newDownStreamId); lock.lock(); try { + if (oldDownStreamId != null && newDownStreamId != null + && oldDownStreamId.equals(newDownStreamId)) { + /* + * Nothing to do. The pipeline is already configured + * correctly. + */ + return; + } // The address of the next service in the pipeline. final InetSocketAddress addrNext = newDownStreamId == null ? null : getAddrNext(newDownStreamId); @@ -1222,7 +1231,6 @@ private class ResetPipelineTaskImpl implements Callable<IHAPipelineResetResponse> { - @SuppressWarnings("unused") private final IHAPipelineResetRequest req; public ResetPipelineTaskImpl(final IHAPipelineResetRequest req) { @@ -1234,20 +1242,30 @@ @Override public IHAPipelineResetResponse call() throws Exception { - // FIXME Versus using the inner handler? 
-// innerEventHandler.pipelineUpstreamChange(); -// innerEventHandler.pipelineChange(oldDownStreamId, newDownStreamId); - log.warn("Will reset pipeline"); - if (receiveService != null) { + lock.lock(); + try { + return doRunWithLock(); + } finally { + lock.unlock(); + } + + } - receiveService.changeUpStream(); + private IHAPipelineResetResponse doRunWithLock() throws Exception { - } + log.warn("Will reset pipeline: req=" + req); - if (sendService != null) { + // tear down send and/or receive services. + innerEventHandler.tearDown(); - sendService.closeChannel(); - + // The current pipeline order. + final UUID[] pipelineOrder = member.getQuorum().getPipeline(); + // The index of this service in the pipeline order. + final int index = getIndex(serviceId, pipelineOrder); + if (index == 0) { + innerEventHandler.setUpSendService(); + } else if (index > 0) { + innerEventHandler.setUpReceiveService(); } return new HAPipelineResetResponse(); @@ -1748,14 +1766,14 @@ private final PipelineState<S> downstream; private final HASendService sendService; private final QuorumPipelineImpl<S> outerClass; - private final Lock sendLock; + private final Semaphore sendLock; public SendBufferTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, final HASendService sendService, - final QuorumPipelineImpl<S> outerClass, final Lock sendLock) { + final QuorumPipelineImpl<S> outerClass, final Semaphore sendLock) { this.member = member; this.token = token; @@ -1778,7 +1796,7 @@ * write pipeline at a time. 
*/ - sendLock.lock(); + sendLock.acquire(); try { @@ -1788,7 +1806,7 @@ } finally { - sendLock.unlock(); + sendLock.release(); } @@ -1920,25 +1938,29 @@ problemServiceId = priorAndNext[1]; - member.getActor().forceRemoveService(problemServiceId); - } else if (remoteCause != null) { problemServiceId = remoteCause.getProblemServiceId(); - member.getActor().forceRemoveService(problemServiceId); - } else { // Do not remove anybody. } + + if (problemServiceId != null) { + + // Force out the problem service. + member.getActor().forceRemoveService(problemServiceId); + + } } catch (Throwable e) { // Log and continue. - log.error("Problem on node removal", e); - + log.error("Problem on force remove: problemServiceId=" + + problemServiceId, e); + if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. Thread.currentThread().interrupt(); @@ -2009,7 +2031,7 @@ */ private void resetPipeline(final long token, final UUID problemServiceId) { - log.error("Leader will reset pipeline: " + token); + log.error("Leader will reset pipeline: token=" + token+", problemServiceId=" + problemServiceId); /* * We will only message the services that are in the pipeline. @@ -2134,7 +2156,8 @@ * Lock used to ensure that at most one message is being sent along the * write pipeline at a time. */ - private final Lock sendLock = new ReentrantLock(); +// private final Lock sendLock = new ReentrantLock(); + private final Semaphore sendLock = new Semaphore(1/*permits*/); @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
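The r7672 diff above swaps the `ReentrantLock` send gate for a `Semaphore(1)`. A sketch of the distinction (hypothetical names, assuming only the standard `java.util.concurrent` API): a binary semaphore is not reentrant and, unlike a `Lock`, its permit may legally be released by a thread other than the acquirer, so a re-entrant or cross-thread path cannot quietly slip a second payload through the gate:

```java
import java.util.concurrent.Semaphore;

public class SendGateDemo {

    /** Binary semaphore: at most one sender may hold the single permit. */
    static final Semaphore sendLock = new Semaphore(1);

    /** Sends one payload while holding the permit (simulated). */
    static String send(final String payload) {
        sendLock.acquireUninterruptibly(); // blocks until the permit is free
        try {
            return "sent:" + payload;
        } finally {
            // Unlike Lock.unlock(), this release need not happen on the
            // acquiring thread -- one reason to prefer it for handoff-style
            // pipelines.
            sendLock.release();
        }
    }

    public static void main(String[] args) {
        if (!"sent:abc".equals(send("abc")))
            throw new AssertionError("send failed");
        // While the permit is held, any further acquire must fail -- even a
        // re-entrant attempt by the same thread, which a ReentrantLock would
        // silently allow.
        sendLock.acquireUninterruptibly();
        if (sendLock.tryAcquire())
            throw new AssertionError("second acquire succeeded");
        sendLock.release();
        System.out.println("ok");
    }
}
```

That non-reentrancy is exactly the extra confidence the log message describes: with the semaphore in place, a second concurrent send attempt blocks rather than proceeding.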
From: <tho...@us...> - 2013-12-17 19:11:30
Revision: 7671 http://bigdata.svn.sourceforge.net/bigdata/?rev=7671&view=rev Author: thompsonbry Date: 2013-12-17 19:11:24 +0000 (Tue, 17 Dec 2013) Log Message: ----------- Removed public closeChannel() method. We can probably use the existing mechanisms in QuorumPipelineImpl to tearDown() and setup the sendService and receiveService. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-17 19:09:41 UTC (rev 7670) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-17 19:11:24 UTC (rev 7671) @@ -269,14 +269,14 @@ } } - /** - * Close the {@link SocketChannel} to the downsteam service (blocking). - */ - public void closeChannel() { - synchronized (this.socketChannel) { - closeSocketChannelNoBlock(); - } - } +// /** +// * Close the {@link SocketChannel} to the downsteam service (blocking). +// */ +// public void closeChannel() { +// synchronized (this.socketChannel) { +// closeSocketChannelNoBlock(); +// } +// } /** * Close the {@link SocketChannel} to the downstream service (non-blocking). This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-17 19:09:48
Revision: 7670 http://bigdata.svn.sourceforge.net/bigdata/?rev=7670&view=rev Author: thompsonbry Date: 2013-12-17 19:09:41 +0000 (Tue, 17 Dec 2013) Log Message: ----------- toString() prettyup. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-17 18:03:37 UTC (rev 7669) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-17 19:09:41 UTC (rev 7670) @@ -121,7 +121,7 @@ return super.toString() + "{messageId=" + messageId + ",originalSenderId=" + originalSenderId + ",senderId=" - + senderId + ",token=" + token + ", replicationFactor=" + + senderId + ",token=" + token + ",replicationFactor=" + replicationFactor + "}"; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-17 18:03:44
Revision: 7669 http://bigdata.svn.sourceforge.net/bigdata/?rev=7669&view=rev Author: thompsonbry Date: 2013-12-17 18:03:37 +0000 (Tue, 17 Dec 2013) Log Message: ----------- Bug fix to code looking for root cause of interrupted exception in launder throwable. It was examining the wrong stack trace. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 18:00:44 UTC (rev 7668) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 18:03:37 UTC (rev 7669) @@ -1939,7 +1939,7 @@ // Log and continue. log.error("Problem on node removal", e); - if(InnerCause.isInnerCause(t, InterruptedException.class)) { + if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. Thread.currentThread().interrupt(); } @@ -1970,7 +1970,7 @@ // Log and continue. log.error("Problem on reset pipeline", e); - if(InnerCause.isInnerCause(t, InterruptedException.class)) { + if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. Thread.currentThread().interrupt(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
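The r7669 fix above is a one-character-class bug: the interrupt check walked the cause chain of `t` (the laundered pipeline exception) instead of `e` (the exception just caught locally). A minimal sketch of the pattern, using a stand-in for bigdata's `InnerCause.isInnerCause()` helper (the real implementation may differ):

```java
public class InnerCauseDemo {

    /** Stand-in for InnerCause.isInnerCause(): walks the cause chain. */
    static boolean isInnerCause(final Throwable t,
            final Class<? extends Throwable> cls) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cls.isInstance(cur))
                return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // [t] models the pipeline exception being laundered; [e] models the
        // exception caught in the local catch block, wrapping an interrupt.
        final Throwable t = new RuntimeException("replication failed");
        final Throwable e = new RuntimeException(new InterruptedException());

        // The pre-r7669 code tested [t] here, so a wrapped interrupt in [e]
        // was never propagated to the current thread.
        if (isInnerCause(t, InterruptedException.class))
            throw new AssertionError("t should not contain an interrupt");
        if (!isInnerCause(e, InterruptedException.class))
            throw new AssertionError("e should contain an interrupt");
        System.out.println("ok");
    }
}
```

Checking the wrong throwable compiles cleanly and usually behaves identically, which is why this class of bug survives until an interrupt is actually swallowed in production.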
From: <tho...@us...> - 2013-12-17 18:00:50
Revision: 7668 http://bigdata.svn.sourceforge.net/bigdata/?rev=7668&view=rev Author: thompsonbry Date: 2013-12-17 18:00:44 +0000 (Tue, 17 Dec 2013) Log Message: ----------- Sync to martyn. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 17:27:53 UTC (rev 7667) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 18:00:44 UTC (rev 7668) @@ -1237,7 +1237,7 @@ // FIXME Versus using the inner handler? // innerEventHandler.pipelineUpstreamChange(); // innerEventHandler.pipelineChange(oldDownStreamId, newDownStreamId); - + log.warn("Will reset pipeline"); if (receiveService != null) { receiveService.changeUpStream(); @@ -1755,7 +1755,7 @@ final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, final HASendService sendService, - final QuorumPipelineImpl outerClass, final Lock sendLock) { + final QuorumPipelineImpl<S> outerClass, final Lock sendLock) { this.member = member; this.token = token; @@ -1873,7 +1873,7 @@ */ static private <S extends HAPipelineGlue> void launderPipelineException( final boolean isLeader, final long token, - final QuorumMember<S> member, final QuorumPipelineImpl outerClass, + final QuorumMember<S> member, final QuorumPipelineImpl<S> outerClass, final Throwable t) { log.warn("isLeader=" + isLeader + ", t=" + t, t); @@ -1904,23 +1904,29 @@ member.getServiceId()); if (isLeader) { + + // The problem service (iff identified). + UUID problemServiceId = null; try { - + /* * If we can identify the problem service, then force it out of * the pipeline. 
It can re-enter the pipeline once it * transitions through its ERROR state. */ + if (directCause != null) { - member.getActor().forceRemoveService(priorAndNext[1]); + problemServiceId = priorAndNext[1]; + + member.getActor().forceRemoveService(problemServiceId); } else if (remoteCause != null) { - final UUID problemService = remoteCause.getProblemServiceId(); + problemServiceId = remoteCause.getProblemServiceId(); - member.getActor().forceRemoveService(problemService); + member.getActor().forceRemoveService(problemServiceId); } else { @@ -1928,28 +1934,46 @@ } - /** - * Reset the pipeline on each service (including the leader). If - * replication fails, the socket connections both upstream and - * downstream of the point of failure can be left in an - * indeterminate state with partially buffered data. In order to - * bring the pipeline back into a known state (without forcing a - * quorum break) we message each service in the pipeline to - * reset its HAReceiveService (including the inner - * HASendService. The next message and payload relayed from the - * leader will cause new socket connections to be established. - * - * @see <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" - * > HA Wire Pulling and Sudden Kills </a> - */ - outerClass.resetPipeline(token); + } catch (Throwable e) { + + // Log and continue. + log.error("Problem on node removal", e); - } catch (Exception e) { + if(InnerCause.isInnerCause(t, InterruptedException.class)) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + } - log.error("Problem on node removal", e); + } - throw new RuntimeException(e); + /** + * Reset the pipeline on each service (including the leader). If + * replication fails, the socket connections both upstream and + * downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. 
In order to + * bring the pipeline back into a known state (without forcing a + * quorum break) we message each service in the pipeline to reset + * its HAReceiveService (including the inner HASendService. The next + * message and payload relayed from the leader will cause new socket + * connections to be established. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA Wire Pulling and Sudden Kills </a> + */ + try { + + outerClass.resetPipeline(token, problemServiceId); + + } catch (Throwable e) { + + // Log and continue. + log.error("Problem on reset pipeline", e); + + if(InnerCause.isInnerCause(t, InterruptedException.class)) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + } } @@ -1980,9 +2004,10 @@ * * @param token * The quorum token on the leader. - * @param member + * @param problemServiceId + * The problem service in the write pipeline (if known). */ - private void resetPipeline(final long token) { + private void resetPipeline(final long token, final UUID problemServiceId) { log.error("Leader will reset pipeline: " + token); @@ -2019,7 +2044,7 @@ */ final Future<IHAPipelineResetResponse> rf = member .getExecutor().submit( - new PipelineResetMessageTask(member, + new PipelineResetMessageTask<S>(member, serviceId, msg)); // add to list of futures we will check. @@ -2067,10 +2092,6 @@ /* * If there were any errors, then throw an exception listing them. - * - * TODO But only throw an exception for the joined services. - * Non-joined services, we just long an error (or simply do not tell - * them to do an abort()). */ if (!causes.isEmpty()) { // Throw exception back to the leader. 
Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 17:27:53 UTC (rev 7667) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 18:00:44 UTC (rev 7668) @@ -1117,7 +1117,7 @@ public Future<IHAPipelineResetResponse> resetPipeline( final IHAPipelineResetRequest req) throws IOException { - checkMethod("resetPipeline", new Class[] {}); + checkMethod("resetPipeline", new Class[] {IHAPipelineResetRequest.class}); return super.resetPipeline(req); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
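The r7668 restructuring above converts `launderPipelineException` to "log and continue" around both the force-remove and the pipeline reset, while re-asserting the thread's interrupt status in each catch block. A standalone sketch of that idiom (hypothetical names; the cleanup step stands in for `forceRemoveService()`/`resetPipeline()`):

```java
public class InterruptPropagationDemo {

    /** Runs a cleanup step; never lets its failure abort the caller, but preserves interrupts. */
    static void runAndContinue(final Runnable step) {
        try {
            step.run();
        } catch (Throwable e) {
            // Log and continue (stderr stands in for log.error(...)).
            System.err.println("Problem in cleanup step: " + e);
            if (hasInterrupt(e)) {
                // Propagate the interrupt rather than swallowing it, so the
                // caller can still observe a shutdown request.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** True iff an InterruptedException appears anywhere in the cause chain. */
    static boolean hasInterrupt(final Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof InterruptedException)
                return true;
        }
        return false;
    }

    public static void main(String[] args) {
        runAndContinue(() -> {
            throw new RuntimeException(new InterruptedException());
        });
        // Thread.interrupted() returns the flag and clears it.
        if (!Thread.interrupted())
            throw new AssertionError("interrupt was swallowed");
        System.out.println("ok");
    }
}
```

The design point is that each cleanup step is independent: a failure in force-remove must not prevent the pipeline reset from being attempted, yet neither catch block may discard an interrupt.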
From: <tho...@us...> - 2013-12-17 17:28:04
Revision: 7667 http://bigdata.svn.sourceforge.net/bigdata/?rev=7667&view=rev Author: thompsonbry Date: 2013-12-17 17:27:53 +0000 (Tue, 17 Dec 2013) Log Message: ----------- Added a resetPipeline() method to HAPipelineGlue. This is intended to reset each service that is in the pipeline when the leader launders a pipeline exception. See #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/MockQuorumMember.java Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java Added: 
branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,87 @@ +package com.bigdata.ha; + +import java.io.IOException; +import java.rmi.Remote; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import com.bigdata.ha.msg.IHAMessage; +import com.bigdata.quorum.ServiceLookup; + +/** + * Helper class submits the RMI for a PREPARE, COMMIT, or ABORT message. This is + * used to execute the different requests in parallel on a local executor + * service. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +abstract class AbstractMessageTask<S extends Remote, T, M extends IHAMessage> + implements Callable<T> { + + private final ServiceLookup<S> serviceLookup; + private final UUID serviceId; + protected final M msg; + + public AbstractMessageTask(final ServiceLookup<S> serviceLookup, + final UUID serviceId, final M msg) { + + this.serviceLookup = serviceLookup; + + this.serviceId = serviceId; + + this.msg = msg; + + } + + @Override + final public T call() throws Exception { + + /* + * Note: This code MAY be interrupted at any point if the Future for the + * task is cancelled. If it is interrupted during the RMI, then the + * expectation is that the NIO will be interrupted in a timely manner + * throwing back some sort of IOException indicating the asynchronous + * close of the IO channel or cancel of the RMI. + */ + + // Resolve proxy for remote service. + final S service = serviceLookup.getService(serviceId); + + // RMI. + final Future<T> ft = doRMI(service); + + try { + + /* + * Await the inner Future for the RMI. 
+ * + * Note: In fact, this is a ThickFuture so it is already done by the + * time the RMI returns. + */ + + return ft.get(); + + } finally { + + ft.cancel(true/* mayInterruptIfRunning */); + + } + + } + + /** + * Invoke the specific RMI using the message supplied to the constructor. + * + * @param service + * The service (resolved from the service {@link UUID} supplied + * to the constructor). + * + * @return The result of submitting that RMI to the remote service. + * + * @throws IOException + * if there is a problem with the RMI. + */ + abstract protected Future<T> doRMI(final S service) throws IOException; + +} \ No newline at end of file Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -43,6 +43,8 @@ import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.ha.msg.IHAWriteSetStateRequest; import com.bigdata.ha.msg.IHAWriteSetStateResponse; +import com.bigdata.ha.pipeline.HAReceiveService; +import com.bigdata.ha.pipeline.HASendService; import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.WriteExecutorService; import com.bigdata.service.proxy.ThickFuture; @@ -121,6 +123,26 @@ Future<Void> moveToEndOfPipeline() throws IOException; /** + * Reset the pipeline (blocking). This message is used to handle an error in + * pipeline replication. If replication fails, the socket connections both + * upstream and downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. 
In order to bring the + * pipeline back into a known state (without forcing a quorum break) we + * message each service in the pipeline to reset its + * {@link HAReceiveService} (including the inner {@link HASendService}). The + * next message and payload relayed from the leader will cause new socket + * connections to be established. + * + * @param msg The request. + * + * @return The {@link Future} for the operation on the remote service. + * + * @throws IOException + */ + Future<IHAPipelineResetResponse> resetPipeline(IHAPipelineResetRequest req) + throws IOException; + + /** * Accept metadata describing an NIO buffer transfer along the write * pipeline. This method is never invoked on the master. It is only invoked * on the failover nodes, including the last node in the failover chain. Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,49 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +public class HAPipelineResetRequest implements IHAPipelineResetRequest { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private long token; + + public HAPipelineResetRequest(final long token) { + this.token = token; + } + + @Override + public long token() { + return token; + } + + @Override + public String toString() { + return super.toString() + "{token=" + token + "}"; + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,33 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +public class HAPipelineResetResponse implements IHAPipelineResetResponse { + + /** + * + */ + private static final long serialVersionUID = 1L; + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,40 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +import com.bigdata.ha.msg.IHAMessage; + +/** + * Message requesting a pipeline reset on a service. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHAPipelineResetRequest extends IHAMessage { + + /** + * The quorum token in effect on the leader when this request was generated. 
+ */ + long token(); + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,35 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +import com.bigdata.ha.msg.IHAMessage; + +/** + * Message reporting the outcome of a pipeline reset on a service. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHAPipelineResetResponse extends IHAMessage { + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -28,7 +28,6 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -44,14 +43,12 @@ import com.bigdata.ha.msg.IHA2PhaseAbortMessage; import com.bigdata.ha.msg.IHA2PhaseCommitMessage; import com.bigdata.ha.msg.IHA2PhasePrepareMessage; -import com.bigdata.ha.msg.IHAMessage; import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumMember; import com.bigdata.quorum.QuorumStateChangeListener; import com.bigdata.quorum.QuorumStateChangeListenerBase; -import com.bigdata.service.proxy.ThickFuture; -import com.bigdata.util.InnerCause; +import com.bigdata.quorum.ServiceLookup; import com.bigdata.util.concurrent.ExecutionExceptions; /** @@ -59,9 +56,9 @@ */ public class QuorumCommitImpl<S extends HACommitGlue> extends QuorumStateChangeListenerBase implements QuorumCommit<S>, - QuorumStateChangeListener { + QuorumStateChangeListener, ServiceLookup<HACommitGlue> { - static private transient final Logger log = Logger + static transient final Logger log = Logger .getLogger(QuorumCommitImpl.class); private final QuorumMember<S> member; @@ -83,62 +80,13 @@ return member.getQuorum(); } - - private HACommitGlue getService(final UUID serviceId) { + public HACommitGlue getService(final UUID serviceId) { + return 
member.getService(serviceId); } - - /** - * Cancel the requests on the remote services (RMI). This is a best effort - * implementation. Any RMI related errors are trapped and ignored in order - * to be robust to failures in RMI when we try to cancel the futures. - * <p> - * NOte: This is not being done in parallel. However, due to a DGC thread - * leak issue, we now use {@link ThickFuture}s. Thus, the tasks that are - * being cancelled are all local tasks running on the - * {@link #executorService}. If that local task is doing an RMI, then - * cancelling it will cause an interrupt in the NIO request. - */ - private <F extends Future<T>, T> void cancelFutures(final List<F> futures) { - if (log.isInfoEnabled()) - log.info(""); - - for (F f : futures) { - - if (f == null) { - - continue; - - } - - try { - - if (!f.isDone()) { - - f.cancel(true/* mayInterruptIfRunning */); - - } - - } catch (Throwable t) { - - if (InnerCause.isInnerCause(t, InterruptedException.class)) { - - // Propagate interrupt. - Thread.currentThread().interrupt(); - - } - - // ignored (to be robust). - - } - - } - - } - /** * {@inheritDoc} * <p> @@ -270,7 +218,7 @@ * remote service. We will await this task below. */ final Future<Boolean> rf = executorService - .submit(new PrepareMessageTask(serviceId, + .submit(new PrepareMessageTask(this, serviceId, msgForJoinedService)); // add to list of futures we will check. @@ -379,7 +327,7 @@ } finally { - cancelFutures(localFutures); + QuorumServiceBase.cancelFutures(localFutures); } @@ -469,7 +417,7 @@ * remote service. */ final Future<Void> rf = executorService - .submit(new CommitMessageTask(serviceId, + .submit(new CommitMessageTask(this, serviceId, msgJoinedService)); // add to list of futures we will check. @@ -533,7 +481,7 @@ } finally { // Ensure that all futures are cancelled. - cancelFutures(localFutures); + QuorumServiceBase.cancelFutures(localFutures); } @@ -580,7 +528,7 @@ * remote service. 
*/ final Future<Void> rf = executorService - .submit(new AbortMessageTask(serviceId, msg)); + .submit(new AbortMessageTask(this, serviceId, msg)); // add to list of futures we will check. localFutures.add(rf); @@ -643,86 +591,24 @@ } finally { // Ensure that all futures are cancelled. - cancelFutures(localFutures); + QuorumServiceBase.cancelFutures(localFutures); } } - /** - * Helper class submits the RMI for a PREPARE, COMMIT, or ABORT message. - * This is used to execute the different requests in parallel on a local - * executor service. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private abstract class AbstractMessageTask<T, M extends IHAMessage> - implements Callable<T> { + static private class PrepareMessageTask extends + AbstractMessageTask<HACommitGlue, Boolean, IHA2PhasePrepareMessage> { - private final UUID serviceId; - protected final M msg; + public PrepareMessageTask( + final ServiceLookup<HACommitGlue> serviceLookup, + final UUID serviceId, final IHA2PhasePrepareMessage msg) { - public AbstractMessageTask(final UUID serviceId, final M msg) { + super(serviceLookup, serviceId, msg); - this.serviceId = serviceId; - - this.msg = msg; - } @Override - final public T call() throws Exception { - - /* - * Note: This code MAY be interrupted at any point if the Future for - * the task is cancelled. If it is interrupted during the RMI, then - * the expectation is that the NIO will be interrupted in a timely - * manner throwing back some sort of IOException indicating the - * asynchronous close of the IO channel or cancel of the RMI. - */ - - // Resolve proxy for remote service. - final HACommitGlue service = getService(serviceId); - - // RMI. - final Future<T> ft = doRMI(service); - - try { - - /* - * Await the inner Future for the RMI. - * - * Note: In fact, this is a ThickFuture so it is already done by - * the time the RMI returns. 
- */ - - return ft.get(); - - } finally { - - ft.cancel(true/* mayInterruptIfRunning */); - - } - - } - - abstract protected Future<T> doRMI(final HACommitGlue service) - throws IOException; - - } - - private class PrepareMessageTask extends - AbstractMessageTask<Boolean, IHA2PhasePrepareMessage> { - - public PrepareMessageTask(final UUID serviceId, - final IHA2PhasePrepareMessage msg) { - - super(serviceId, msg); - - } - - @Override protected Future<Boolean> doRMI(final HACommitGlue service) throws IOException { @@ -732,13 +618,14 @@ } - private class CommitMessageTask extends - AbstractMessageTask<Void, IHA2PhaseCommitMessage> { + static private class CommitMessageTask extends + AbstractMessageTask<HACommitGlue, Void, IHA2PhaseCommitMessage> { - public CommitMessageTask(final UUID serviceId, - final IHA2PhaseCommitMessage msg) { + public CommitMessageTask( + final ServiceLookup<HACommitGlue> serviceLookup, + final UUID serviceId, final IHA2PhaseCommitMessage msg) { - super(serviceId, msg); + super(serviceLookup, serviceId, msg); } @@ -751,13 +638,14 @@ } - private class AbortMessageTask extends - AbstractMessageTask<Void, IHA2PhaseAbortMessage> { + static private class AbortMessageTask extends + AbstractMessageTask<HACommitGlue, Void, IHA2PhaseAbortMessage> { - public AbortMessageTask(final UUID serviceId, - final IHA2PhaseAbortMessage msg) { + public AbortMessageTask( + final ServiceLookup<HACommitGlue> serviceLookup, + final UUID serviceId, final IHA2PhaseAbortMessage msg) { - super(serviceId, msg); + super(serviceLookup, serviceId, msg); } @@ -769,5 +657,5 @@ } } - + } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -36,6 +36,8 @@ import 
com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; +import com.bigdata.ha.pipeline.HAReceiveService; +import com.bigdata.ha.pipeline.HASendService; import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.Quorum; @@ -95,6 +97,26 @@ Future<Void> receiveAndReplicate(IHASyncRequest req, IHASendState snd, IHAWriteMessage msg) throws IOException; + /** + * Reset the pipeline (blocking). This message is used to handle an error in + * pipeline replication. If replication fails, the socket connections both + * upstream and downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. In order to bring the + * pipeline back into a known state (without forcing a quorum break) we + * message each service in the pipeline to reset its + * {@link HAReceiveService} (including the inner {@link HASendService}). The + * next message and payload relayed from the leader will cause new socket + * connections to be established. + * + * @param req The request. + * + * @return The {@link Future} for the operation on the local service. 
+ * + * @throws IOException + */ + Future<IHAPipelineResetResponse> resetPipeline(IHAPipelineResetRequest req) + throws IOException; + /* * Note: Method removed since it does not appear necessary to let this * service out of the scope of the QuorumPipelineImpl and the send service Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -30,6 +30,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -70,7 +72,9 @@ import com.bigdata.quorum.QuorumStateChangeEventEnum; import com.bigdata.quorum.QuorumStateChangeListener; import com.bigdata.quorum.QuorumStateChangeListenerBase; +import com.bigdata.quorum.ServiceLookup; import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.ExecutionExceptions; /** * {@link QuorumPipeline} implementation. @@ -1197,6 +1201,61 @@ } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) throws IOException { + + final FutureTask<IHAPipelineResetResponse> ft = new FutureTask<IHAPipelineResetResponse>( + new ResetPipelineTaskImpl(req)); + + member.getExecutor().submit(ft); + + return ft; + + } + + /** + * Task resets the pipeline on this service. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + private class ResetPipelineTaskImpl implements + Callable<IHAPipelineResetResponse> { + + @SuppressWarnings("unused") + private final IHAPipelineResetRequest req; + + public ResetPipelineTaskImpl(final IHAPipelineResetRequest req) { + + this.req = req; + + } + + @Override + public IHAPipelineResetResponse call() throws Exception { + + // FIXME Versus using the inner handler? +// innerEventHandler.pipelineUpstreamChange(); +// innerEventHandler.pipelineChange(oldDownStreamId, newDownStreamId); + + if (receiveService != null) { + + receiveService.changeUpStream(); + + } + + if (sendService != null) { + + sendService.closeChannel(); + + } + + return new HAPipelineResetResponse(); + + } + + } + /* * This is the leader, so send() the buffer. */ @@ -1558,7 +1617,8 @@ final ByteBuffer b = this.b.duplicate(); new SendBufferTask<S>(member, quorumToken, req, snd, msg, b, - downstream, sendService, sendLock).call(); + downstream, sendService, QuorumPipelineImpl.this, + sendLock).call(); return; @@ -1687,13 +1747,15 @@ private final ByteBuffer b; private final PipelineState<S> downstream; private final HASendService sendService; + private final QuorumPipelineImpl<S> outerClass; private final Lock sendLock; public SendBufferTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, - final HASendService sendService, final Lock sendLock) { + final HASendService sendService, + final QuorumPipelineImpl outerClass, final Lock sendLock) { this.member = member; this.token = token; @@ -1703,6 +1765,7 @@ this.b = b; this.downstream = downstream; this.sendService = sendService; + this.outerClass = outerClass; this.sendLock = sendLock; } @@ -1785,27 +1848,33 @@ } finally { // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload. 
+ futSnd.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { - launderPipelineException(true/* isLeader */, member, t); + launderPipelineException(true/* isLeader */, token, member, outerClass, t); } } - } + } // class SendBufferTask /** * Launder an exception thrown during pipeline replication. * * @param isLeader * <code>true</code> iff this service is the quorum leader. + * @param token + * The quorum token. * @param member * The {@link QuorumMember} for this service. + * @param outerClass + * The outer class - required for {@link #resetPipeline()}. * @param t * The throwable. */ - static private void launderPipelineException(final boolean isLeader, - final QuorumMember<?> member, final Throwable t) { + static private <S extends HAPipelineGlue> void launderPipelineException( + final boolean isLeader, final long token, + final QuorumMember<S> member, final QuorumPipelineImpl outerClass, + final Throwable t) { log.warn("isLeader=" + isLeader + ", t=" + t, t); @@ -1838,6 +1907,11 @@ try { + /* + * If we can identify the problem service, then force it out of + * the pipeline. It can re-enter the pipeline once it + * transitions through its ERROR state. + */ if (directCause != null) { member.getActor().forceRemoveService(priorAndNext[1]); @@ -1854,6 +1928,23 @@ } + /** + * Reset the pipeline on each service (including the leader). If + * replication fails, the socket connections both upstream and + * downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. In order to + * bring the pipeline back into a known state (without forcing a + * quorum break) we message each service in the pipeline to + * reset its HAReceiveService (including the inner + * HASendService). The next message and payload relayed from the + * leader will cause new socket connections to be established. 
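The comment above describes the recovery protocol: on a replication error, the leader messages every service in the pipeline to tear down its socket state, and the next payload from the leader re-establishes the connections. As an illustration only (hypothetical class and interface names, not part of this commit), the shape of that fan-out (followers messaged via an executor, the leader run inline in the caller's thread to avoid deadlock, failures collected rather than thrown eagerly) can be sketched as:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class PipelineResetSketch {

    /** Stand-in for the resetPipeline() RMI on one service (hypothetical). */
    public interface ResetTarget {
        void reset() throws Exception;
    }

    /**
     * Scatter the reset to the followers via the executor, run the leader's
     * reset inline in the caller's thread (so it keeps the caller's locks),
     * then gather all failures instead of failing fast.
     */
    public static List<Throwable> resetAll(final ResetTarget leader,
            final List<ResetTarget> followers, final ExecutorService executor) {
        final List<Future<Void>> futures = new ArrayList<>();
        for (final ResetTarget f : followers) {
            // Submit followers first to minimize overall latency.
            futures.add(executor.submit((Callable<Void>) () -> {
                f.reset();
                return null;
            }));
        }
        // Leader last, in the caller's thread, to avoid lock-ordering deadlock.
        final FutureTask<Void> leaderTask = new FutureTask<Void>(() -> {
            leader.reset();
            return null;
        });
        futures.add(leaderTask);
        leaderTask.run();
        // Gather: collect every cause rather than throwing on the first one.
        final List<Throwable> causes = new ArrayList<>();
        for (final Future<Void> ft : futures) {
            try {
                ft.get();
            } catch (Exception ex) {
                causes.add(ex);
            } finally {
                ft.cancel(true/* mayInterruptIfRunning */);
            }
        }
        return causes;
    }
}
```

Running the leader's task with ft.run() in the submitting thread mirrors the resetPipeline() implementation in this commit: the caller keeps whatever locks it already holds, so no lock-ordering cycle can form against the executor's worker threads.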
+ * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA Wire Pulling and Sudden Kills </a> + */ + outerClass.resetPipeline(token); + } catch (Exception e) { log.error("Problem on node removal", e); @@ -1880,8 +1971,145 @@ } } - + /** + * Issue concurrent requests to each service in the pipeline to reset the + * pipeline on that service. The request to the leader is executed in the + * caller's thread so it will own whatever locks the caller already owns - + * this is done to avoid deadlock. + * + * @param token + * The quorum token on the leader. + * @param member + */ + private void resetPipeline(final long token) { + + log.error("Leader will reset pipeline: " + token); + + /* + * We will only message the services that are in the pipeline. + * + * For services (other than the leader) in the quorum, submit the + * RunnableFutures to an Executor. + * + * For the leader, we do this in the caller's thread (to avoid + * possible deadlocks). + */ + final UUID[] pipelineIds = member.getQuorum().getPipeline(); + + member.assertLeader(token); + + final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token); + + /* + * To minimize latency, we first submit the futures for the other + * services and then do f.run() on the leader. + */ + final List<Future<IHAPipelineResetResponse>> localFutures = new LinkedList<Future<IHAPipelineResetResponse>>(); + + try { + + for (int i = 1; i < pipelineIds.length; i++) { + + final UUID serviceId = pipelineIds[i]; + + /* + * Submit task on local executor. The task will do an RMI to the + * remote service. + */ + final Future<IHAPipelineResetResponse> rf = member + .getExecutor().submit( + new PipelineResetMessageTask(member, + serviceId, msg)); + + // add to list of futures we will check. + localFutures.add(rf); + + } + + { + /* + * Run the operation on the leader using a local method call + * (non-RMI) in the caller's thread to avoid deadlock. 
*/ + member.assertLeader(token); + final FutureTask<IHAPipelineResetResponse> ft = new FutureTask<IHAPipelineResetResponse>( + new ResetPipelineTaskImpl(msg)); + localFutures.add(ft); + ft.run();// run on the leader. + } + /* + * Check the futures for the other services in the quorum. + */ + final List<Throwable> causes = new LinkedList<Throwable>(); + for (Future<IHAPipelineResetResponse> ft : localFutures) { + try { + ft.get(); // TODO Timeout? + } catch (InterruptedException ex) { + log.error(ex, ex); + causes.add(ex); + } catch (ExecutionException ex) { + log.error(ex, ex); + causes.add(ex); + } catch (RuntimeException ex) { + /* + * Note: ClientFuture.get() can throw a RuntimeException + * if there is a problem with the RMI call. In this case + * we do not know whether the Future is done. + */ + log.error(ex, ex); + causes.add(ex); + } finally { + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/* mayInterruptIfRunning */); + } + } + + /* + * If there were any errors, then throw an exception listing them. + * + * TODO But only throw an exception for the joined services. + * Non-joined services, we just log an error (or simply do not tell + * them to do an abort()). + */ + if (!causes.isEmpty()) { + // Throw exception back to the leader. + if (causes.size() == 1) + throw new RuntimeException(causes.get(0)); + throw new RuntimeException("remote errors: nfailures=" + + causes.size(), new ExecutionExceptions(causes)); + } + + } finally { + + // Ensure that all futures are cancelled. 
+ QuorumServiceBase.cancelFutures(localFutures); + + } + + } + + static private class PipelineResetMessageTask<S extends HAPipelineGlue> + extends + AbstractMessageTask<S, IHAPipelineResetResponse, IHAPipelineResetRequest> { + + public PipelineResetMessageTask(final ServiceLookup<S> serviceLookup, + final UUID serviceId, final IHAPipelineResetRequest msg) { + + super(serviceLookup, serviceId, msg); + + } + + @Override + protected Future<IHAPipelineResetResponse> doRMI(final S service) + throws IOException { + + return service.resetPipeline(msg); + } + + } + + /** * Lock used to ensure that at most one message is being sent along the * write pipeline at a time. */ @@ -1974,7 +2202,7 @@ ft = new FutureTask<Void>(new ReceiveAndReplicateTask<S>( member, token, req, snd, msg, b, downstream, - receiveService)); + receiveService, QuorumPipelineImpl.this)); } @@ -2078,13 +2306,15 @@ private final ByteBuffer b; private final PipelineState<S> downstream; private final HAReceiveService<HAMessageWrapper> receiveService; + private final QuorumPipelineImpl<S> outerClass; public ReceiveAndReplicateTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, - final HAReceiveService<HAMessageWrapper> receiveService) { + final HAReceiveService<HAMessageWrapper> receiveService, + final QuorumPipelineImpl<S> outerClass) { this.member = member; this.token = token; @@ -2094,6 +2324,7 @@ this.b = b; this.downstream = downstream; this.receiveService = receiveService; + this.outerClass = outerClass; } @Override @@ -2159,10 +2390,11 @@ } finally { // cancel the local Future. - futRec.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload? 
+ futRec.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { - launderPipelineException(false/* isLeader */, member, t); + launderPipelineException(false/* isLeader */, token, member, + outerClass, t); } // done return null; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -44,6 +45,8 @@ import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.Journal; import com.bigdata.quorum.AbstractQuorumMember; +import com.bigdata.service.proxy.ThickFuture; +import com.bigdata.util.InnerCause; /** * Abstract implementation provides the logic for distributing messages for the @@ -249,6 +252,14 @@ } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + IHAPipelineResetRequest req) throws IOException { + + return pipelineImpl.resetPipeline(req); + + } + /** * Core implementation handles the message and payload when received on a * service. @@ -430,4 +441,54 @@ } + /** + * Cancel the requests on the remote services (RMI). This is a best effort + * implementation. Any RMI related errors are trapped and ignored in order + * to be robust to failures in RMI when we try to cancel the futures. + * <p> + * Note: This is not being done in parallel. However, due to a DGC thread + * leak issue, we now use {@link ThickFuture}s. Thus, the tasks that are + * being cancelled are all local tasks running on the + * {@link #executorService}. 
If that local task is doing an RMI, then + * cancelling it will cause an interrupt in the NIO request. + */ + public static <F extends Future<T>, T> void cancelFutures( + final List<F> futures) { + + if (log.isInfoEnabled()) + log.info(""); + + for (F f : futures) { + + if (f == null) { + + continue; + + } + + try { + + if (!f.isDone()) { + + f.cancel(true/* mayInterruptIfRunning */); + + } + + } catch (Throwable t) { + + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + + // Propagate interrupt. + Thread.currentThread().interrupt(); + + } + + // ignored (to be robust). + + } + + } + + } + } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -117,9 +117,14 @@ * data to a downstream service. 
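The cancelFutures() helper being moved into QuorumServiceBase above is a reusable best-effort pattern: skip empty slots, cancel anything not yet done, and swallow errors while preserving interrupts. A minimal standalone sketch (hypothetical class name; the real helper tests the cause chain with InnerCause, simplified here to an instanceof check):

```java
import java.util.List;
import java.util.concurrent.Future;

public class CancelFutures {

    /**
     * Best-effort cancellation: skip null slots, cancel anything not yet
     * done, and never lose a thread interrupt. Any other error from
     * cancel() is ignored so the caller stays robust to proxy failures.
     */
    public static <T> void cancelAll(final List<? extends Future<T>> futures) {
        for (final Future<T> f : futures) {
            if (f == null)
                continue; // slot was never filled; nothing to cancel
            try {
                if (!f.isDone()) {
                    f.cancel(true/* mayInterruptIfRunning */);
                }
            } catch (Throwable t) {
                if (t instanceof InterruptedException) {
                    // Propagate the interrupt to the caller.
                    Thread.currentThread().interrupt();
                }
                // Otherwise ignored (to be robust).
            }
        }
    }
}
```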
*/ private final HASendService sendService; - - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - + + public HASendService getSendService() { + return sendService; + } + + private final ExecutorService executor = Executors + .newSingleThreadExecutor(); + // private ServerSocketChannel server; // private FutureTask<Void> readFuture; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -258,16 +258,7 @@ return; } try { - final SocketChannel socketChannel = this.socketChannel.get(); - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (IOException ex) { - log.error("Ignoring exception during close: " + ex, ex); - } finally { - this.socketChannel.set(null); - } - } + closeSocketChannelNoBlock(); } finally { // shutdown executor. tmp.shutdownNow(); @@ -279,6 +270,33 @@ } /** + * Close the {@link SocketChannel} to the downsteam service (blocking). + */ + public void closeChannel() { + synchronized (this.socketChannel) { + closeSocketChannelNoBlock(); + } + } + + /** + * Close the {@link SocketChannel} to the downstream service (non-blocking). + */ + private void closeSocketChannelNoBlock() { + final SocketChannel socketChannel = this.socketChannel.get(); + if (socketChannel != null) { + try { + socketChannel.close(); + } catch (IOException ex) { + log.error("Ignoring exception during close: " + ex, ex); + } finally { + this.socketChannel.set(null); + } + if (log.isInfoEnabled()) + log.info("Closed socket channel"); + } + } + + /** * Send the bytes {@link ByteBuffer#remaining()} in the buffer to the * configured {@link InetSocketAddress}. 
* <p> @@ -393,7 +411,7 @@ * A series of timeouts used when we need to re-open the * {@link SocketChannel}. */ - private final static long[] retryMillis = new long[] { 1, 5, 10, 50, 100, 250, 500 }; + private final static long[] retryMillis = new long[] { 1, 5, 10, 50, 100, 250, 250, 250, 250 }; /** * (Re-)open the {@link SocketChannel} if it is closed and this service is @@ -448,12 +466,14 @@ socketChannel.set(sc = openChannel(addrNext.get())); if (log.isInfoEnabled()) - log.info("Opened channel on try: " + tryno); + log.info("Opened channel on try: " + tryno + + ", addrNext=" + addrNext); } catch (IOException e) { if (log.isInfoEnabled()) - log.info("Failed to open channel on try: " + tryno); + log.info("Failed to open channel on try: " + tryno + + ", addrNext=" + addrNext); if (tryno < retryMillis.length) { @@ -671,7 +691,7 @@ */ final int nbytes; - if (false&&log.isDebugEnabled()) { // add debug latency + if (false || log.isDebugEnabled()) { // FIXME add debug latency final int limit = data.limit(); if (data.position() < (limit - 50000)) { data.limit(data.position() + 50000); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -100,6 +100,8 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; import com.bigdata.ha.HATXSGlue; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.IIndexManagerCallable; import com.bigdata.ha.IJoinedAndNonJoinedServices; import com.bigdata.ha.JoinedAndNonJoinedServices; @@ -8018,6 +8020,14 @@ return getProxy(ft); } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) 
throws IOException { + final Future<IHAPipelineResetResponse> f = quorum.getClient() + .resetPipeline(req); + return getProxy(f); + } + /* * HATXSGlue. * @@ -8243,7 +8253,6 @@ } - }; /** Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -41,7 +41,8 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public interface QuorumClient<S extends Remote> extends QuorumListener { +public interface QuorumClient<S extends Remote> extends QuorumListener, + ServiceLookup<S> { /** * The fully qualified identifier of the logical service whose quorum state Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,51 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Jun 2, 2010 + */ +package com.bigdata.quorum; + +import java.rmi.Remote; +import java.util.UUID; + +public interface ServiceLookup<S extends Remote> { + + /** + * Return the remote interface used to perform HA operations on a member of + * quorum. + * + * @param serviceId + * The {@link UUID} associated with the service. + * + * @return The remote interface for that quorum member. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code> + * @throws QuorumException + * if there is no {@link Quorum} member with that + * <i>serviceId</i>. + */ + S getService(UUID serviceId); + +} Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -49,6 +49,8 @@ import com.bigdata.ha.HAGlueBase; import com.bigdata.ha.HAPipelineGlue; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.QuorumPipeline; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.IHALogRequest; @@ -271,6 +273,12 @@ throw new UnsupportedOperationException(); } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) throws IOException { + throw new UnsupportedOperationException(); + } + } // class MockHAPipelineGlue /** @@ -493,6 +501,12 @@ // NOP } + + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + IHAPipelineResetRequest req) throws 
IOException { + throw new UnsupportedOperationException(); + } } // MockQuorumMemberImpl Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -56,6 +56,8 @@ import org.apache.log4j.Logger; import com.bigdata.ha.HAPipelineGlue; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.msg.IHALogRequest; import com.bigdata.ha.msg.IHALogRootBlocksRequest; import com.bigdata.ha.msg.IHALogRootBlocksResponse; @@ -1335,6 +1337,12 @@ throw new UnsupportedOperationException(); } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + IHAPipelineResetRequest req) throws IOException { + throw new UnsupportedOperationException(); + } + } // MockService } // MockQuorumMember Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -54,6 +54,8 @@ import com.bigdata.counters.PIDUtil; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.QuorumService; import com.bigdata.ha.RunState; import com.bigdata.ha.msg.IHA2PhaseAbortMessage; @@ -1112,6 +1114,15 @@ } @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) throws IOException { + + 
checkMethod("resetPipeline", new Class[] {}); + + return super.resetPipeline(req); + } + + @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg) throws IOException { Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -26,16 +26,27 @@ */ package com.bigdata.journal.jini.ha; +import java.io.IOException; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import net.jini.config.Configuration; import net.jini.core.lookup.ServiceID; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IndexManagerCallable; import com.bigdata.ha.msg.HADigestRequest; +import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest; +import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.DaemonThreadFactory; /** * Life cycle and related tests for a single remote {@link HAJournalServer} out @@ -347,4 +358,153 @@ } + /** + * This test is used to characterize what happens when we interrupt an RMI. + * Most methods on the {@link HAGlue} interface are synchronous - they block + * while some behavior is executed. 
This is even true for some methods that + * return a {@link Future} in order to avoid overhead associated with the + * export of a proxy and DGC thread leaks (since fixed in River). + * <p> + * This unit test setups up a service and then issues an RMI that invokes a + * {@link Thread#sleep(long)} method on the service. The thread that issues + * the RMI is then interrupted during the sleep. + * + * @throws Exception + */ + public void test_interruptRMI() throws Exception { + + // Start a service. + final HAGlue serverA = startA(); + + final AtomicReference<Throwable> localCause = new AtomicReference<Throwable>(); + + final ExecutorService executorService = Executors + .newSingleThreadScheduledExecutor(DaemonThreadFactory + .defaultThreadFactory()); + + try { + + final FutureTask<Void> localFuture = new FutureTask<Void>( + new Callable<Void>() { + + @Override + public Void call() throws Exception { + + try { + final Future<Void> ft = ((HAGlueTest) serverA) + .submit(new SleepTask(6000/* ms */), + false/* asyncFuture */); + + return ft.get(); + } catch (Throwable t) { + localCause.set(t); + log.error(t, t); + throw new RuntimeException(t); + } finally { + log.warn("Local submit of remote task is done."); + } + } + }); + /* + * Submit task that will execute sleep on A. This task will block + * until A finishes its sleep. When we cancel this task, the RMI to + * A will be interrupted. + */ + executorService.execute(localFuture); + + // Wait a bit to ensure that the task was started on A. + Thread.sleep(2000/* ms */); + + // interrupt the local future. will cause interrupt of the RMI. + localFuture.cancel(true/*mayInterruptIfRunning*/); + + } finally { + + executorService.shutdownNow(); + + } + + /* + * The local root cause of the RMI failure is an InterruptedException. + * + * Note: There is a data race between when the [localCause] is set and + * when we exit the code block above. 
This is because we are + * interrupting the local task and have no means to await the completion + * of its error handling routine which sets the [localCause]. + */ + { + assertCondition(new Runnable() { + @Override + public void run() { + final Throwable tmp = localCause.get(); + assertNotNull(tmp); + assertTrue(InnerCause.isInnerCause(tmp, + InterruptedException.class)); + } + }, 10000/*timeout*/, TimeUnit.MILLISECONDS); + } + + /* + * Verify the root cause as observed by A for the interrupt. It should + * also be an InterruptedException. + * + * Note: Again, there is a data race. + * + * Note: Because we might retry this, we do NOT use the getAndClearXXX() + * method to recover the remote exception. + */ + { + assertCondition(new Runnable() { + @Override + public void run() { + Throwable tmp; + try { + tmp = ((HAGlueTest) serverA).getLastRootCause(); + } catch (IOException e) { + throw new RuntimeException(e); + } + assertNotNull(tmp); + log.warn("Received non-null lastRootCause=" + tmp, tmp); + assertTrue(InnerCause.isInnerCause(tmp, + InterruptedException.class)); + } + }, 10000/* timeout */, TimeUnit.MILLISECONDS); + } + + } + + /** + * Task sleeps for a specified duration. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private static class SleepTask extends IndexManagerCallable<Void> { + + private static final long serial... [truncated message content] |
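The pattern this truncated test exercises — cancelling a local FutureTask with mayInterruptIfRunning so the interrupt propagates into a blocking call — can be sketched in isolation. The class below is illustrative only: it uses a plain Thread.sleep() in place of the blocking RMI and none of the bigdata APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class InterruptDemo {

    /**
     * Returns true iff cancelling the local future was observed as an
     * InterruptedException inside the blocked task.
     */
    public static boolean run() {
        final ExecutorService exec = Executors.newSingleThreadExecutor();
        final CountDownLatch observed = new CountDownLatch(1);
        try {
            final FutureTask<Void> task = new FutureTask<>(() -> {
                try {
                    Thread.sleep(6000); // stands in for the blocking RMI
                } catch (InterruptedException e) {
                    observed.countDown(); // cancel(true) landed here
                }
                return null;
            });
            exec.execute(task);
            Thread.sleep(200); // give the task time to block in sleep()
            task.cancel(true /* mayInterruptIfRunning */);
            return observed.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false;
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("sawInterrupt=" + run());
    }
}
```

As in the test above, the caller has no way to await the task's error-handling path directly, which is why the original test polls with assertCondition rather than asserting immediately after cancel().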
From: <mrp...@us...> - 2013-12-17 01:00:24

Revision: 7666 http://bigdata.svn.sourceforge.net/bigdata/?rev=7666&view=rev Author: mrpersonick Date: 2013-12-17 01:00:14 +0000 (Tue, 17 Dec 2013) Log Message: ----------- rolling back RDR changes in advance of the 1.3.0 release Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/QueryHints.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java 2013-12-16 20:05:18 UTC (rev 7665) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java 2013-12-17 01:00:14 UTC (rev 7666) @@ -27,11 +27,9 @@ package com.bigdata.rdf.rio; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; import java.util.Set; @@ -41,13 +39,11 @@ import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; -import org.semanticweb.yars.nx.namespace.RDF; import com.bigdata.rdf.changesets.ChangeAction; import com.bigdata.rdf.changesets.ChangeRecord; import com.bigdata.rdf.changesets.IChangeLog; import com.bigdata.rdf.internal.IV; -import com.bigdata.rdf.internal.impl.bnode.SidIV; import com.bigdata.rdf.model.BigdataBNode; import com.bigdata.rdf.model.BigdataBNodeImpl; import com.bigdata.rdf.model.BigdataResource; @@ -155,14 +151,6 @@ * the canonicalizing {@link #bnodes} mapping. */ private Set<BigdataStatement> deferredStmts; - - /** - * RDR statements. Map to a bnode used in other statements. 
Need to defer - * both the reified statement (since it comes in piecemeal) and the - * statements about it (since we need to make sure the ground version is - * present). - */ - private Map<BigdataBNodeImpl, ReifiedStmt> reifiedStmts; /** * <code>true</code> if statement identifiers are enabled. @@ -370,7 +358,7 @@ log.info("capacity=" + capacity + ", sids=" + statementIdentifiers + ", statementStore=" + statementStore + ", database=" - + database + ", arity=" + arity); + + database); } @@ -457,63 +445,13 @@ log.info("processing " + deferredStmts.size() + " deferred statements"); - /* - * Need to flush the terms out to the dictionary or the reification - * process will not work correctly. - */ - incrementalWrite(); +// incrementalWrite(); try { // Note: temporary override - clear by finally{}. statementIdentifiers = false; - // stage 0 - if (reifiedStmts != null) { - - for (Map.Entry<BigdataBNodeImpl, ReifiedStmt> e : reifiedStmts.entrySet()) { - - final BigdataBNodeImpl sid = e.getKey(); - - final ReifiedStmt reifiedStmt = e.getValue(); - - if (!reifiedStmt.isFullyBound(arity)) { - - log.warn("unfinished reified stmt: " + reifiedStmt); - - continue; - - } - - final BigdataStatement stmt = valueFactory.createStatement( - reifiedStmt.getSubject(), - reifiedStmt.getPredicate(), - reifiedStmt.getObject(), - reifiedStmt.getContext(), - StatementEnum.Explicit); - - sid.setStatement(stmt); - - sid.setIV(new SidIV(new SPO(stmt))); - - if (log.isInfoEnabled()) { - log.info("reified sid conversion: sid=" + sid + ", stmt=" + stmt); - } - - } - - if (log.isInfoEnabled()) { - - for (BigdataBNodeImpl sid : reifiedStmts.keySet()) { - - log.info("sid: " + sid + ", iv=" + sid.getIV()); - - } - - } - - } - // stage 1. 
{ @@ -527,10 +465,6 @@ final BigdataStatement stmt = itr.next(); - if (log.isDebugEnabled()) { - log.debug(stmt.getSubject() + ", sid=" + ((BigdataBNode) stmt.getSubject()).isStatementIdentifier() + ", iv=" + stmt.s()); - } - if (stmt.getSubject() instanceof BNode && ((BigdataBNode) stmt.getSubject()).isStatementIdentifier()) continue; @@ -586,10 +520,6 @@ final BigdataStatement stmt = itr.next(); - if (log.isDebugEnabled()) { - log.debug(stmt.getSubject() + ", iv=" + stmt.s()); - } - if (stmt.getSubject() instanceof BNode && ((BigdataBNode) stmt.getSubject()).isStatementIdentifier() && stmt.s() == null) @@ -641,14 +571,6 @@ if (nremaining > 0) { - if (log.isDebugEnabled()) { - - for (BigdataStatement s : deferredStmts) { - log.debug("could not ground: " + s); - } - - } - throw new StatementCyclesException( "" + nremaining + " statements can not be grounded"); @@ -665,8 +587,6 @@ deferredStmts = null; - reifiedStmts = null; - } } @@ -691,8 +611,6 @@ deferredStmts = null; - reifiedStmts = null; - } /** @@ -824,10 +742,6 @@ if (log.isInfoEnabled()) { log.info("writing " + numTerms); - - for (int i = 0; i < numTerms; i++) { - log.info("term: " + terms[i]); - } } @@ -999,13 +913,13 @@ if (c == null) continue; -// if (c instanceof URI) { -// -// throw new UnificationException( -// "URI not permitted in context position when statement identifiers are enabled: " -// + stmt); -// -// } + if (c instanceof URI) { + + throw new UnificationException( + "URI not permitted in context position when statement identifiers are enabled: " + + stmt); + + } if( c instanceof BNode) { @@ -1102,10 +1016,6 @@ log.info("writing " + numStmts + " on " + (statementStore != null ? 
"statementStore" : "database")); - - for (int i = 0; i < numStmts; i++) { - log.info("spo: " + stmts[i]); - } } @@ -1255,8 +1165,6 @@ protected void handleStatement(Resource s, URI p, Value o, Resource c, StatementEnum type) { -// if (arity == 3) c = null; - s = (Resource) valueFactory.asValue(s); p = (URI) valueFactory.asValue(p); o = valueFactory.asValue(o); @@ -1321,56 +1229,16 @@ * that it is being used as a statement identifier). */ - log.info(stmt); - - if (s instanceof BNode && - (RDF.SUBJECT.toString().equals(p.toString()) || RDF.PREDICATE.toString().equals(p.toString()) || RDF.OBJECT.toString().equals(p.toString())) || - (RDF.STATEMENT.toString().equals(o.toString()) && RDF.TYPE.toString().equals(p.toString()))) { - - if (!(RDF.STATEMENT.toString().equals(o.toString()) && RDF.TYPE.toString().equals(p.toString()))) { - - final BigdataBNodeImpl sid = (BigdataBNodeImpl) s; - - if (reifiedStmts == null) { - - reifiedStmts = new HashMap<BigdataBNodeImpl, ReifiedStmt>(); - - } - - final ReifiedStmt reifiedStmt; - if (reifiedStmts.containsKey(sid)) { - - reifiedStmt = reifiedStmts.get(sid); - - } else { - - reifiedStmt = new ReifiedStmt(); - - reifiedStmts.put(sid, reifiedStmt); - - } - - reifiedStmt.set(p, (BigdataValue) o); - - if (log.isDebugEnabled()) - log.debug("reified piece: "+stmt); - - } + if (deferredStmts == null) { - } else { - - if (deferredStmts == null) { - - deferredStmts = new HashSet<BigdataStatement>(stmts.length); - - } - - deferredStmts.add(stmt); - - if (log.isDebugEnabled()) - log.debug("deferred: "+stmt); - - } + deferredStmts = new HashSet<BigdataStatement>(stmts.length); + + } + + deferredStmts.add(stmt); + + if (log.isDebugEnabled()) + log.debug("deferred: "+stmt); } else { @@ -1491,94 +1359,5 @@ } } - - private static class ReifiedStmt implements Statement { - /** - * - */ - private static final long serialVersionUID = -7706421769807306702L; - - private BigdataResource s; - private BigdataURI p; - private BigdataValue o; - private 
BigdataResource c; - - public ReifiedStmt() { - } - - public boolean isFullyBound(final int arity) { - return s != null && p != null && o != null && (arity > 3 ? c != null : true); - } - - @Override - public BigdataResource getContext() { - return c; - } - - @Override - public BigdataValue getObject() { - return o; - } - - @Override - public BigdataURI getPredicate() { - return p; - } - - @Override - public BigdataResource getSubject() { - return s; - } - - public void set(final URI p, final BigdataValue o) { - - if (p.toString().equals(RDF.SUBJECT.toString())) { - - setSubject((BigdataResource) o); - - } else if (p.toString().equals(RDF.PREDICATE.toString())) { - - setPredicate((BigdataURI) o); - - } else if (p.toString().equals(RDF.OBJECT.toString())) { - - setObject(o); - -// } else if (p.equals(RDF.CONTEXT)) { -// -// setPredicate((URI) c); -// - } else { - - throw new IllegalArgumentException(); - - } - - } - - public void setSubject(final BigdataResource s) { - this.s = s; - } - - public void setPredicate(final BigdataURI p) { - this.p = p; - } - - public void setObject(final BigdataValue o) { - this.o = o; - } - - public void setContext(final BigdataResource c) { - this.c = c; - } - - public String toString() { - - return "<" + s + ", " + p + ", " + o + ", " + c + ">"; - - } - - } - } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/QueryHints.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/QueryHints.java 2013-12-16 20:05:18 UTC (rev 7665) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/QueryHints.java 2013-12-17 01:00:14 UTC (rev 7666) @@ -459,7 +459,7 @@ */ String REIFICATION_DONE_RIGHT = "reificationDoneRight"; - boolean DEFAULT_REIFICATION_DONE_RIGHT = true; + boolean DEFAULT_REIFICATION_DONE_RIGHT = false; /** * Used to mark a predicate as "range safe" - that is, we can 
safely Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java 2013-12-16 20:05:18 UTC (rev 7665) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java 2013-12-17 01:00:14 UTC (rev 7666) @@ -280,9 +280,6 @@ */ public void test_reificationDoneRight_disabled() { - if (QueryHints.DEFAULT_REIFICATION_DONE_RIGHT) - return; - final int capacity = 20; final Properties properties = new Properties(getProperties()); @@ -450,19 +447,6 @@ */ return; } - - if (!store.isStatementIdentifiers()) { - /** - * Disabled. FIXME This should be ON for TRIPLES or QUADS. It - * only works in the SIDS mode right now. The root cause is - * - * <pre> - * Caused by: java.lang.IllegalArgumentException: context bound, but not quads or sids: < TermId(7B), TermId(5U), com.bigdata.rdf.internal.impl.literal.LiteralExtensionIV@25889b2f, TermId(8B) : Explicit > - * at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:275) - * </pre> - */ - return; - } // * @prefix : <http://example.com/> . // * @prefix news: <http://example.com/news/> . @@ -516,10 +500,10 @@ // metadata statements. 
final BigdataStatement mds1 = vf.createStatement(s1, dcSource, - newsSybase, vf.createBNode(), StatementEnum.Explicit); + newsSybase, null/* context */, StatementEnum.Explicit); final BigdataStatement mds2 = vf.createStatement(s1, dcCreated, - createdDate, vf.createBNode(), StatementEnum.Explicit); + createdDate, null/* context */, StatementEnum.Explicit); buffer.add(mds1); @@ -566,17 +550,13 @@ assertEquals(sidIV1.getInlineValue().s(), mds1.s()); assertEquals(sidIV1.getInlineValue().p(), mds1.p()); assertEquals(sidIV1.getInlineValue().o(), mds1.o()); + assertNull(sidIV1.getInlineValue().c()); assertEquals(sidIV2.getInlineValue().s(), mds2.s()); assertEquals(sidIV2.getInlineValue().p(), mds2.p()); assertEquals(sidIV2.getInlineValue().o(), mds2.o()); + assertNull(sidIV2.getInlineValue().c()); - /* - * FIXME Implement quads mode RDR - */ -// assertNull(sidIV1.getInlineValue().c()); -// assertNull(sidIV2.getInlineValue().c()); - } finally { store.__tearDownUnitTest(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 20:05:24
Revision: 7665 http://bigdata.svn.sourceforge.net/bigdata/?rev=7665&view=rev Author: thompsonbry Date: 2013-12-16 20:05:18 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Branch for work on RDR. See #526. MikeP is going to back out the RDR changes from the main development branch and re-commit them into this branch. I am going to setup CI on the RDR branch. RDR will be in 1.3.1. Added Paths: ----------- branches/RDR/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 18:42:56
Revision: 7664 http://bigdata.svn.sourceforge.net/bigdata/?rev=7664&view=rev Author: thompsonbry Date: 2013-12-16 18:42:47 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Updated release notes. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt 2013-12-16 18:27:40 UTC (rev 7663) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt 2013-12-16 18:42:47 UTC (rev 7664) @@ -20,10 +20,8 @@ New features: -- High availability. +- High availability [10]. - Property Path performance enhancements. -- Reification Done Right (alpha). -- RDF Graph Mining API (alpha). - Plus numerous other bug fixes and performance enhancements. Feature summary: @@ -40,6 +38,8 @@ Road map [3]: +- RDF Graph Mining API ([12]). +- Reification Done Right ([11]). - Runtime Query Optimizer for Analytic Query mode; - Performance optimization for scale-out clusters; and - Simplified deployment, configuration, and administration for scale-out clusters. @@ -347,6 +347,8 @@ [8] http://sourceforge.net/projects/bigdata/files/bigdata/ [9] http://sourceforge.net/apps/mediawiki/bigdata/index.php?title=DataMigration [10] http://sourceforge.net/apps/mediawiki/bigdata/index.php?title=HAJournalServer +[11] http://www.bigdata.com/whitepapers/reifSPARQL.pdf +[12] http://sourceforge.net/apps/mediawiki/bigdata/index.php?title=RDF_GAS_API About bigdata: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 18:27:47
Revision: 7663 http://bigdata.svn.sourceforge.net/bigdata/?rev=7663&view=rev Author: thompsonbry Date: 2013-12-16 18:27:40 +0000 (Mon, 16 Dec 2013) Log Message: ----------- bug fix for HASendState.getMarker(). It was not including the version (2 bytes). Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestAll.java Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestHASendState.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 18:09:07 UTC (rev 7662) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 18:27:40 UTC (rev 7663) @@ -1,6 +1,8 @@ package com.bigdata.ha.msg; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.Externalizable; import java.io.IOException; @@ -8,7 +10,6 @@ import java.io.ObjectOutput; import java.util.UUID; -import com.bigdata.io.DataInputBuffer; import com.bigdata.io.DataOutputBuffer; import com.bigdata.rawstore.Bytes; @@ -87,8 +88,13 @@ @Override public byte[] getMarker() { - final byte[] a = new byte[MAGIC_SIZE + currentVersionLen]; + final int len = MAGIC_SIZE + currentVersionLen; +// final ByteArrayOutputStream baos = new ByteArrayOutputStream(len); +// +// final DataOutputStream dob = new DataOutputStream(baos); + final byte[] a = new byte[len]; + final DataOutputBuffer dob = new DataOutputBuffer(0/* len */, a); try { @@ -97,14 +103,17 @@ writeExternal2(dob); + dob.flush(); + +// return baos.toByteArray(); + return a; + } catch (IOException e) { throw new RuntimeException(e); } - return a; - } @Override @@ -150,6 +159,7 @@ private static final transient short VERSION0 = 0x0; private static 
final transient int VERSION0_LEN = // + Bytes.SIZEOF_SHORT + // version Bytes.SIZEOF_LONG + // messageId Bytes.SIZEOF_UUID + // originalSenderId Bytes.SIZEOF_UUID + // senderId @@ -205,24 +215,24 @@ * <code>null</code>. */ static public IHASendState decode(final byte[] a) throws IOException { - return null; -// if (a == null) -// return null; -// -// final HASendState tmp = new HASendState(); -// -// final DataInputBuffer dis = new DataInputBuffer(a); -// -// final long magic = dis.readLong(); -// -// if (magic != MAGIC) -// throw new IOException("Bad magic: expected=" + MAGIC + ", actual=" -// + magic); -// -// tmp.readExternal2(dis); -// -// return tmp; + + if (a == null) + return null; + + final HASendState tmp = new HASendState(); + final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a)); + + final long magic = dis.readLong(); + + if (magic != MAGIC) + throw new IOException("Bad magic: expected=" + MAGIC + ", actual=" + + magic); + + tmp.readExternal2(dis); + + return tmp; + } @Override Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestAll.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestAll.java 2013-12-16 18:09:07 UTC (rev 7662) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestAll.java 2013-12-16 18:27:40 UTC (rev 7663) @@ -62,7 +62,9 @@ final TestSuite suite = new TestSuite("HA messages"); suite.addTestSuite(TestHAWriteMessage.class); - + + suite.addTestSuite(TestHASendState.class); + return suite; } Added: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestHASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestHASendState.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/msg/TestHASendState.java 2013-12-16 18:27:40 UTC (rev 7663) @@ -0,0 +1,80 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. 
All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.msg; + +import java.io.IOException; +import java.util.UUID; + +import com.bigdata.io.SerializerUtil; + +import junit.framework.TestCase2; + +public class TestHASendState extends TestCase2 { + + public TestHASendState() { + } + + public TestHASendState(String name) { + super(name); + } + + public void test_roundTrip() throws IOException { + + final long messageId = 5; + final UUID originalSenderUUID = UUID.randomUUID(); + final UUID senderId = UUID.randomUUID(); + final long quorumToken = 12; + final int replicationFactor = 3; + + final HASendState expected = new HASendState(messageId, + originalSenderUUID, senderId, quorumToken, replicationFactor); + + final byte[] b = SerializerUtil.serialize(expected); + + final HASendState actual = (HASendState) SerializerUtil.deserialize(b); + + assertEquals(expected, actual); + + } + + public void test_getMarker_decode() throws IOException { + + final long messageId = 5; + final UUID originalSenderUUID = UUID.randomUUID(); + final UUID senderId = UUID.randomUUID(); + final long quorumToken = 12; + final int replicationFactor = 3; + + final HASendState expected = new HASendState(messageId, + originalSenderUUID, senderId, quorumToken, replicationFactor); + + final byte[] 
b = expected.getMarker(); + + final HASendState actual = (HASendState) HASendState.decode(b); + + assertEquals(expected, actual); + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
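The bug fixed in this revision — VERSION0_LEN omitting the two-byte version field, so the marker array was sized too small — is easy to reproduce in a minimal magic-plus-version codec. The sketch below uses plain java.nio with an illustrative field layout (magic, version, messageId only); it is not the actual HASendState wire format.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

public class MarkerCodec {

    static final long MAGIC = 0x48415353L;   // illustrative magic value
    static final short VERSION0 = 0x0;

    // Total: magic (8) + version (2) + messageId (8) = 18 bytes.
    // Forgetting the 2-byte version here is exactly the class of bug fixed above.
    static final int LEN = 8 + 2 + 8;

    static byte[] encode(long messageId) {
        final ByteBuffer buf = ByteBuffer.allocate(LEN);
        buf.putLong(MAGIC);
        buf.putShort(VERSION0);
        buf.putLong(messageId);
        return buf.array();
    }

    static long decode(byte[] a) throws IOException {
        final ByteBuffer buf = ByteBuffer.wrap(a);
        final long magic = buf.getLong();
        if (magic != MAGIC)
            throw new IOException("Bad magic: expected=" + MAGIC + ", actual=" + magic);
        final short version = buf.getShort();
        if (version != VERSION0)
            throw new IOException("Bad version: " + version);
        return buf.getLong();
    }

    public static void main(String[] args) throws IOException {
        final byte[] marker = encode(42L);
        System.out.println("len=" + marker.length + " id=" + decode(marker));
    }
}
```

A round-trip test like TestHASendState above — encode, decode, compare — is the cheapest way to catch a length mismatch before it surfaces as a short read on the wire.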
From: <tho...@us...> - 2013-12-16 18:09:13
Revision: 7662 http://bigdata.svn.sourceforge.net/bigdata/?rev=7662&view=rev Author: thompsonbry Date: 2013-12-16 18:09:07 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Commented out broken decode to prevent false EOF Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 18:06:19 UTC (rev 7661) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 18:09:07 UTC (rev 7662) @@ -205,24 +205,24 @@ * <code>null</code>. */ static public IHASendState decode(final byte[] a) throws IOException { + return null; +// if (a == null) +// return null; +// +// final HASendState tmp = new HASendState(); +// +// final DataInputBuffer dis = new DataInputBuffer(a); +// +// final long magic = dis.readLong(); +// +// if (magic != MAGIC) +// throw new IOException("Bad magic: expected=" + MAGIC + ", actual=" +// + magic); +// +// tmp.readExternal2(dis); +// +// return tmp; - if (a == null) - return null; - - final HASendState tmp = new HASendState(); - - final DataInputBuffer dis = new DataInputBuffer(a); - - final long magic = dis.readLong(); - - if (magic != MAGIC) - throw new IOException("Bad magic: expected=" + MAGIC + ", actual=" - + magic); - - tmp.readExternal2(dis); - - return tmp; - } @Override This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 18:06:27
Revision: 7661 http://bigdata.svn.sourceforge.net/bigdata/?rev=7661&view=rev Author: thompsonbry Date: 2013-12-16 18:06:19 +0000 (Mon, 16 Dec 2013) Log Message: ----------- QuorumPipelineImpl - just javadoc. HASendService - transparency for the HASendState if task fails. Also includes hack for small send chunks - must be enabled in the code. HASendState - added method to decode the marker. HAReceiveService - added decode of the marker and more information when closing the client connection. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -1785,7 +1785,7 @@ } finally { // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); + futSnd.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload. } } catch (Throwable t) { launderPipelineException(true/* isLeader */, member, t); @@ -1795,10 +1795,14 @@ } /** - * Launder an exception thrown during pipeline replication. + * Launder an exception thrown during pipeline replication. + * * @param isLeader + * <code>true</code> iff this service is the quorum leader. * @param member + * The {@link QuorumMember} for this service. * @param t + * The throwable. 
*/ static private void launderPipelineException(final boolean isLeader, final QuorumMember<?> member, final Throwable t) { @@ -2155,7 +2159,7 @@ } finally { // cancel the local Future. - futRec.cancel(true/* mayInterruptIfRunning */); + futRec.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload? } } catch (Throwable t) { launderPipelineException(false/* isLeader */, member, t); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -1,5 +1,6 @@ package com.bigdata.ha.msg; +import java.io.DataInput; import java.io.DataOutput; import java.io.Externalizable; import java.io.IOException; @@ -7,6 +8,7 @@ import java.io.ObjectOutput; import java.util.UUID; +import com.bigdata.io.DataInputBuffer; import com.bigdata.io.DataOutputBuffer; import com.bigdata.rawstore.Bytes; @@ -162,6 +164,12 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + readExternal2(in); + + } + + private void readExternal2(final DataInput in) throws IOException { + final short version = in.readShort(); if (version != VERSION0) @@ -185,6 +193,38 @@ } + /** + * Decode the value returned by {@link #getMarker()}. This has the magic + * followed by {@link #writeExternal2(DataOutput)}. It does not have the + * object serialization metadata. + * + * @param a + * The encoded marker. + * + * @return The decoded marker -or- <code>null</code> iff the argument is + * <code>null</code>. 
+ */ + static public IHASendState decode(final byte[] a) throws IOException { + + if (a == null) + return null; + + final HASendState tmp = new HASendState(); + + final DataInputBuffer dis = new DataInputBuffer(a); + + final long magic = dis.readLong(); + + if (magic != MAGIC) + throw new IOException("Bad magic: expected=" + MAGIC + ", actual=" + + magic); + + tmp.readExternal2(dis); + + return tmp; + + } + @Override public void writeExternal(final ObjectOutput out) throws IOException { Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -55,6 +55,7 @@ import com.bigdata.btree.BytesUtil; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.HAMessageWrapper; +import com.bigdata.ha.msg.HASendState; import com.bigdata.ha.msg.IHAMessage; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -665,7 +666,10 @@ // must register OP_READ selector on the new client clientKey = client.register(clientSelector, SelectionKey.OP_READ); - + + if (log.isInfoEnabled()) + log.info("Accepted new connection"); + // this.downstream = downstream; // // // Prepare downstream (if any) for incremental transfers @@ -687,7 +691,9 @@ @Override public String toString() { + final Socket s = client.socket(); + return super.toString() // + "{client.isOpen()=" + client.isOpen()// + ",client.isConnected()=" + client.isConnected()// @@ -701,7 +707,7 @@ private void close() throws IOException { if (log.isInfoEnabled()) - log.info("Closing client connection"); + log.info("Closing client connection: " + this); clientKey.cancel(); @@ -970,7 +976,14 @@ * for the InterruptedException, ClosedByInterruptException, 
* etc. */ - log.error("client=" + clientRef.get() + ", cause=" + t, t); + log.error( + "client=" + + clientRef.get() + + ", msg=" + + message + + ", marker=" + + HASendState.decode(message.getHASendState() + .getMarker()) + ", cause=" + t, t); if (t instanceof Exception) throw (Exception) t; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -39,6 +39,7 @@ import org.apache.log4j.Logger; +import com.bigdata.ha.msg.HASendState; import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.Haltable; @@ -589,7 +590,8 @@ + sc + (sc == null ? "" : ", sc.isOpen()=" + sc.isOpen() + ", sc.isConnected()=" + sc.isConnected()) - + ", cause=" + t, t); + + ", marker=" + HASendState.decode(marker) + ", cause=" + + t, t); if (t instanceof Exception) throw (Exception) t; @@ -668,8 +670,28 @@ * buffer. */ - final int nbytes = socketChannel.write(data); + final int nbytes; + if (false&&log.isDebugEnabled()) { // add debug latency + final int limit = data.limit(); + if (data.position() < (limit - 50000)) { + data.limit(data.position() + 50000); + } + nbytes = socketChannel.write(data); + data.limit(limit); + nwritten += nbytes; + log.debug("Written " + nwritten + " of total " + + data.limit()); + + if (nwritten < limit) { + Thread.sleep(1); + } + } else { + + nbytes = socketChannel.write(data); + nwritten += nbytes; + } + nwritten += nbytes; if (log.isTraceEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
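The decode() added in this revision prefixes the serialized HASendState record with a well-known long and fails fast with an IOException when the prefix does not match. Below is a minimal, self-contained sketch of that framing check; the MAGIC value and the one-field record body are hypothetical stand-ins, not bigdata's actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Sketch of a magic-number framed record: encode() writes a well-known
 * long prefix before the body, and decode() verifies that prefix before
 * reading anything else, so a corrupted or misaligned byte[] is rejected
 * immediately rather than being deserialized into garbage.
 */
class MagicFramedRecord {

    // Hypothetical constant; not the real HASendState MAGIC.
    static final long MAGIC = 0x48415353L;

    final long payload;

    MagicFramedRecord(final long payload) {
        this.payload = payload;
    }

    byte[] encode() throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final DataOutputStream dos = new DataOutputStream(baos);
        dos.writeLong(MAGIC);   // frame prefix
        dos.writeLong(payload); // record body
        return baos.toByteArray();
    }

    static MagicFramedRecord decode(final byte[] a) throws IOException {
        if (a == null)
            return null; // mirrors the null-tolerant contract in the patch
        final DataInputStream dis = new DataInputStream(
                new ByteArrayInputStream(a));
        final long magic = dis.readLong();
        if (magic != MAGIC)
            throw new IOException("Bad magic: expected=" + MAGIC
                    + ", actual=" + magic);
        return new MagicFramedRecord(dis.readLong());
    }
}
```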
From: <tho...@us...> - 2013-12-16 16:54:44
Revision: 7660 http://bigdata.svn.sourceforge.net/bigdata/?rev=7660&view=rev Author: thompsonbry Date: 2013-12-16 16:54:38 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Pushed down a read() method that closes the upstream socket channel if there is an error reading from A. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:19:27 UTC (rev 7659) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:54:38 UTC (rev 7660) @@ -698,7 +698,7 @@ } - public void close() throws IOException { + private void close() throws IOException { if (log.isInfoEnabled()) log.info("Closing client connection"); @@ -718,7 +718,32 @@ // } // } } + + } + /** + * Wraps {@link SocketChannel#read(ByteBuffer)} to test for an EOF and + * calls {@link #close()} if an EOF is reached. + * + * @param dst + * The destination buffer. + * + * @return The #of bytes read. + * + * @throws IOException + */ + private int read(final ByteBuffer dst) throws IOException { + + final int rdlen = client.read(dst); + + if (rdlen == -1) { + + close(); + + } + + return rdlen; + } /** @@ -726,7 +751,7 @@ * control back to the leader. The leader will then handle this in * {@link QuorumPipelineImpl}'s retrySend() method. 
*/ - public void checkFirstCause() throws RuntimeException { + private void checkFirstCause() throws RuntimeException { final Throwable t = firstCause.getAndSet(null); @@ -1162,7 +1187,7 @@ } - final int rdlen = client.client.read(localBuffer); + final int rdlen = client.read(localBuffer); if (log.isTraceEnabled()) log.trace("Read " + rdlen + " bytes with " @@ -1381,7 +1406,7 @@ markerBB.limit(remtok); markerBB.position(0); - final int rdLen = client.client.read(markerBB); + final int rdLen = client.read(markerBB); if (rdLen == -1) { throw new IOException("EOF: nreads=" + nreads + ", bytesRead=" + bytesRead); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:19:27 UTC (rev 7659) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:54:38 UTC (rev 7660) @@ -587,8 +587,8 @@ log.error("socketChannel=" + sc - + (sc == null ? "" : ", sc.isOpen()" + sc.isOpen() - + ", sc.isConnected()" + sc.isConnected()) + + (sc == null ? "" : ", sc.isOpen()=" + sc.isOpen() + + ", sc.isConnected()=" + sc.isConnected()) + ", cause=" + t, t); if (t instanceof Exception) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
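The r7660 change routes all socket reads through a single read(ByteBuffer) method that detects EOF (a return of -1) and closes the channel, so a half-dead upstream connection cannot linger. A sketch of that wrapper against the generic ReadableByteChannel interface follows; the class name is illustrative, since the real code wraps the Client's SocketChannel directly.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/**
 * Sketch of an EOF-closing read wrapper: every read is funneled through
 * one method, and an end-of-stream result (-1) eagerly closes the
 * underlying channel instead of leaving callers to notice it later.
 */
class EofClosingReader {

    private final ReadableByteChannel channel;

    EofClosingReader(final ReadableByteChannel channel) {
        this.channel = channel;
    }

    int read(final ByteBuffer dst) throws IOException {
        final int rdlen = channel.read(dst);
        if (rdlen == -1) {
            // EOF from upstream: tear the connection down now.
            channel.close();
        }
        return rdlen;
    }
}
```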
From: <tho...@us...> - 2013-12-16 16:19:37
Revision: 7659 http://bigdata.svn.sourceforge.net/bigdata/?rev=7659&view=rev Author: thompsonbry Date: 2013-12-16 16:19:27 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn with more logging. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:56:36 UTC (rev 7658) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:19:27 UTC (rev 7659) @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; @@ -686,9 +687,12 @@ @Override public String toString() { - + final Socket s = client.socket(); return super.toString() // + "{client.isOpen()=" + client.isOpen()// + + ",client.isConnected()=" + client.isConnected()// + + ",socket.isInputShutdown()=" + + (s == null ? 
"N/A" : s.isInputShutdown())// + ",clientSelector.isOpen=" + clientSelector.isOpen()// + "}"; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 15:56:36 UTC (rev 7658) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:19:27 UTC (rev 7659) @@ -553,21 +553,58 @@ */ protected /*static*/ class IncSendTask implements Callable<Void> { - private final ByteBuffer data; - private final byte[] marker; + private final ByteBuffer data; + private final byte[] marker; - public IncSendTask(final ByteBuffer data, final byte[] marker) { + public IncSendTask(final ByteBuffer data, final byte[] marker) { - if (data == null) - throw new IllegalArgumentException(); + if (data == null) + throw new IllegalArgumentException(); - this.data = data; - this.marker = marker; - } + this.data = data; + this.marker = marker; + } - @Override - public Void call() throws Exception { + @Override + public Void call() throws Exception { + try { + + return doInnerCall(); + + } catch (Throwable t) { + + /* + * Log anything thrown out of this task. We check the Future of + * this task, but that does not tell us what exception is thrown + * in the Thread executing the task when the Future is cancelled + * and that thread is interrupted. In particular, we are looking + * for the InterruptedException, ClosedByInterruptException, + * etc. + */ + + final SocketChannel sc = socketChannel.get(); + + log.error("socketChannel=" + + sc + + (sc == null ? 
"" : ", sc.isOpen()" + sc.isOpen() + + ", sc.isConnected()" + sc.isConnected()) + + ", cause=" + t, t); + + if (t instanceof Exception) + throw (Exception) t; + + if (t instanceof RuntimeException) + throw (RuntimeException) t; + + throw new RuntimeException(t); + + } + + } + + private Void doInnerCall() throws Exception { + // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 15:56:45
Revision: 7658 http://bigdata.svn.sourceforge.net/bigdata/?rev=7658&view=rev Author: thompsonbry Date: 2013-12-16 15:56:36 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn. more logging. Also, Client now blocks until connected in the ctor. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:22:25 UTC (rev 7657) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:56:36 UTC (rev 7658) @@ -656,7 +656,9 @@ */ client = server.accept(); client.configureBlocking(false); - + if (!client.finishConnect()) + throw new IOException("Upstream client not connected"); + clientSelector = Selector.open(); // must register OP_READ selector on the new client @@ -682,6 +684,16 @@ } + @Override + public String toString() { + + return super.toString() // + + "{client.isOpen()=" + client.isOpen()// + + ",clientSelector.isOpen=" + clientSelector.isOpen()// + + "}"; + + } + public void close() throws IOException { if (log.isInfoEnabled()) @@ -929,9 +941,8 @@ * for the InterruptedException, ClosedByInterruptException, * etc. */ - - log.error(t, t); - + log.error("client=" + clientRef.get() + ", cause=" + t, t); + if (t instanceof Exception) throw (Exception) t; @@ -986,8 +997,11 @@ // // } - if (client == null || !client.client.isOpen()) { + if (client == null || !client.client.isOpen() + || !client.clientSelector.isOpen()) { + log.warn("Re-opening upstream client connection"); + final Client tmp = clientRef.getAndSet(null); if (tmp != null) { // Close existing connection if not open. 
@@ -1137,8 +1151,11 @@ iter.next(); iter.remove(); - if (!drainUtil.foundMarker()) { + if (!drainUtil.findMarker()) { + + // continue to drain until the marker. continue; + } final int rdlen = client.client.read(localBuffer); @@ -1341,8 +1358,15 @@ * could read large amounts of data only a few bytes at a time, however * this is not in reality a significant overhead. */ - boolean foundMarker() throws IOException { + boolean findMarker() throws IOException { + if (markerIndex == marker.length) { + + // Marker already found for this payload. + return true; + + } + if (log.isDebugEnabled()) log.debug("Looking for token, " + BytesUtil.toHexString(marker) + ", reads: " + nreads); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
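The findMarker() logic patched here resynchronizes the stream by scanning incoming bytes for a known marker, keeping partial-match state across reads (the marker may straddle read boundaries) and, with this revision's fix, returning true immediately once the marker has already been fully matched for the current payload. A standalone sketch of that state machine follows, using the same restart-on-mismatch scan as the patch; note that this simple scan can miss markers whose bytes self-overlap, so it assumes marker sequences for which that is unlikely.

```java
/**
 * Sketch of a drain-to-marker scanner: bytes are fed in chunks, partial
 * match progress is retained between chunks, and once the full marker
 * has been seen the scanner stays "found" for this payload.
 */
class MarkerScanner {

    private final byte[] marker;
    private int markerIndex = 0;

    MarkerScanner(final byte[] marker) {
        this.marker = marker;
    }

    /** Feed a chunk of len bytes; returns true once the marker is found. */
    boolean feed(final byte[] chunk, final int len) {
        if (markerIndex == marker.length) {
            // Marker already found for this payload (the r7658 fix).
            return true;
        }
        for (int i = 0; i < len; i++) {
            if (chunk[i] == marker[markerIndex]) {
                markerIndex++;
                if (markerIndex == marker.length)
                    return true;
            } else {
                // Restart the match; the mismatched byte may itself
                // begin a new occurrence of the marker.
                markerIndex = (chunk[i] == marker[0]) ? 1 : 0;
            }
        }
        // Partial (or no) match so far; need more data.
        return false;
    }
}
```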
From: <tho...@us...> - 2013-12-16 15:22:36
Revision: 7657 http://bigdata.svn.sourceforge.net/bigdata/?rev=7657&view=rev Author: thompsonbry Date: 2013-12-16 15:22:25 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to Martyn with logging to let us observe the thrown cause in the ReadTask when the Future of that task is cancelled. See #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:01:47 UTC (rev 7656) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:22:25 UTC (rev 7657) @@ -914,7 +914,38 @@ @Override public Void call() throws Exception { + + try { + + return doInnerCall(); + + } catch (Throwable t) { + + /* + * Log anything thrown out of this task. We check the Future of + * this task, but that does not tell us what exception is thrown + * in the Thread executing the task when the Future is cancelled + * and that thread is interrupted. In particular, we are looking + * for the InterruptedException, ClosedByInterruptException, + * etc. + */ + + log.error(t, t); + + if (t instanceof Exception) + throw (Exception) t; + + if (t instanceof RuntimeException) + throw (RuntimeException) t; + + throw new RuntimeException(t); + + } + + } + private Void doInnerCall() throws Exception { + // awaitAccept(); // // /* This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 15:01:54
Revision: 7656 http://bigdata.svn.sourceforge.net/bigdata/?rev=7656&view=rev Author: thompsonbry Date: 2013-12-16 15:01:47 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn. Added more logging to the HAReceiveService. Suspect java is closing the client channel when we cancel the readFuture. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 14:54:41 UTC (rev 7655) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:01:47 UTC (rev 7656) @@ -683,9 +683,16 @@ } public void close() throws IOException { + + if (log.isInfoEnabled()) + log.info("Closing client connection"); + clientKey.cancel(); + try { + client.close(); + } finally { // try { clientSelector.close(); @@ -695,6 +702,7 @@ // } // } } + } /** @@ -1134,7 +1142,8 @@ } // while( rem > 0 && !EOS ) if (localBuffer.position() != message.getSize()) - throw new IOException("Receive length error: localBuffer.pos=" + throw new IOException("Receive length error: rem=" + rem + + ", EOS=" + EOS + ", localBuffer.pos=" + localBuffer.position() + ", message.size=" + message.getSize()); @@ -1651,7 +1660,8 @@ if (oldClient != null) { -// log.warn("Cleared Client reference."); + if (log.isInfoEnabled()) + log.info("Closing client connection"); try { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 14:54:49
Revision: 7655 http://bigdata.svn.sourceforge.net/bigdata/?rev=7655&view=rev Author: thompsonbry Date: 2013-12-16 14:54:41 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn. - HASendService : log level changes only. - HAReceiveService: (a) lifted the heap byte[] for amortizing checksum data xfer costs out of the ReadTask to reduce heap churn; (b) logging changes, especially for DrainToMarker. @see #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 14:08:57 UTC (rev 7654) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 14:54:41 UTC (rev 7655) @@ -224,6 +224,15 @@ */ private final AtomicReference<InetSocketAddress> addrNextRef; + /** + * Private buffer used to incrementally compute the checksum of the data as + * it is received. The purpose of this buffer is to take advantage of more + * efficient bulk copy operations from the NIO buffer into a local byte[] on + * the Java heap against which we then track the evolving checksum of the + * data. + */ + private final byte[] heapBuffer = new byte[512]; + /* * Note: toString() implementation is non-blocking. */ @@ -533,8 +542,8 @@ // Setup task to read buffer for that message. readFuture = waitFuture = new FutureTask<Void>( new ReadTask<M>(server, clientRef, msg, - localBuffer, sendService, addrNextRef, - callback)); + localBuffer, heapBuffer, sendService, + addrNextRef, callback)); // [waitFuture] is available for receiveData(). 
futureReady.signalAll(); @@ -568,7 +577,7 @@ try { readFuture.get(); } catch (Exception e) { - log.warn(e, e); + log.error(e, e); } lock.lockInterruptibly(); @@ -757,21 +766,10 @@ private final Adler32 chk = new Adler32(); + private final byte[] heapBuffer; + /** - * Private buffer used to incrementally compute the checksum of the data - * as it is received. The purpose of this buffer is to take advantage of - * more efficient bulk copy operations from the NIO buffer into a local - * byte[] on the Java heap against which we then track the evolving - * checksum of the data. * - * FIXME Why isn't this buffer scoped to the outer HAReceiveService? By - * being an inner class field, we allocate it once per payload - * received.... - */ - private final byte[] a = new byte[512]; - - /** - * * @param server * @param clientRef * The client socket, selector, etc. @@ -797,7 +795,8 @@ */ public ReadTask(final ServerSocketChannel server, final AtomicReference<Client> clientRef, final M message, - final ByteBuffer localBuffer, final HASendService downstream, + final ByteBuffer localBuffer, final byte[] heapBuffer, + final HASendService downstream, final AtomicReference<InetSocketAddress> addrNextRef, final IHAReceiveCallback<M> callback) { @@ -807,6 +806,8 @@ throw new IllegalArgumentException(); if (message == null) throw new IllegalArgumentException(); + if (heapBuffer == null) + throw new IllegalArgumentException(); if (localBuffer == null) throw new IllegalArgumentException(); if (downstream == null) @@ -816,6 +817,7 @@ this.clientRef = clientRef; this.message = message; this.localBuffer = localBuffer; + this.heapBuffer = heapBuffer; this.sendService = downstream; this.addrNextRef = addrNextRef; this.callback = callback; @@ -868,8 +870,10 @@ } /** - * Update the running checksum. - * + * Update the running checksum. This uses the {@link #heapBuffer} to + * amoritize the cost of the transfers for the incremental checksum + * maintenance. 
+ * * @param rdlen * The #of bytes read in the last read from the socket into * the {@link #localBuffer}. @@ -885,21 +889,22 @@ // rewind to the first byte to be read. b.position(mark - rdlen); - for (int pos = mark - rdlen; pos < mark; pos += a.length) { + for (int pos = mark - rdlen; pos < mark; pos += heapBuffer.length) { // #of bytes to copy into the local byte[]. - final int len = Math.min(mark - pos, a.length); + final int len = Math.min(mark - pos, heapBuffer.length); // copy into Java heap byte[], advancing b.position(). - b.get(a, 0/* off */, len); + b.get(heapBuffer, 0/* off */, len); // update the running checksum. - chk.update(a, 0/* off */, len); + chk.update(heapBuffer, 0/* off */, len); } } + @Override public Void call() throws Exception { // awaitAccept(); @@ -1265,11 +1270,14 @@ final private ByteBuffer markerBB; final private Client client; + private boolean foundMarkerInInitialPosition = true; private int markerIndex = 0; - private int nmarkerreads = 0; + private int nreads = 0; private int nmarkerbytematches = 0; + private long bytesRead = 0L; DrainToMarkerUtil(final byte[] marker, final Client client) { + this.marker = marker; this.markerBuffer = marker == null ? null : new byte[marker.length]; this.markerBB = marker == null ? null : ByteBuffer @@ -1297,7 +1305,7 @@ if (log.isDebugEnabled()) log.debug("Looking for token, " + BytesUtil.toHexString(marker) - + ", reads: " + nmarkerreads); + + ", reads: " + nreads); while (markerIndex < marker.length) { @@ -1306,10 +1314,23 @@ markerBB.position(0); final int rdLen = client.client.read(markerBB); + if (rdLen == -1) { + throw new IOException("EOF: nreads=" + nreads + + ", bytesRead=" + bytesRead); + } + nreads++; + bytesRead += rdLen; for (int i = 0; i < rdLen; i++) { if (markerBuffer[i] != marker[markerIndex]) { - if (nmarkerreads < 2) - log.warn("TOKEN MISMATCH"); + if (foundMarkerInInitialPosition) { + /* + * The marker was not found in the initial position + * in the stream. 
We are going to drain data until + * we can match the marker. + */ + foundMarkerInInitialPosition = false; + log.error("Marker not found: skipping"); + } markerIndex = 0; if (markerBuffer[i] == marker[markerIndex]) { markerIndex++; @@ -1320,21 +1341,25 @@ } } - nmarkerreads++; - if (nmarkerreads % 10000 == 0) { + if (nreads % 10000 == 0) { if (log.isDebugEnabled()) - log.debug("...still looking, reads: " + nmarkerreads); + log.debug("...still looking: reads=" + nreads + + ", bytesRead=" + bytesRead); } } - if (markerIndex != marker.length) { // not sufficient data ready + if (markerIndex != marker.length) { + /* + * Partial marker has been read, but we do not have enough data + * for a full match yet. + */ if (log.isDebugEnabled()) log.debug("Not found token yet!"); return false; } else { if (log.isDebugEnabled()) - log.debug("Found token after " + nmarkerreads + log.debug("Found token after " + nreads + " token reads and " + nmarkerbytematches + " byte matches"); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 14:08:57 UTC (rev 7654) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 14:54:41 UTC (rev 7655) @@ -247,8 +247,8 @@ * {@link HAReceiveService}. */ synchronized public void terminate() { - if (log.isDebugEnabled()) - log.debug(toString() + " : stopping."); + if (log.isInfoEnabled()) + log.info(toString() + " : stopping."); final ExecutorService tmp = executorRef.getAndSet(null); if (tmp == null) { // Not running. 
@@ -446,13 +446,13 @@ socketChannel.set(sc = openChannel(addrNext.get())); - if (log.isTraceEnabled()) - log.trace("Opened channel on try: " + tryno); + if (log.isInfoEnabled()) + log.info("Opened channel on try: " + tryno); } catch (IOException e) { if (log.isInfoEnabled()) - log.info("Failed to open channel on try: " + tryno); + log.info("Failed to open channel on try: " + tryno); if (tryno < retryMillis.length) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
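The r7655 change lifts the 512-byte heap array out of ReadTask so a single reusable byte[] amortizes the NIO-to-heap copies that feed the running Adler32, instead of allocating one per received payload. A sketch of that incremental-checksum technique follows; the class name is illustrative, and the 512-byte chunk size matches the patch.

```java
import java.nio.ByteBuffer;
import java.util.zip.Adler32;

/**
 * Sketch of incremental checksum maintenance over a (possibly direct)
 * NIO buffer: the last rdlen bytes before b.position() are bulk-copied
 * through a small reused heap byte[] and folded into a running Adler32,
 * which is much cheaper than checksumming a direct buffer byte by byte.
 */
class IncrementalChecksum {

    private final Adler32 chk = new Adler32();

    // One reusable copy buffer, shared across payloads (the r7655 fix).
    private final byte[] heapBuffer = new byte[512];

    /**
     * Fold the last rdlen bytes before b.position() into the checksum.
     * b.position() is unchanged on return.
     */
    void update(final ByteBuffer b, final int rdlen) {
        final int mark = b.position();
        // Rewind to the first byte not yet checksummed.
        b.position(mark - rdlen);
        for (int pos = mark - rdlen; pos < mark; pos += heapBuffer.length) {
            final int len = Math.min(mark - pos, heapBuffer.length);
            b.get(heapBuffer, 0, len); // bulk copy; advances b.position()
            chk.update(heapBuffer, 0, len);
        }
    }

    long getValue() {
        return chk.getValue();
    }
}
```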
From: <tho...@us...> - 2013-12-16 14:09:04
Revision: 7654 http://bigdata.svn.sourceforge.net/bigdata/?rev=7654&view=rev Author: thompsonbry Date: 2013-12-16 14:08:57 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Modified AbstractServer writeServiceIdOnFile() to provide both the UUID and ServiceID forms of the assigned ID to the log. Modified Paths: -------------- branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java Modified: branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-12-16 13:28:37 UTC (rev 7653) +++ branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-12-16 14:08:57 UTC (rev 7654) @@ -1393,7 +1393,8 @@ if (log.isInfoEnabled()) log.info("ServiceID saved: file=" + serviceIdFile - + ", serviceID=" + serviceID); + + ", serviceID=" + serviceID + ", serviceUUID=" + + JiniUtil.serviceID2UUID(serviceID)); } finally { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 13:28:45
Revision: 7653 http://bigdata.svn.sourceforge.net/bigdata/?rev=7653&view=rev Author: thompsonbry Date: 2013-12-16 13:28:37 +0000 (Mon, 16 Dec 2013) Log Message: ----------- final and override annotations Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/NoSnapshotPolicy.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java 2013-12-15 20:36:09 UTC (rev 7652) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java 2013-12-16 13:28:37 UTC (rev 7653) @@ -136,6 +136,7 @@ } + @Override public void init(final HAJournal jnl) { // delay until next run (ms). Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/NoSnapshotPolicy.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/NoSnapshotPolicy.java 2013-12-15 20:36:09 UTC (rev 7652) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/NoSnapshotPolicy.java 2013-12-16 13:28:37 UTC (rev 7653) @@ -34,7 +34,7 @@ public class NoSnapshotPolicy implements ISnapshotPolicy { @Override - public void init(HAJournal jnl) { + public void init(final HAJournal jnl) { // NOP } @@ -45,6 +45,7 @@ } + @Override public IHASnapshotRequest newSnapshotRequest() { return new HASnapshotRequest(100/* percentLogSize */); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-15 20:36:16
Revision: 7652 http://bigdata.svn.sourceforge.net/bigdata/?rev=7652&view=rev Author: thompsonbry Date: 2013-12-15 20:36:09 +0000 (Sun, 15 Dec 2013) Log Message: ----------- final and @Override attributes Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreWithoutStatementIdentifiers.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreWithoutStatementIdentifiers.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreWithoutStatementIdentifiers.java 2013-12-14 15:14:59 UTC (rev 7651) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreWithoutStatementIdentifiers.java 2013-12-15 20:36:09 UTC (rev 7652) @@ -64,7 +64,7 @@ * Use a proxy test suite and specify the delegate. */ - ProxyTestSuite suite = new ProxyTestSuite(delegate, + final ProxyTestSuite suite = new ProxyTestSuite(delegate, "Local Triple Store Without Statement Identifiers"); /* @@ -95,6 +95,7 @@ } + @Override public Properties getProperties() { final Properties properties = super.getProperties(); @@ -114,7 +115,8 @@ } - protected AbstractTripleStore getStore(Properties properties) { + @Override + protected AbstractTripleStore getStore(final Properties properties) { return LocalTripleStore.getInstance(properties); @@ -133,6 +135,7 @@ * be re-opened, e.g., from failure to obtain a file lock, * etc. */ + @Override protected AbstractTripleStore reopenStore(final AbstractTripleStore store) { // close the store. @@ -146,13 +149,13 @@ } // Note: clone to avoid modifying!!! - Properties properties = (Properties) getProperties().clone(); + final Properties properties = (Properties) getProperties().clone(); // Turn this off now since we want to re-open the same store. 
properties.setProperty(Options.CREATE_TEMP_FILE, "false"); // The backing file that we need to re-open. - File file = ((LocalTripleStore) store).getIndexManager().getFile(); + final File file = ((LocalTripleStore) store).getIndexManager().getFile(); assertNotNull(file); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |