This list is closed; nobody may subscribe to it.
Messages per month:

| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2010 | | | | | | | 139 | 94 | 232 | 143 | 138 | 55 |
| 2011 | 127 | 90 | 101 | 74 | 148 | 241 | 169 | 121 | 157 | 199 | 281 | 75 |
| 2012 | 107 | 122 | 184 | 73 | 14 | 49 | 26 | 103 | 133 | 61 | 51 | 55 |
| 2013 | 59 | 72 | 99 | 62 | 92 | 19 | 31 | 138 | 47 | 83 | 95 | 111 |
| 2014 | 125 | 60 | 119 | 136 | 270 | 83 | 88 | 30 | 47 | 27 | 23 | |
| 2015 | | | | | | | | | 3 | | | |
| 2016 | | | 4 | 1 | | | | | | | | |
From: <tho...@us...> - 2013-12-14 15:15:06
Revision: 7651 http://bigdata.svn.sourceforge.net/bigdata/?rev=7651&view=rev Author: thompsonbry Date: 2013-12-14 15:14:59 +0000 (Sat, 14 Dec 2013) Log Message: ----------- update to the release notes for 1.3.0. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt 2013-12-14 15:14:16 UTC (rev 7650) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt 2013-12-14 15:14:59 UTC (rev 7651) @@ -10,7 +10,7 @@ Starting with the 1.3.0 release, we offer a tarball artifact [10] for easy installation of the HA replication cluster. -You can download the WAR from: +You can download the WAR (standalone) or HA artifacts from: http://sourceforge.net/projects/bigdata/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-14 15:14:23
Revision: 7650 http://bigdata.svn.sourceforge.net/bigdata/?rev=7650&view=rev Author: thompsonbry Date: 2013-12-14 15:14:16 +0000 (Sat, 14 Dec 2013) Log Message: ----------- update to the release notes for 1.3.0. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt 2013-12-14 13:24:46 UTC (rev 7649) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/releases/RELEASE_1_3_0.txt 2013-12-14 15:14:16 UTC (rev 7650) @@ -1,4 +1,4 @@ -This is a minor release of bigdata(R). +This is a major release of bigdata(R). Bigdata is a horizontally-scaled, open-source architecture for indexed data with an emphasis on RDF capable of loading 1B triples in under one hour on a 15 node cluster. Bigdata operates in both a single machine mode (Journal), highly available replication cluster mode (HAJournalServer), and a horizontally sharded cluster mode (BigdataFederation). The Journal provides fast scalable ACID indexed storage for very large data sets, up to 50 billion triples / quads. The HAJournalServer adds replication, online backup, horizontal scaling of query, and high availability. The federation provides fast scalable shard-wise parallel indexed storage using dynamic sharding and shard-wise ACID updates and incremental cluster size growth. Both platforms support fully concurrent readers with snapshot isolation. @@ -8,6 +8,8 @@ Starting with the 1.0.0 release, we offer a WAR artifact [8] for easy installation of the single machine RDF database. For custom development and cluster installations we recommend checking out the code from SVN using the tag for this release. The code will build automatically under eclipse. You can also build the code using the ant script. The cluster installer requires the use of the ant script. 
+Starting with the 1.3.0 release, we offer a tarball artifact [10] for easy installation of the HA replication cluster. + You can download the WAR from: http://sourceforge.net/projects/bigdata/ @@ -18,16 +20,17 @@ New features: -- SPARQL 1.1 Update Extensions (SPARQL UPDATE for named solution sets). See https://sourceforge.net/apps/mediawiki/bigdata/index.php?title=SPARQL_Update for more information. -- SPARQL 1.1 Property Paths. -- Remote Java client for Multi-Tenancy extensions NanoSparqlServer -- Sesame 2.6.10 dependency +- High availability. +- Property Path performance enhancements. +- Reification Done Right (alpha). +- RDF Graph Mining API (alpha). - Plus numerous other bug fixes and performance enhancements. Feature summary: +- Highly Available Replication Clusters (HAJournalServer [10]) - Single machine data storage to ~50B triples/quads (RWStore); -- Clustered data storage is essentially unlimited; +- Clustered data storage is essentially unlimited (BigdataFederation); - Simple embedded and/or webapp deployment (NanoSparqlServer); - Triples, quads, or triples with provenance (SIDs); - Fast RDFS+ inference and truth maintenance; @@ -37,14 +40,64 @@ Road map [3]: -- High availability for the journal and the cluster. -- Runtime Query Optimizer for Analytic Query mode; and -- Simplified deployment, configuration, and administration for clusters. +- Runtime Query Optimizer for Analytic Query mode; +- Performance optimization for scale-out clusters; and +- Simplified deployment, configuration, and administration for scale-out clusters. Change log: Note: Versions with (*) MAY require data migration. For details, see [9]. 
+1.3.0: + +- http://sourceforge.net/apps/trac/bigdata/ticket/530 (Journal HA) +- http://sourceforge.net/apps/trac/bigdata/ticket/621 (Coalesce write cache records and install reads in cache) +- http://sourceforge.net/apps/trac/bigdata/ticket/623 (HA TXS) +- http://sourceforge.net/apps/trac/bigdata/ticket/639 (Remove triple-buffering in RWStore) +- http://sourceforge.net/apps/trac/bigdata/ticket/645 (HA backup) +- http://sourceforge.net/apps/trac/bigdata/ticket/646 (River not compatible with newer 1.6.0 and 1.7.0 JVMs) +- http://sourceforge.net/apps/trac/bigdata/ticket/652 (Compress write cache blocks for replication and in HALogs) +- http://sourceforge.net/apps/trac/bigdata/ticket/674 (WCS write cache compaction causes errors in RWS postHACommit()) +- http://sourceforge.net/apps/trac/bigdata/ticket/678 (DGC Thread and Open File Leaks: sendHALogForWriteSet()) +- http://sourceforge.net/apps/trac/bigdata/ticket/679 (HAJournalServer can not restart due to logically empty log file) +- http://sourceforge.net/apps/trac/bigdata/ticket/681 (HAJournalServer deadlock: pipelineRemove() and getLeaderId()) +- http://sourceforge.net/apps/trac/bigdata/ticket/684 (Optimization with skos altLabel) +- http://sourceforge.net/apps/trac/bigdata/ticket/686 (Consensus protocol does not detect clock skew correctly) +- http://sourceforge.net/apps/trac/bigdata/ticket/687 (HAJournalServer Cache not populated) +- http://sourceforge.net/apps/trac/bigdata/ticket/689 (Missing URL encoding in RemoteRepositoryManager) +- http://sourceforge.net/apps/trac/bigdata/ticket/691 (Failed to re-interrupt thread in HAJournalServer) +- http://sourceforge.net/apps/trac/bigdata/ticket/693 (OneOrMorePath SPARQL property path expression ignored) +- http://sourceforge.net/apps/trac/bigdata/ticket/694 (Transparently cancel update/query in RemoteRepository) +- http://sourceforge.net/apps/trac/bigdata/ticket/695 (HAJournalServer reports "follower" but is in SeekConsensus and is not participating in commits.) 
+- http://sourceforge.net/apps/trac/bigdata/ticket/696 (Incorrect HttpEntity consuming in RemoteRepositoryManager) +- http://sourceforge.net/apps/trac/bigdata/ticket/701 (Problems in BackgroundTupleResult) +- http://sourceforge.net/apps/trac/bigdata/ticket/704 (ask does not return json) +- http://sourceforge.net/apps/trac/bigdata/ticket/705 (Race between QueryEngine.putIfAbsent() and shutdownNow()) +- http://sourceforge.net/apps/trac/bigdata/ticket/706 (MultiSourceSequentialCloseableIterator.nextSource() can throw NPE) +- http://sourceforge.net/apps/trac/bigdata/ticket/707 (BlockingBuffer.close() does not unblock threads) +- http://sourceforge.net/apps/trac/bigdata/ticket/708 (BIND heisenbug) +- http://sourceforge.net/apps/trac/bigdata/ticket/711 (sparql protocol: mime type application/sparql-query) +- http://sourceforge.net/apps/trac/bigdata/ticket/712 (SELECT ?x { OPTIONAL { ?x eg:doesNotExist eg:doesNotExist } } incorrect) +- http://sourceforge.net/apps/trac/bigdata/ticket/715 (Interrupt of thread submitting a query for evaluation does not always terminate the AbstractRunningQuery) +- http://sourceforge.net/apps/trac/bigdata/ticket/716 (Verify that IRunningQuery instances (and nested queries) are correctly cancelled when interrupted) +- http://sourceforge.net/apps/trac/bigdata/ticket/720 (HA3 simultaneous service start failure) +- http://sourceforge.net/apps/trac/bigdata/ticket/723 (HA asynchronous tasks must be canceled when invariants are changed) +- http://sourceforge.net/apps/trac/bigdata/ticket/726 (Logically empty HALog for committed transaction) +- http://sourceforge.net/apps/trac/bigdata/ticket/728 (Refactor to create HAClient) +- http://sourceforge.net/apps/trac/bigdata/ticket/731 (CBD and Update leads to 500 status code) +- http://sourceforge.net/apps/trac/bigdata/ticket/732 (describe statement limit does not work) +- http://sourceforge.net/apps/trac/bigdata/ticket/734 (two property paths interfere) +- 
http://sourceforge.net/apps/trac/bigdata/ticket/736 (MIN() malfunction) +- http://sourceforge.net/apps/trac/bigdata/ticket/743 (AbstractTripleStore.destroy() does not filter for correct prefix) +- http://sourceforge.net/apps/trac/bigdata/ticket/754 (Failure to setup SERVICE hook and changeLog for Unisolated and Read/Write connections) +- http://sourceforge.net/apps/trac/bigdata/ticket/755 (Concurrent QuorumActors can interfere leading to failure to progress) +- http://sourceforge.net/apps/trac/bigdata/ticket/718 (HAJournalServer needs to handle ZK client connection loss) +- http://sourceforge.net/apps/trac/bigdata/ticket/760 (Code review on 2-phase commit protocol) +- http://sourceforge.net/apps/trac/bigdata/ticket/764 (RESYNC failure (HA)) +- http://sourceforge.net/apps/trac/bigdata/ticket/772 (Query timeout only checked at operator start/stop.) +- http://sourceforge.net/apps/trac/bigdata/ticket/777 (ConcurrentModificationException in ASTComplexOptionalOptimizer) +- http://sourceforge.net/apps/trac/bigdata/ticket/783 (Operator Alerts (HA)) + 1.2.4: - http://sourceforge.net/apps/trac/bigdata/ticket/777 (ConcurrentModificationException in ASTComplexOptionalOptimizer) @@ -293,6 +346,7 @@ [7] http://www.systap.com/bigdata.htm [8] http://sourceforge.net/projects/bigdata/files/bigdata/ [9] http://sourceforge.net/apps/mediawiki/bigdata/index.php?title=DataMigration +[10] http://sourceforge.net/apps/mediawiki/bigdata/index.php?title=HAJournalServer About bigdata: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-14 13:24:53
Revision: 7649 http://bigdata.svn.sourceforge.net/bigdata/?rev=7649&view=rev Author: thompsonbry Date: 2013-12-14 13:24:46 +0000 (Sat, 14 Dec 2013) Log Message: ----------- I have added performance counters for the "Volumes" for the HAJournal. These are reported out through the web interface on the /counters page as illustrated below. While in this example, all storage for the HAJournalServer is on the same volume, the counters are reported for each relevant directory. Using this REST-ful API, it is trivial to create a nagios or similar integration that monitors the Volumes under /counters (or that directly accesses http://localhost:8090/counters?path=%2FVolumes). {{{ / Volumes ... / Volumes / Data Volume Bytes Available 59,861,090,304 / Volumes / HALog Volume Bytes Available 59,861,090,304 / Volumes / Service Volume Bytes Available 59,861,090,304 / Volumes / Snapshot Volume Bytes Available 59,861,090,304 / Volumes / Temp Volume Bytes Available 59,861,090,304 }}} Exposing these performance counters does not directly force the service into an OPERATOR state. However, it does allow flexible and configurable frameworks (such as nagios) to alert an operator in time to take corrective action. 
See #783 Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-12-13 21:05:10 UTC (rev 7648) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-12-14 13:24:46 UTC (rev 7649) @@ -54,6 +54,8 @@ import org.apache.log4j.Logger; import com.bigdata.concurrent.FutureTaskInvariantMon; +import com.bigdata.counters.CounterSet; +import com.bigdata.counters.Instrument; import com.bigdata.ha.HAGlue; import com.bigdata.ha.QuorumService; import com.bigdata.ha.RunState; @@ -94,6 +96,7 @@ import com.bigdata.journal.jini.ha.HAJournalServer.NSSConfigurationOptions; import com.bigdata.journal.jini.ha.HAJournalServer.RunStateEnum; import com.bigdata.quorum.Quorum; +import com.bigdata.resources.StoreManager.IStoreManagerCounters; import com.bigdata.service.AbstractTransactionService; import com.bigdata.service.jini.JiniClient; import com.bigdata.service.jini.RemoteAdministrable; @@ -729,6 +732,208 @@ // } /** + * Interface for additional performance counters exposed by the + * {@link HAJournal}. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + public interface IHAJournalCounters { + + /** + * The namespace for counters pertaining to free space on the various + * volumes. + */ + String Volumes = "Volumes"; + +// /** +// * The configured service directory. +// */ +// String ServiceDir = "ServiceDir"; +// +// /** +// * The configured data directory (the directory in which the Journal +// * file is stored). +// */ +// String DataDir = "DataDir"; +// +// /** +// * The configured HALog directory. 
+// */ +// String HALogDir = "HALogDir"; +// +// /** +// * The configured Snapshot directory. +// */ +// String SnapshotDir = "ShapshotDir"; +// +// /** +// * The configured tmp directory. +// */ +// String TmpDir = "TmpDir"; + + /** + * The #of bytes available on the disk volume on which the service + * directory is located. + * + * @see HAJournalServer#getServiceDir() + */ + String ServiceDirBytesAvailable = "Service Volume Bytes Available"; + + /** + * The #of bytes available on the disk volume on which the data + * directory is located (the directory in which the Journal file + * is stored). + * + * @see Journal#getDataDir() + */ + String DataDirBytesAvailable = "Data Volume Bytes Available"; + + /** + * The #of bytes available on the disk volume on which the HALog + * directory is located. + * + * @see HALogNexus#getHALogDir() + */ + String HALogDirBytesAvailable = "HALog Volume Bytes Available"; + + /** + * The #of bytes available on the disk volume on which the snapshot + * directory is located. + * + * @see SnapshotManager#getSnapshotDir() + */ + String SnapshotDirBytesAvailable = "Snapshot Volume Bytes Available"; + + /** + * The #of bytes available on the disk volume on which the temporary + * directory is located. + * + * @see Journal#getTmpDir() + */ + String TmpDirBytesAvailable = "Temp Volume Bytes Available"; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to attach additional performance counters. 
+ * + * @see IHAJournalCounters + */ + @Override + public CounterSet getCounters() { + + final CounterSet root = super.getCounters(); + { + + final CounterSet tmp = root.makePath(IHAJournalCounters.Volumes); + + tmp.addCounter(IHAJournalCounters.ServiceDirBytesAvailable, + new Instrument<Long>() { + @Override + public void sample() { + setValue(getFreeSpace(server.getServiceDir())); + } + }); + + tmp.addCounter(IHAJournalCounters.DataDirBytesAvailable, + new Instrument<Long>() { + @Override + public void sample() { + final File dir = getDataDir(); + if (dir != null) { + // Note: in case Journal is not durable. + setValue(getFreeSpace(dir)); + } + } + }); + + tmp.addCounter(IHAJournalCounters.HALogDirBytesAvailable, + new Instrument<Long>() { + @Override + public void sample() { + setValue(getFreeSpace(getHALogNexus().getHALogDir())); + } + }); + + tmp.addCounter(IHAJournalCounters.SnapshotDirBytesAvailable, + new Instrument<Long>() { + @Override + public void sample() { + setValue(getFreeSpace(getSnapshotManager() + .getSnapshotDir())); + } + }); + + tmp.addCounter(IStoreManagerCounters.TmpDirBytesAvailable, + new Instrument<Long>() { + @Override + public void sample() { + setValue(getFreeSpace(getTmpDir())); + } + }); + + } + + return root; + + } + + /** + * Return the free space in bytes on the volume hosting some directory. + * + * @param dir + * A directory hosted on some volume. + * + * @return The #of bytes of free space remaining for the volume hosting the + * directory -or- <code>-1L</code> if the free space could not be + * determined. + */ + /* + * Note: This was written using Apache FileSystemUtil originally. That would + * shell out "df" under un*x. Unfortunately, shelling out a child process + * requires a commitment from the OS to support a process with as much + * process space as the parent. For the data service, that is a lot of RAM. 
+ * In general, the O/S allows "over committment" of the available swap + * space, but you can run out of swap and then you have a problem. If the + * host was configured with scanty swap, then this problem could be + * triggered very easily and would show up as "Could not allocate memory". + * + * See http://forums.sun.com/thread.jspa?messageID=9834041#9834041 + */ + static protected long getFreeSpace(final File dir) { + + try { + + if(!dir.exists()) { + + return -1; + + } + + /* + * Note: This return 0L if there is no free space or if the File + * does not "name" a partition in the file system semantics. That + * is why we check dir.exists() above. + */ + + return dir.getUsableSpace(); + + } catch(Throwable t) { + + log.error("Could not get free space: dir=" + dir + " : " + + t, t); + + // the error is logger and ignored. + return -1L; + + } + + } + + /** * Extended implementation supports RMI. */ protected class HAGlueService extends BasicHA implements This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
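The `getFreeSpace()` helper added in this commit is essentially a null-safe wrapper around `java.io.File#getUsableSpace()`, chosen over shelling out to `df` (as Apache `FileSystemUtil` did) because forking a child process can fail with "Could not allocate memory" when the parent JVM is large and swap is scanty. A condensed, standalone sketch of the same technique (the class name `FreeSpaceProbe` is ours, not bigdata's):

```java
import java.io.File;

public class FreeSpaceProbe {

    /**
     * Return the #of free bytes on the volume hosting a directory, or -1L if
     * it cannot be determined. getUsableSpace() returns 0L when the File does
     * not name a partition, which is why we check dir.exists() first.
     */
    public static long getFreeSpace(final File dir) {
        try {
            if (!dir.exists()) {
                return -1L;
            }
            return dir.getUsableSpace();
        } catch (Throwable t) {
            // The error is logged (in the real code) and ignored here.
            return -1L;
        }
    }

    public static void main(String[] args) {
        // An existing directory reports a positive free-space figure.
        System.out.println(getFreeSpace(new File(System.getProperty("java.io.tmpdir"))));
        // A missing directory reports -1L rather than a misleading 0L.
        System.out.println(getFreeSpace(new File("/no/such/dir/x7q")));
    }
}
```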
From: <tho...@us...> - 2013-12-13 21:05:16
Revision: 7648 http://bigdata.svn.sourceforge.net/bigdata/?rev=7648&view=rev Author: thompsonbry Date: 2013-12-13 21:05:10 +0000 (Fri, 13 Dec 2013) Log Message: ----------- comment on purgeHA() Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-12-13 21:01:07 UTC (rev 7647) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-12-13 21:05:10 UTC (rev 7648) @@ -4181,10 +4181,17 @@ * Halt operation. * * Note: This is not an error, but we can not remove - * snapshots or HALogs if this invariant is violated. This - * is because a leader fail could then cause the - * IRestorePolicy to be violated if a service that lacked - * some HALogs was elected as the leader. + * snapshots or HALogs if this invariant is violated. + * + * Note: We do not permit HALog files to be purged if the + * quorum is not fully met. This is done in order to prevent + * a situation a leader would not have sufficient log files + * on hand to restore the failed service. If this were to + * occur, then the failed service would have to undergo a + * disaster rebuild rather than simply resynchronizing from + * the leader. Hence, HALog files are NOT purged unless the + * quorum is fully met (all services for its replication + * count are joined with the met quorum). */ return; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
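The invariant documented above reduces to a single guard: HALogs may be purged only when every service in the replication set is joined with the met quorum. A minimal illustrative sketch (the class and parameter names are ours, not bigdata's API):

```java
public class PurgePolicy {

    /**
     * HALog files may be purged only when the quorum is FULLY met, i.e. all
     * services for the replication count are joined. Otherwise a leader fail
     * could elect a service lacking some HALogs, violating the IRestorePolicy
     * and forcing a disaster rebuild instead of a simple resynchronization.
     */
    public static boolean mayPurgeHALogs(final int joinedServiceCount,
            final int replicationFactor) {
        return joinedServiceCount == replicationFactor;
    }

    public static void main(String[] args) {
        System.out.println(mayPurgeHALogs(3, 3)); // quorum fully met: purge allowed
        System.out.println(mayPurgeHALogs(2, 3)); // not fully met: keep the logs
    }
}
```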
From: <tho...@us...> - 2013-12-13 21:01:13
Revision: 7647 http://bigdata.svn.sourceforge.net/bigdata/?rev=7647&view=rev Author: thompsonbry Date: 2013-12-13 21:01:07 +0000 (Fri, 13 Dec 2013) Log Message: ----------- comment on purgeLogs() Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-12-13 20:55:50 UTC (rev 7646) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-12-13 21:01:07 UTC (rev 7647) @@ -4181,7 +4181,10 @@ * Halt operation. * * Note: This is not an error, but we can not remove - * snapshots or HALogs if this invariant is violated. + * snapshots or HALogs if this invariant is violated. This + * is because a leader fail could then cause the + * IRestorePolicy to be violated if a service that lacked + * some HALogs was elected as the leader. */ return; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-13 20:55:56
Revision: 7646 http://bigdata.svn.sourceforge.net/bigdata/?rev=7646&view=rev Author: thompsonbry Date: 2013-12-13 20:55:50 +0000 (Fri, 13 Dec 2013) Log Message: ----------- Added option to force a service into the error state. The service will then automatically attempt to recover from the error state. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java 2013-12-13 20:40:29 UTC (rev 7645) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java 2013-12-13 20:55:50 UTC (rev 7646) @@ -92,6 +92,18 @@ */ static final String REBUILD = "rebuild"; + /** + * Force this service into the error state. It will automatically attempt to + * recover from the error state. This basically "kicks" the service. + * + * @see QuorumService#enterErrorState() + * + * TODO Move this declaration to {@link StatusServlet} once we are done + * reconciling between the 1.2.x maintenance branch and the READ_CACHE + * branch. + */ + static final String ERROR = "error"; + final private IIndexManager indexManager; public HAStatusServletUtil(final IIndexManager indexManager) { @@ -586,6 +598,23 @@ } } + + if (quorumService != null) { + + /* + * Force the service into the "ERROR" state. It will automatically + * attempt to recover. + */ + + final String val = req.getParameter(HAStatusServletUtil.ERROR); + + if (val != null) { + + quorumService.enterErrorState(); + + } + + } /* * Display the NSS port, host, and leader/follower/not-joined This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
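The servlet change above boils down to a "kick" pattern: if the `error` request parameter is present and a quorum service is available, call `enterErrorState()` and let the service's normal recovery logic take over. A stripped-down sketch under that reading (the local `QuorumService` interface and `maybeKick` helper are illustrative stand-ins, not the bigdata API):

```java
public class ErrorKick {

    /** Stand-in for the relevant slice of bigdata's QuorumService. */
    interface QuorumService {
        void enterErrorState();
    }

    /**
     * Force the service into the error state iff the request carried the
     * "error" parameter (a non-null value, possibly empty) and a quorum
     * service exists. Returns true iff the service was kicked.
     */
    public static boolean maybeKick(final String errorParam,
            final QuorumService quorumService) {
        if (quorumService != null && errorParam != null) {
            // The service will automatically attempt to recover.
            quorumService.enterErrorState();
            return true;
        }
        return false;
    }
}
```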
From: <tho...@us...> - 2013-12-13 20:40:36
Revision: 7645 http://bigdata.svn.sourceforge.net/bigdata/?rev=7645&view=rev Author: thompsonbry Date: 2013-12-13 20:40:29 +0000 (Fri, 13 Dec 2013) Log Message: ----------- fixed unit test that was leaking files. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestNameAndExtensionFilter.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestNameAndExtensionFilter.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestNameAndExtensionFilter.java 2013-12-13 18:55:30 UTC (rev 7644) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/TestNameAndExtensionFilter.java 2013-12-13 20:40:29 UTC (rev 7645) @@ -53,7 +53,7 @@ /** * @param name */ - public TestNameAndExtensionFilter(String name) { + public TestNameAndExtensionFilter(final String name) { super(name); } @@ -64,85 +64,93 @@ * @param expected * @param actual */ - public void assertSameFiles( File[] expected, File[] actual ) - { - - if( expected == null ) { - - throw new AssertionError( "expected is null."); - + private void assertSameFiles(final File[] expected, final File[] actual) { + + if (expected == null) { + + throw new AssertionError("expected is null."); + } - if( actual == null ) { - - fail( "actual is null."); - + if (actual == null) { + + fail("actual is null."); + } - assertEquals( "#of files", expected.length, actual.length ); - + assertEquals("#of files", expected.length, actual.length); + // Insert the expected files into a set. 
- Set expectedSet = new HashSet(); - - for( int i=0; i<expected.length; i++ ) { + final Set<String> expectedSet = new HashSet<String>(); - File expectedFile = expected[ i ]; - - if( expectedFile == null ) { + for (int i = 0; i < expected.length; i++) { - throw new AssertionError( "expected file is null at index="+i ); - + final File expectedFile = expected[i]; + + if (expectedFile == null) { + + throw new AssertionError("expected file is null at index=" + i); + } - if( ! expectedSet.add( expectedFile.toString() ) ) { - - throw new AssertionError( "expected File[] contains duplicate: expected["+i+"]="+expectedFile ); - + if (!expectedSet.add(expectedFile.toString())) { + + throw new AssertionError( + "expected File[] contains duplicate: expected[" + i + + "]=" + expectedFile); + } - + } /* * Verify that each actual file occurs in the expectedSet using a * selection without replacement policy. */ - - for( int i=0; i<actual.length; i++ ) { - - File actualFile = actual[ i ]; - - if( actualFile == null ) { - - fail( "actual file is null at index="+i ); - + + for (int i = 0; i < actual.length; i++) { + + final File actualFile = actual[i]; + + if (actualFile == null) { + + fail("actual file is null at index=" + i); + } - - if( ! expectedSet.remove( actual[ i ].toString() ) ) { - - fail( "actual file="+actualFile+" at index="+i+" was not found in expected files." ); - + + if (!expectedSet.remove(actual[i].toString())) { + + fail("actual file=" + actualFile + " at index=" + i + + " was not found in expected files."); + } - + } - + } - + /** * Test verifies that no files are found using a guarenteed unique basename. 
*/ - public void test_filter_001() throws IOException - { - - final File basefile = File.createTempFile(getName(),"-test"); - basefile.deleteOnExit(); - - final String basename = basefile.toString(); - System.err.println( "basename="+basename ); - - NameAndExtensionFilter logFilter = new NameAndExtensionFilter( basename, ".log" ); - - assertSameFiles( new File[]{}, logFilter.getFiles() ); - + public void test_filter_001() throws IOException { + + final File basefile = File.createTempFile(getName(), "-test"); + + try { + + final String basename = basefile.toString(); + + final NameAndExtensionFilter logFilter = new NameAndExtensionFilter( + basename, ".log"); + + assertSameFiles(new File[] {}, logFilter.getFiles()); + + } finally { + + basefile.delete(); + + } + } /** @@ -150,33 +158,48 @@ */ public void test_filter_002() throws IOException { - int N = 100; - - final File logBaseFile = File.createTempFile(getName(),"-test"); - logBaseFile.deleteOnExit(); - - final String basename = logBaseFile.toString(); - System.err.println( "basename="+basename ); - - NameAndExtensionFilter logFilter = new NameAndExtensionFilter( basename, ".log" ); + final int N = 100; - Vector v = new Vector( N ); - - for( int i=0; i<N; i++ ) { + final Vector<File> v = new Vector<File>(N); - File logFile = new File( basename+"."+i+".log" ); - logFile.deleteOnExit(); - logFile.createNewFile(); -// System.err.println( "logFile="+logFile ); - - v.add( logFile ); - + final File logBaseFile = File.createTempFile(getName(), "-test"); + // logBaseFile.deleteOnExit(); + + try { + + final String basename = logBaseFile.toString(); + // System.err.println( "basename="+basename ); + + final NameAndExtensionFilter logFilter = new NameAndExtensionFilter( + basename, ".log"); + + for (int i = 0; i < N; i++) { + + final File logFile = new File(basename + "." 
+ i + ".log"); + // logFile.deleteOnExit(); + logFile.createNewFile(); + // System.err.println( "logFile="+logFile ); + + v.add(logFile); + + } + + final File[] expectedFiles = (File[]) v.toArray(new File[] {}); + + assertSameFiles(expectedFiles, logFilter.getFiles()); + + } finally { + + logBaseFile.delete(); + + for (File f : v) { + + f.delete(); + + } + } - File[] expectedFiles = (File[]) v.toArray(new File[]{}); - - assertSameFiles( expectedFiles, logFilter.getFiles() ); - } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-13 18:55:37
Revision: 7644 http://bigdata.svn.sourceforge.net/bigdata/?rev=7644&view=rev Author: thompsonbry Date: 2013-12-13 18:55:30 +0000 (Fri, 13 Dec 2013) Log Message: ----------- also modified the receiveAndReplicate code path to wait until both futures are done before dropping out of the while() loop. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 16:59:23 UTC (rev 7643) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 18:55:30 UTC (rev 7644) @@ -1756,7 +1756,7 @@ * local Future and only check the remote Future every * second. Timeouts are ignored during this loop. */ - while (!futSnd.isDone() && !futRec.isDone()) { + while (!futSnd.isDone() || !futRec.isDone()) { /* * Make sure leader's quorum token remains valid for * ALL writes. @@ -1771,6 +1771,8 @@ } catch (TimeoutException ignore) { } } + + // Note: Both futures are DONE at this point! futSnd.get(); futRec.get(); @@ -2121,7 +2123,7 @@ * local Future and only check the remote Future every * second. Timeouts are ignored during this loop. */ - while (!futRec.isDone() && !futRep.isDone()) { + while (!futRec.isDone() || !futRep.isDone()) { /* * The token must remain valid, even if this service * is not joined with the met quorum. If fact, @@ -2139,6 +2141,8 @@ } catch (TimeoutException ignore) { } } + + // Note: Both futures are DONE at this point! futRec.get(); futRep.get(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
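The one-character fix above (`&&` → `||`) matters because the loop is supposed to run until BOTH futures are done: with `&&`, the loop exits as soon as EITHER future completes, so the unconditional `get()` calls after it could still block on the unfinished future without the quorum-token check. A self-contained sketch of the corrected wait loop (the `awaitBoth` helper and poll interval are ours; the invariant-checking body is elided):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WaitBothFutures {

    /** Poll until BOTH futures are done ('&&' would exit when EITHER is). */
    public static void awaitBoth(final Future<?> a, final Future<?> b)
            throws InterruptedException, ExecutionException {
        while (!a.isDone() || !b.isDone()) {
            try {
                // Poll whichever future is still running; per-poll timeouts
                // are ignored (this is where the token checks would go).
                (a.isDone() ? b : a).get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException ignore) {
            }
        }
        // Note: Both futures are DONE at this point, so get() cannot block.
        a.get();
        b.get();
    }

    public static void main(String[] args) throws Exception {
        final ExecutorService exec = Executors.newFixedThreadPool(2);
        final Future<?> fast = exec.submit(() -> { /* returns immediately */ });
        final Future<?> slow = exec.submit(() -> {
            try { Thread.sleep(200); } catch (InterruptedException e) { }
        });
        awaitBoth(fast, slow);
        System.out.println(fast.isDone() && slow.isDone()); // prints: true
        exec.shutdown();
    }
}
```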
From: <tho...@us...> - 2013-12-13 16:59:31
Revision: 7643 http://bigdata.svn.sourceforge.net/bigdata/?rev=7643&view=rev Author: thompsonbry Date: 2013-12-13 16:59:23 +0000 (Fri, 13 Dec 2013) Log Message: ----------- added log when we *will* run forceRemoveService() as well as when it actually runs. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-13 16:54:30 UTC (rev 7642) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-13 16:59:23 UTC (rev 7643) @@ -2349,6 +2349,9 @@ */ @Override final public void forceRemoveService(final UUID psid) { + if (log.isInfoEnabled()) + log.info("Will force remove of service" + ": thisService=" + + serviceId + ", otherServiceId=" + psid); runActorTask(new ForceRemoveServiceTask(psid)); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-13 16:54:38
Revision: 7642
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7642&view=rev
Author:   thompsonbry
Date:     2013-12-13 16:54:30 +0000 (Fri, 13 Dec 2013)

Log Message:
-----------
Sync to Martyn and CI changes to the error handling for pipeline replication and some new unit tests. See #724

Modified Paths:
--------------
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java
    branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java

Added Paths:
-----------
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java

Removed Paths:
-------------
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java

Deleted: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java	2013-12-13 11:01:37 UTC (rev 7641)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java	2013-12-13 16:54:30 UTC (rev 7642)
@@ -1,56 +0,0 @@
-/**
-
-Copyright (C) SYSTAP, LLC 2006-2013.  All rights reserved.
-
-Contact:
-     SYSTAP, LLC
-     4501 Tower Road
-     Greensboro, NC 27410
-     lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-*/
-
-package com.bigdata.ha;
-
-import java.util.UUID;
-
-/**
- * PipelineException is thrown from RMI calls to communicate
- * the root cause of a pipeline problem. The caller is then able
- * to take action: for example to remove the problem service
- * from the quorum.
- */
-public class PipelineException extends RuntimeException {
-
-    /**
-     * Generated ID
-     */
-    private static final long serialVersionUID = 8019938954269914574L;
-
-    /** The UUID of the service that could not be reached. */
-    private final UUID serviceId;
-
-    public PipelineException(final UUID serviceId, final Throwable t) {
-        super(t);
-
-        this.serviceId = serviceId;
-    }
-
-    /**
-     * Return the UUID of the service that could not be reached.
*/ - public UUID getProblemServiceId() { - return serviceId; - } - -} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -57,6 +57,9 @@ import com.bigdata.ha.pipeline.HAReceiveService; import com.bigdata.ha.pipeline.HAReceiveService.IHAReceiveCallback; import com.bigdata.ha.pipeline.HASendService; +import com.bigdata.ha.pipeline.ImmediateDownstreamReplicationException; +import com.bigdata.ha.pipeline.NestedPipelineException; +import com.bigdata.ha.pipeline.PipelineImmediateDownstreamReplicationException; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.IBufferAccess; import com.bigdata.quorum.QCE; @@ -1729,91 +1732,147 @@ } private void doRunWithLock() throws InterruptedException, - ExecutionException, IOException { + ExecutionException, IOException { - try { - // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService - .send(b, snd.getMarker()); + // Get Future for send() outcome on local service. + final Future<Void> futSnd = sendService.send(b, snd.getMarker()); + try { + try { - try { + // Get Future for receive outcome on the remote service + // (RMI). + final Future<Void> futRec; + try { + futRec = downstream.service.receiveAndReplicate(req, + snd, msg); + } catch (IOException ex) { // RMI error. + throw new ImmediateDownstreamReplicationException(ex); + } - // Get Future for receive outcome on the remote service - // (RMI). - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + try { - try { + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. 
Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * Make sure leader's quorum token remains valid for + * ALL writes. + */ + member.assertLeader(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * Make sure leader's quorum token remains valid for - * ALL writes. - */ - member.assertLeader(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + // cancel the local Future. + futSnd.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + launderPipelineException(true/* isLeader */, member, t); + } + } + + } - } finally { - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } + /** + * Launder an exception thrown during pipeline replication. 
+ * @param isLeader + * @param member + * @param t + */ + static private void launderPipelineException(final boolean isLeader, + final QuorumMember<?> member, final Throwable t) { - } catch (Throwable t) { - // check inner cause for downstream PipelineException - final PipelineException pe = (PipelineException) InnerCause - .getInnerCause(t, PipelineException.class); - final UUID problemService; - if (pe != null) { - // throw pe; // throw it upstream - already should have been - // handled - problemService = pe.getProblemServiceId(); - } else { - final UUID[] priorAndNext = member.getQuorum() - .getPipelinePriorAndNext(member.getServiceId()); - problemService = priorAndNext[1]; - } + log.warn("isLeader=" + isLeader + ", t=" + t, t); + + /* + * When non-null, some service downstream of this service had a problem + * replicating to a follower. + */ + final PipelineImmediateDownstreamReplicationException remoteCause = (PipelineImmediateDownstreamReplicationException) InnerCause.getInnerCause(t, + PipelineImmediateDownstreamReplicationException.class); - // determine next pipeline service id - log.warn("Problem with downstream service: " + problemService, - t); + /* + * When non-null, this service has a problem with replication to its + * immediate follower. + * + * Note: if [remoteCause!=null], then we DO NOT look for a direct cause + * (since there will be one wrapped up in the exception trace for some + * remote service rather than for this service). + */ + final ImmediateDownstreamReplicationException directCause = remoteCause == null ? 
(ImmediateDownstreamReplicationException) InnerCause + .getInnerCause(t, + ImmediateDownstreamReplicationException.class) + : null; - // Carry out remedial work directly - BAD - log.error("Really need to remove service " + problemService); + final UUID thisService = member.getServiceId(); - try { - member.getActor().forceRemoveService(problemService); - } catch (Exception e) { - log.warn("Problem on node removal", e); + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext( + member.getServiceId()); - throw new RuntimeException(e); - } + if (isLeader) { + + try { + + if (directCause != null) { - throw new PipelineException(problemService, t); + member.getActor().forceRemoveService(priorAndNext[1]); - } - } + } else if (remoteCause != null) { + + final UUID problemService = remoteCause.getProblemServiceId(); + + member.getActor().forceRemoveService(problemService); + + } else { + + // Do not remove anybody. + + } + + } catch (Exception e) { + + log.error("Problem on node removal", e); + + throw new RuntimeException(e); + + } + + } + + if (directCause != null) { + + throw new PipelineImmediateDownstreamReplicationException( + thisService, priorAndNext, t); + + } else if (remoteCause != null) { + + throw new NestedPipelineException(t); + + } else { + + throw new RuntimeException(t); + + } + } /** @@ -1829,10 +1888,11 @@ /* * FIXME We should probably pass the quorum token through from the - * leader for ALL replicated writes. This uses the leader's quorum token - * when it is available (for a live write) and otherwise uses the - * current quorum token (for historical writes, since we are not - * providing the leader's token in this case). + * leader for ALL replicated writes [this is now done by the + * IHASendState but the code is not really using that data yet]. 
This + * uses the leader's quorum token when it is available (for a live + * write) and otherwise uses the current quorum token (for historical + * writes, since we are not providing the leader's token in this case). */ final long token = req == null ? msg.getQuorumToken() : member .getQuorum().token(); @@ -1966,8 +2026,8 @@ final HAMessageWrapper wrappedMsg = new HAMessageWrapper( req, snd, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, + // Get Future for receive() outcome on local service. + final Future<Void> futRec = receiveService.receiveData(wrappedMsg, b); try { @@ -1978,7 +2038,7 @@ // Verify token remains valid. member.getQuorum().assertQuorum(token); // Await the future. - return futSnd.get(1000, TimeUnit.MILLISECONDS); + return futRec.get(1000, TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { // Timeout. Ignore and retry loop. Thread.sleep(100/* ms */); @@ -1989,7 +2049,7 @@ } finally { // cancel the local Future. - futSnd.cancel(true/*mayInterruptIfRunning*/); + futRec.cancel(true/*mayInterruptIfRunning*/); } @@ -2031,89 +2091,75 @@ } @Override - public Void call() throws Exception { + public Void call() throws Exception { - // wrap the messages together. - final HAMessageWrapper wrappedMsg = new HAMessageWrapper(req, snd, - msg); + // wrap the messages together. + final HAMessageWrapper wrappedMsg = new HAMessageWrapper( + req, snd, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, - b); + // Get Future for receive() outcome on local service. + final Future<Void> futRec = receiveService.receiveData(wrappedMsg, + b); - try { - try { + try { + try { - // Get future for receive outcome on the remote - // service. - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + // Get Future for receive outcome on the remote service + // (RMI). 
+ final Future<Void> futRep; + try { + futRep = downstream.service.receiveAndReplicate(req, + snd, msg); + } catch (IOException ex) { // RMI error. + throw new ImmediateDownstreamReplicationException(ex); + } - try { + try { - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * The token must remain valid, even if this service - * is not joined with the met quorum. If fact, - * services MUST replicate writes regardless of - * whether or not they are joined with the met - * quorum, but only while there is a met quorum. - */ - member.getQuorum().assertQuorum(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futRec.isDone() && !futRep.isDone()) { + /* + * The token must remain valid, even if this service + * is not joined with the met quorum. If fact, + * services MUST replicate writes regardless of + * whether or not they are joined with the met + * quorum, but only while there is a met quorum. + */ + member.getQuorum().assertQuorum(token); + try { + futRec.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRep.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futRec.get(); + futRep.get(); - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + if (!futRep.isDone()) { + // cancel remote Future unless done. 
+ futRep.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - // Is it possible that this cancel conflicts with throwing - // the PipelineException? - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } - } catch (Throwable t) { - // determine the problem service, which may be further downstream - // if the Throwable contains a PipelineException innerCause - final PipelineException pe = (PipelineException) InnerCause - .getInnerCause(t, PipelineException.class); - final UUID problemService; - if (pe != null) { - problemService = pe.getProblemServiceId(); - } else { - final UUID[] priorAndNext = member.getQuorum() - .getPipelinePriorAndNext(member.getServiceId()); - problemService = priorAndNext[1]; - } + } finally { + // cancel the local Future. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + launderPipelineException(false/* isLeader */, member, t); + } + // done + return null; + } - log.warn("Problem with downstream service: " + problemService, - t); - - throw new PipelineException(problemService, t); - } - - // done - return null; - } - } /** Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Jun 7, 2013. + */ +package com.bigdata.ha.pipeline; + +/** + * A quorum related exception dealing with the write replication pipeline. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +abstract public class AbstractPipelineChangeException extends AbstractPipelineException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public AbstractPipelineChangeException() { + } + + public AbstractPipelineChangeException(String message) { + super(message); + } + + public AbstractPipelineChangeException(Throwable cause) { + super(cause); + } + + public AbstractPipelineChangeException(String message, Throwable cause) { + super(message, cause); + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,55 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.pipeline; + +import com.bigdata.quorum.QuorumException; + +/** + * A quorum related exception dealing with the write replication pipeline. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +abstract public class AbstractPipelineException extends QuorumException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public AbstractPipelineException() { + } + + public AbstractPipelineException(String message) { + super(message); + } + + public AbstractPipelineException(Throwable cause) { + super(cause); + } + + public AbstractPipelineException(String message, Throwable cause) { + super(message, cause); + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -763,6 +763,10 @@ * more efficient bulk copy operations from the NIO buffer into a local * byte[] on the Java heap against which we then track the evolving * checksum of the data. + * + * FIXME Why isn't this buffer scoped to the outer HAReceiveService? By + * being an inner class field, we allocate it once per payload + * received.... 
*/ private final byte[] a = new byte[512]; @@ -1118,17 +1122,7 @@ callback.incReceive(message, reads, rdlen, rem); } -// if (downstreamFirstCause == null) { -// try { - forwardReceivedBytes(client, rdlen); -// } catch (ExecutionException ex) { -// log.error( -// "Downstream replication failure" -// + ": will drain payload and then rethrow exception: rootCause=" -// + ex, ex); -// downstreamFirstCause = ex; -// } -// } + forwardReceivedBytes(client, rdlen); } // while(itr.hasNext()) @@ -1154,28 +1148,6 @@ + ", actual=" + (int) chk.getValue()); } -// if (downstreamFirstCause != null) { -// -// /** -// * Replication to the downstream service failed. The payload has -// * been fully drained. This ensures that we do not leave part of -// * the payload in the upstream socket channel. We now wrap and -// * rethrow the root cause of the downstream failure. The leader -// * will handle this by forcing the remove of the downstream -// * service and then re-replicating the payload. -// * -// * @see <a -// * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" -// * > HA wire pulling and sure kill testing </a> -// */ -// -// throw new RuntimeException( -// "Downstream replication failure: msg=" + message -// + ", ex=" + downstreamFirstCause, -// downstreamFirstCause); -// -// } - // Check for termination. 
client.checkFirstCause(); @@ -1211,6 +1183,7 @@ * * @throws ExecutionException * @throws InterruptedException + * @throws ImmediateDownstreamReplicationException * * @todo Since the downstream writes are against a blocking mode * channel, the receiver on this node runs in sync with the @@ -1222,8 +1195,11 @@ * HA wire pulling and sure kill testing </a> */ private void forwardReceivedBytes(final Client client, final int rdlen) - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException, + ImmediateDownstreamReplicationException { + while (true) { + if (rdlen != 0 && addrNextRef.get() != null) { if (log.isTraceEnabled()) log.trace("Incremental send of " + rdlen + " bytes"); @@ -1265,6 +1241,7 @@ : null).get(); } break; // break out of the inner while loop. + } // while(true) } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -38,6 +39,9 @@ import org.apache.log4j.Logger; +import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.Haltable; + /** * A service for sending raw {@link ByteBuffer}s across a socket. This service * supports the HA write pipeline. 
This service is designed to be paired with an @@ -82,9 +86,6 @@ private static final Logger log = Logger.getLogger(HASendService.class); -// static final byte ACK = 1; -// static final byte NACK = 0; - /** * The Internet socket address of the receiving service. */ @@ -104,6 +105,7 @@ /* * Note: toString() must be thread-safe. */ + @Override public String toString() { return super.toString() + "{addrNext=" + addrNext + "}"; @@ -295,6 +297,9 @@ * @return The {@link Future} which can be used to await the outcome of this * operation. * + * @throws InterruptedException + * @throws ImmediateDownstreamReplicationException + * * @throws IllegalArgumentException * if the buffer is <code>null</code>. * @throws IllegalArgumentException @@ -305,8 +310,10 @@ * @todo throws IOException if the {@link SocketChannel} was not open and * could not be opened. */ - public Future<Void> send(final ByteBuffer buffer, final byte[] marker) { - + public Future<Void> send(final ByteBuffer buffer, final byte[] marker) + throws ImmediateDownstreamReplicationException, + InterruptedException { + if (buffer == null) throw new IllegalArgumentException(); @@ -324,10 +331,64 @@ // reopenChannel(); - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer(), marker)); + try { + return tmp + .submit(newIncSendTask(buffer.asReadOnlyBuffer(), marker)); + + } catch (Throwable t) { + + launderThrowable(t); + + // make the compiler happy. + throw new AssertionError(); + + } + } + /** + * Test the {@link Throwable} for its root cause and distinguish between a + * root cause with immediate downstream replication, normal termination + * through {@link InterruptedException}, {@link CancellationException}, and + * nested {@link AbstractPipelineException}s thrown by a downstream service. 
+ * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > HA + * wire pulling and sure kill testing </a> + */ + private void launderThrowable(final Throwable t) + throws InterruptedException, + ImmediateDownstreamReplicationException { + + if (Haltable.isTerminationByInterrupt(t)) { + + // root cause is interrupt or cancellation exception. + throw new RuntimeException(t); + + } + + if (InnerCause.isInnerCause(t, AbstractPipelineException.class)) { + + /* + * The root cause is NOT the inability to replicate to our immediate + * downstream service. Instead, some service (not us) has a problem + * with pipline replication. + */ + + throw new NestedPipelineException(t); + + } + + /* + * We have a problem with replication to our immediate downstream + * service. + */ + + throw new ImmediateDownstreamReplicationException(toString(), t); + + } + + /** * A series of timeouts used when we need to re-open the * {@link SocketChannel}. */ Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,64 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.pipeline; + +import java.io.IOException; + + +/** + * An exception thrown by the {@link HAReceiveService} when replication to the + * downstream service fails. The root cause can be an RMI error (can not connect + * or connection lost), a socket channel write error (can not connect, + * connection lost, etc.), or even a transitive error from further down the + * write pipeline. This exception DOES NOT decisively indicate the problem is + * with the immediate downstream service. The caller must inspect the root cause + * to make this determination. However, this exception DOES indicate that the + * problem is with downstream replication rather than with the receipt or + * handling of the payload on this service. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class ImmediateDownstreamReplicationException extends IOException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public ImmediateDownstreamReplicationException() { + } + + public ImmediateDownstreamReplicationException(String message) { + super(message); + } + + public ImmediateDownstreamReplicationException(Throwable cause) { + super(cause); + } + + public ImmediateDownstreamReplicationException(String message, Throwable cause) { + super(message, cause); + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,55 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.pipeline; + +/** + * An exception that is used to wrap and rethrow a cause whose root cause is + * another {@link AbstractPipelineException}. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class NestedPipelineException extends AbstractPipelineException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public NestedPipelineException() { + super(); + } + + public NestedPipelineException(String message, Throwable cause) { + super(message, cause); + } + + public NestedPipelineException(String message) { + super(message); + } + + public NestedPipelineException(Throwable cause) { + super(cause); + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -42,7 +42,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ -public class PipelineDownstreamChange extends QuorumException { +public class PipelineDownstreamChange extends AbstractPipelineChangeException { /** * Copied: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java (from rev 7640, branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java) =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,113 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.ha.pipeline; + +import java.util.UUID; + +/** + * Exception thrown when there is a problem with write replication from a + * service to its downstream service, including a problem with RMI to the + * downstream service or socket level write replication to the downstream + * service. This typed exception enables the leader is then able to take action + * intended to cure the pipeline by forcing the problem service from the quorum. + * The problem service can then attempt to re-enter the quorum. + */ +public class PipelineImmediateDownstreamReplicationException extends + AbstractPipelineException { + + /** + * Generated ID + */ + private static final long serialVersionUID = 8019938954269914574L; + + /** + * The {@link UUID} of the service reporting a problem replicating writes to + * its downstream service. + */ + private final UUID serviceId; + + /** + * The prior and next service {@link UUID}s for the service reporting the + * problem. The problem is with the communication to the "next" service. + */ + private final UUID[] priorAndNext; + + /** + * + * @param serviceId + * The {@link UUID} of the service reporting a problem + * replicating writes to its downstream service. + * @param priorAndNext + * The prior and next service {@link UUID}s for the service + * reporting the problem. 
The problem is with the communication + * to the "next" service. + * @param t + * The root cause exception. + */ + public PipelineImmediateDownstreamReplicationException(final UUID serviceId, + final UUID[] priorAndNext, final Throwable t) { + + super(t); + + this.serviceId = serviceId; + + this.priorAndNext = priorAndNext; + + } + + /** + * Return the {@link UUID} of the service reporting the problem. This is the + * service that attempted to replicate a payload to the downstream service + * and was unable to replicate the payload due to the reported root cause. + */ + public UUID getReportingServiceId() { + + return serviceId; + + } + + /** + * The prior and next service {@link UUID}s for the service reporting the + * problem. The problem is with the communication to the "next" service. + */ + public UUID[] getPriorAndNext() { + + return priorAndNext; + + } + + /** + * Return the {@link UUID} of the downstream service - the problem is + * reported for the communication channel between the reporting service and + * this downstream service. The downstream service is the service that + * should be forced out of the quorum in order to "fix" the pipeline. 
+ */ + public UUID getProblemServiceId() { + + return priorAndNext[1]; + + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -29,7 +29,6 @@ import java.util.concurrent.CancellationException; import com.bigdata.ha.QuorumPipelineImpl; -import com.bigdata.quorum.QuorumException; /** * Exception thrown when the upstream service is changed by a pipeline @@ -42,7 +41,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ -public class PipelineUpstreamChange extends QuorumException { +public class PipelineUpstreamChange extends AbstractPipelineChangeException { /** * Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -114,8 +114,9 @@ * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException + * @throws ImmediateDownstreamReplicationException */ - public void testSimpleExchange() throws InterruptedException, ExecutionException, TimeoutException { + public void testSimpleExchange() throws InterruptedException, ExecutionException, TimeoutException, ImmediateDownstreamReplicationException { final long timeout = 5000;// ms { @@ -149,9 +150,10 @@ * @throws TimeoutException * @throws ExecutionException * @throws InterruptedException + * @throws ImmediateDownstreamReplicationException */ public void 
testStress() throws TimeoutException, InterruptedException, - ExecutionException { + ExecutionException, ImmediateDownstreamReplicationException { final long timeout = 5000; // ms for (int i = 0; i < 100; i++) { Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -159,7 +159,7 @@ } public void testSimpleExchange() throws InterruptedException, - ExecutionException, TimeoutException { + ExecutionException, TimeoutException, ImmediateDownstreamReplicationException { final long timeout = 5000; // ms final ByteBuffer tst1 = getRandomData(50); @@ -177,7 +177,7 @@ assertEquals(rcv1, rcv2); } - public void testChecksumError() throws InterruptedException, ExecutionException + public void testChecksumError() throws InterruptedException, ExecutionException, ImmediateDownstreamReplicationException { final ByteBuffer tst1 = getRandomData(50); Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -87,6 +87,10 @@ final long token = awaitFullyMetQuorum(); + // await the initial KB commit on all services. 
+ awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + // start concurrent task loads that continue until fully met final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( token)); @@ -94,7 +98,7 @@ executorService.submit(ft); // allow load head start - Thread.sleep(300/* ms */); + Thread.sleep(2000/* ms */); // Verify load is still running. assertFalse(ft.isDone()); @@ -103,11 +107,10 @@ log.warn("ZOOKEEPER\n" + dumpZoo()); kill(startup.serverC); - - // FIXME: in the face of no implemented error propagation we can explicitly - // tell the leader to remove the killed service! - // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + // Note: Automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); // token must remain unchanged to indicate same quorum @@ -153,6 +156,10 @@ final long token = awaitFullyMetQuorum(); + // await the initial KB commit on all services. + awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + // start concurrent task loads that continue until fully met final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( token)); @@ -160,7 +167,7 @@ executorService.submit(ft); // allow load head start - Thread.sleep(1000/* ms */); + Thread.sleep(2000/* ms */); // Verify load is still running. assertFalse(ft.isDone()); @@ -170,8 +177,8 @@ kill(startup.serverB); - // FIXME: temporary call to explicitly remove the service prior to correct protocol - // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); + // Note: automatic. 
+// startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); @@ -311,15 +318,10 @@ // Dump Zookeeper log.warn("ZOOKEEPER\n" + dumpZoo()); - // Note: sure kill is done automatically when we hit the desired point - // in the write replication. - // kill(startup.serverB); + // Note: automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) +// .get(); - // FIXME: temporary call to explicitly remove the service prior to - // correct protocol - // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) - // .get(); - awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverC }); @@ -390,15 +392,10 @@ // Dump Zookeeper log.warn("ZOOKEEPER\n" + dumpZoo()); - // Note: sure kill is done automatically when we hit the desired point - // in the write replication. - // kill(startup.serverB); + // Note: automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) +// .get(); - // FIXME: temporary call to explicitly remove the service prior to - // correct protocol - // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - // .get(); - awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverB }); @@ -415,11 +412,83 @@ // token must remain unchanged to indicate same quorum assertEquals(token, awaitMetQuorum()); - // TODO We could finally restart the killed service and verify resync. - } /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, C will issue a sure kill + * of B (the 1st follower) when it has received a specified number of bytes + * of data from B. Verify that the LOAD completes successfully with the + * remaining services (A+C). 
+ */ + public void testABC_LiveLoadRemainsMet_C_kills_B_duringIncrementalReplication() + throws Exception { + + // enforce join order + final ABC startup = new ABC(true /* sequential */); + + final long token = awaitFullyMetQuorum(); + + // await the initial KB commit on all services. + awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + + // Set a trigger to sure kill B once C reaches the specified + // replication point. + ((HAGlueTest) startup.serverC) + .failWriteReplication(new HAProgressListenerKillPID_1( + ((HAGlueTest) startup.serverB).getPID())); + + // start large load. + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // Wait until B is killed. + assertCondition(new Runnable() { + public void run() { + try { + startup.serverB.getWritePipelineAddr(); + fail("B is still running."); + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Expected exception: B is no longer responding: " + + ex); + return; + } + } + }, 20000, TimeUnit.MILLISECONDS); + + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + // Note: automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) +// .get(); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, + startup.serverC }); + + // also check members and joined + awaitMembers(new HAGlue[] { startup.serverA, startup.serverC }); + awaitJoined(new HAGlue[] { startup.serverA, startup.serverC }); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + /** * Test where we start A+B+C in strict sequence. 
Once we observe that all * services have gone through the initial KB create, we do a sudden kill of * B. We then start the live load. This test explores what happens when A is @@ -471,8 +540,6 @@ * C. We then start the live load. This test explores what happens when A * and B are not yet aware that C is dead when the UPDATE operation starts. * - * Can I commit this - * * @throws Exception */ public void testABC_awaitKBCreate_killC_LiveLoadRemainsMet() @@ -495,9 +562,9 @@ executorService.submit(ft); - // FIXME RESYNC_PIPELINE: move into QuorumPipelineImpl. - // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - // .get(); + // Note: Automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) +// .get(); awaitPipeline(getZKSessionTimeout() + 5000, TimeUnit.MILLISECONDS, new HAGlue[] { startup.serverA, startup.serverB }); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2013-12-13 11:01:43
|
Revision: 7641 http://bigdata.svn.sourceforge.net/bigdata/?rev=7641&view=rev Author: martyncutcher Date: 2013-12-13 11:01:37 +0000 (Fri, 13 Dec 2013) Log Message: ----------- Implement downstream PipelineException check to handle quorums with more than 3 services Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 10:44:56 UTC (rev 7640) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 11:01:37 UTC (rev 7641) @@ -2091,15 +2091,23 @@ futSnd.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { - // determine next pipeline service id - // FIXME: should this check for problem from further downstream for - // quorums with > 3 services? - final UUID[] priorAndNext = member.getQuorum() - .getPipelinePriorAndNext(member.getServiceId()); - log.warn("Problem with downstream service: " + priorAndNext[1], + // determine the problem service, which may be further downstream + // if the Throwable contains a PipelineException innerCause + final PipelineException pe = (PipelineException) InnerCause + .getInnerCause(t, PipelineException.class); + final UUID problemService; + if (pe != null) { + problemService = pe.getProblemServiceId(); + } else { + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + problemService = priorAndNext[1]; + } + + log.warn("Problem with downstream service: " + problemService, t); - throw new PipelineException(priorAndNext[1], t); + throw new PipelineException(problemService, t); } // done
From: <mar...@us...> - 2013-12-13 10:45:03
|
Revision: 7640 http://bigdata.svn.sourceforge.net/bigdata/?rev=7640&view=rev Author: martyncutcher Date: 2013-12-13 10:44:56 +0000 (Fri, 13 Dec 2013) Log Message: ----------- Intermediate implementation of PipelineException and pipeline propagation leading to forceRemoveService invocation on leader Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java 2013-12-13 10:44:56 UTC (rev 7640) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.ha; + +import java.util.UUID; + +/** + * PipelineException is thrown from RMI calls to communicate + * the root cause of a pipeline problem. 
The caller is then able + * to take action: for example to remove the problem service + * from the quorum. + */ +public class PipelineException extends RuntimeException { + + /** + * Generated ID + */ + private static final long serialVersionUID = 8019938954269914574L; + + /** The UUID of the service that could not be reached. */ + private final UUID serviceId; + + public PipelineException(final UUID serviceId, final Throwable t) { + super(t); + + this.serviceId = serviceId; + } + + /** Return the UUID of the service that could not be reached. */ + public UUID getProblemServiceId() { + return serviceId; + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 09:07:46 UTC (rev 7639) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 10:44:56 UTC (rev 7640) @@ -1729,56 +1729,91 @@ } private void doRunWithLock() throws InterruptedException, - ExecutionException, IOException { + ExecutionException, IOException { - // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b, snd.getMarker()); + try { + // Get Future for send() outcome on local service. + final Future<Void> futSnd = sendService + .send(b, snd.getMarker()); - try { + try { - // Get Future for receive outcome on the remote service (RMI). - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + // Get Future for receive outcome on the remote service + // (RMI). + final Future<Void> futRec = downstream.service + .receiveAndReplicate(req, snd, msg); - try { + try { - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. 
- */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * Make sure leader's quorum token remains valid for ALL - * writes. - */ - member.assertLeader(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * Make sure leader's quorum token remains valid for + * ALL writes. + */ + member.assertLeader(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } + } finally { + // cancel the local Future. 
+ futSnd.cancel(true/* mayInterruptIfRunning */); + } - } - + } catch (Throwable t) { + // check inner cause for downstream PipelineException + final PipelineException pe = (PipelineException) InnerCause + .getInnerCause(t, PipelineException.class); + final UUID problemService; + if (pe != null) { + // throw pe; // throw it upstream - already should have been + // handled + problemService = pe.getProblemServiceId(); + } else { + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + problemService = priorAndNext[1]; + } + + // determine next pipeline service id + log.warn("Problem with downstream service: " + problemService, + t); + + // Carry out remedial work directly - BAD + log.error("Really need to remove service " + problemService); + + try { + member.getActor().forceRemoveService(problemService); + } catch (Exception e) { + log.warn("Problem on node removal", e); + + throw new RuntimeException(e); + } + + throw new PipelineException(problemService, t); + + } + } } /** @@ -1996,69 +2031,81 @@ } @Override - public Void call() throws Exception { + public Void call() throws Exception { - // wrap the messages together. - final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, snd, msg); + // wrap the messages together. + final HAMessageWrapper wrappedMsg = new HAMessageWrapper(req, snd, + msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, - b); + // Get Future for send() outcome on local service. + final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, + b); - try { + try { + try { - // Get future for receive outcome on the remote - // service. - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + // Get future for receive outcome on the remote + // service. 
+ final Future<Void> futRec = downstream.service + .receiveAndReplicate(req, snd, msg); - try { + try { - /* - * Await the Futures, but spend more time - * waiting on the local Future and only check - * the remote Future every second. Timeouts are - * ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * The token must remain valid, even if this service is - * not joined with the met quorum. If fact, services - * MUST replicate writes regardless of whether or not - * they are joined with the met quorum, but only while - * there is a met quorum. - */ - member.getQuorum().assertQuorum(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * The token must remain valid, even if this service + * is not joined with the met quorum. If fact, + * services MUST replicate writes regardless of + * whether or not they are joined with the met + * quorum, but only while there is a met quorum. + */ + member.getQuorum().assertQuorum(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec - .cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - // cancel the local Future. 
- futSnd.cancel(true/* mayInterruptIfRunning */); - } + } finally { + // Is it possible that this cancel conflicts with throwing + // the PipelineException? + // cancel the local Future. + futSnd.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + // determine next pipeline service id + // FIXME: should this check for problem from further downstream for + // quorums with > 3 services? + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], + t); - // done - return null; - } + throw new PipelineException(priorAndNext[1], t); + } + // done + return null; + } + } /** Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 09:07:46 UTC (rev 7639) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 10:44:56 UTC (rev 7640) @@ -106,7 +106,7 @@ // FIXME: in the face of no implemented error propagation we can explicitly // tell the leader to remove the killed service! 
- startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); @@ -171,7 +171,7 @@ kill(startup.serverB); // FIXME: temporary call to explicitly remove the service prior to correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); + // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); @@ -317,8 +317,8 @@ // FIXME: temporary call to explicitly remove the service prior to // correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) - .get(); + // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) + // .get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverC }); @@ -396,8 +396,8 @@ // FIXME: temporary call to explicitly remove the service prior to // correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - .get(); + // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) + // .get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverB }); @@ -496,8 +496,8 @@ executorService.submit(ft); // FIXME RESYNC_PIPELINE: move into QuorumPipelineImpl. - startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - .get(); + // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) + // .get(); awaitPipeline(getZKSessionTimeout() + 5000, TimeUnit.MILLISECONDS, new HAGlue[] { startup.serverA, startup.serverB });
From: <mar...@us...> - 2013-12-13 09:07:53
|
Revision: 7639 http://bigdata.svn.sourceforge.net/bigdata/?rev=7639&view=rev Author: martyncutcher Date: 2013-12-13 09:07:46 +0000 (Fri, 13 Dec 2013) Log Message: ----------- Test commit Modified Paths: -------------- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 02:47:11 UTC (rev 7638) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 09:07:46 UTC (rev 7639) @@ -471,6 +471,8 @@ * C. We then start the live load. This test explores what happens when A * and B are not yet aware that C is dead when the UPDATE operation starts. * + * Can I commit this + * * @throws Exception */ public void testABC_awaitKBCreate_killC_LiveLoadRemainsMet()
From: <tho...@us...> - 2013-12-13 02:47:19
|
Revision: 7638 http://bigdata.svn.sourceforge.net/bigdata/?rev=7638&view=rev Author: thompsonbry Date: 2013-12-13 02:47:11 +0000 (Fri, 13 Dec 2013) Log Message: ----------- removing file with java 7 dependencies that should not have been in the last commit. Removed Paths: ------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-12 23:40:15 UTC (rev 7637) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-13 02:47:11 UTC (rev 7638) @@ -1,818 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -package com.bigdata.ha.pipeline; - -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketOption; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import junit.framework.AssertionFailedError; - -import com.bigdata.btree.BytesUtil; -import com.bigdata.io.TestCase3; - -/** - * Test suite for basic socket behaviors. - * <p> - * Note: Tests in this suite should use direct byte buffers (non-heap NIO) - * buffers in order accurately model the conditions that bigdata uses for write - * replication. If you use heap byte[]s, then they are copied into an NIO direct - * buffer before they are transmitted over a socket. By using NIO direct - * buffers, we stay within the zero-copy pattern for sockets. - * <p> - * Note: Tests in this suite need to use {@link ServerSocketChannel#open()} to - * get access to the stream oriented listening interface for the server side of - * the socket. This is what is used by the {@link HAReceiveService}. It also - * sets up the {@link ServerSocketChannel} in a non-blocking mode and then uses - * the selectors to listen for available data. See {@link HAReceiveService}. 
- * - * @author <a href="mailto:mar...@us...">Martyn - * Cutcher</a> - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - */ -public class TestSocketsDirect extends TestCase3 { - - public TestSocketsDirect() { - } - - public TestSocketsDirect(String name) { - super(name); - } - - /** - * Writes out the available options for the client and server socket. - * - * @throws IOException - */ - public void testDirectSockets_options() throws IOException { - - // Get a socket addresss for an unused port. - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - // First our ServerSocketChannel - final ServerSocketChannel ssc = ServerSocketChannel.open(); - try { - - // bind the ServerSocket to the specified port. - ssc.bind(serverAddr); - - // Now the first Client SocketChannel - final SocketChannel cs = SocketChannel.open(); - try { - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate = cs.connect(serverAddr); - if (!immediate) { - // Did not connect immediately, so finish connect now. - if (!cs.finishConnect()) { - fail("Did not connect."); - } - } - - /* - * Write out the client socket options. - */ - log.info("Client:: isOpen=" + cs.isOpen()); - log.info("Client:: isBlocking=" + cs.isBlocking()); - log.info("Client:: isRegistered=" + cs.isRegistered()); - log.info("Client:: isConnected=" + cs.isConnected()); - log.info("Client:: isConnectionPending=" - + cs.isConnectionPending()); - for (SocketOption<?> opt : cs.supportedOptions()) { - log.info("Client:: " + opt + " := " + cs.getOption(opt)); - } - - /* - * Note: We need to use ServerSocketChannel.open() to get access - * to the stream oriented listening interface for the server - * side of the socket. 
- */ - log.info("Server:: isOpen=" + ssc.isOpen()); - log.info("Server:: isBlocking=" + ssc.isBlocking()); - log.info("Server:: isRegistered=" + ssc.isRegistered()); - for (SocketOption<?> opt : ssc.supportedOptions()) { - log.info("Server:: " + opt + " := " + cs.getOption(opt)); - } - - } finally { - cs.close(); - } - - } finally { - - ssc.close(); - - } - - } - - /** - * Simple test of connecting to a server socket and the failure to connect - * to a port not associated with a server socket. - * - * @throws IOException - */ - public void testDirectSockets_exceptionIfPortNotOpen() throws IOException { - - // Get two socket addressses. We will open a service on one and try to - // connect to the unused one on the other port. - final InetSocketAddress serverAddr1 = new InetSocketAddress(getPort(0)); - final InetSocketAddress serverAddr2 = new InetSocketAddress(getPort(0)); - - // First our ServerSocket - final ServerSocket ss1 = new ServerSocket(); - try { - - // bind the ServerSocket to the specified port. - ss1.bind(serverAddr1); - - assertTrue(ss1.getChannel() == null); - - /* - * Without a new connect request we should not be able to accept() a - * new connection. - */ - try { - accept(ss1); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected - } - - // Now the first Client SocketChannel - final SocketChannel cs1 = SocketChannel.open(); - try { - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate1 = cs1.connect(serverAddr1); - if (!immediate1) { - // Did not connect immediately, so finish connect now. 
- if (!cs1.finishConnect()) { - fail("Did not connect."); - } - } - } finally { - cs1.close(); - } - - // Now the first Client SocketChannel - final SocketChannel cs2 = SocketChannel.open(); - try { - cs1.connect(serverAddr2); - fail("Expecting " + IOException.class); - } catch (IOException ex) { - if(log.isInfoEnabled()) - log.info("Ignoring expected exception: "+ex); - } finally { - cs2.close(); - } - - /* - * Without a new connect request we should not be able to accept() a - * new connection. - */ - try { - accept(ss1); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected - } - - } finally { - - ss1.close(); - - } - - } - - /** - * Test of a large write on a socket to understand what happens when the - * write is greater than the combined size of the client send buffer and the - * server receive buffer and the server side of the socket is either not - * accepted or already shutdown. - * - * @throws IOException - * @throws InterruptedException - */ - public void testDirectSockets_largeWrite_NotAccepted() throws IOException, - InterruptedException { - - final Random r = new Random(); - - // Get a socket addresss for an unused port. - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - // First our ServerSocket - final ServerSocket ss = new ServerSocket(); - try { - - // Size of the server socket receive buffer. - final int receiveBufferSize = ss.getReceiveBufferSize(); - - // Allocate buffer twice as large as the receive buffer. - final byte[] largeBuffer = new byte[receiveBufferSize * 10]; - - if (log.isInfoEnabled()) { - log.info("receiveBufferSize=" + receiveBufferSize - + ", largeBufferSize=" + largeBuffer.length); - } - - // fill buffer with random data. - r.nextBytes(largeBuffer); - - // bind the ServerSocket to the specified port. - ss.bind(serverAddr); - - // Now the first Client SocketChannel - final SocketChannel cs = SocketChannel.open(); - try { - /* - * Note: true if connection made. 
false if connection in - * progress. - */ - final boolean immediate = cs.connect(serverAddr); - if (!immediate) { - // Did not connect immediately, so finish connect now. - if (!cs.finishConnect()) { - fail("Did not connect."); - } - } - - /* - * Attempt to write data. The server socket is not yet accepted. - * This should hit a timeout. - */ - assertTimeout(10L, TimeUnit.SECONDS, new WriteBufferTask(cs, - ByteBuffer.wrap(largeBuffer))); - - accept(ss); - - } finally { - cs.close(); - } - - } finally { - - ss.close(); - - } - - } - - /** - * The use of threaded tasks in the send/receive service makes it difficult to - * observer the socket state changes. - * - * So let's begin by writing some tests over the raw sockets. - * - * Note that connecting and then immediately closing the socket is perfectly okay. - * ...with an accept followed by a read() of -1 on the returned Socket stream. - * - * @throws IOException - * @throws InterruptedException - */ - public void testDirectSockets() throws IOException, InterruptedException { - - // The payload size that we will use. - final int DATA_LEN = 200; - - final Random r = new Random(); - final byte[] data = new byte[DATA_LEN]; - r.nextBytes(data); - final byte[] dst = new byte[DATA_LEN]; - - // The server side receive buffer size (once we open the server socket). - int receiveBufferSize = -1; - - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - // First our ServerSocket - final ServerSocket ss = new ServerSocket(); - try { - - assertTrue(ss.getChannel() == null); - - // bind the server socket to the port. - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - - // figure out the receive buffer size on the server socket. 
- receiveBufferSize = ss.getReceiveBufferSize(); - - if (log.isInfoEnabled()) - log.info("receiveBufferSize=" + receiveBufferSize - + ", payloadSize=" + DATA_LEN); - - if (receiveBufferSize < DATA_LEN) { - - fail("Service socket receive buffer is smaller than test payload size: receiveBufferSize=" - + receiveBufferSize + ", payloadSize=" + DATA_LEN); - - } - - { - /* - * InputStream for server side of socket connection - set below and - * then reused outside of the try/finally block. - */ - InputStream instr = null; - - // Now the first Client SocketChannel - final SocketChannel cs1 = SocketChannel.open(); - try { - - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate1 = cs1.connect(serverAddr); - if (!immediate1) { - if (!cs1.finishConnect()) { - fail("Did not connect?"); - } - } - - assertTrue(ss.getChannel() == null); - - /* - * We are connected. - */ - - final ByteBuffer src = ByteBuffer.wrap(data); - - // Write some data on the client socket. - cs1.write(src); - - /* - * Accept client's connection on server (after connect and - * write). - */ - final Socket readSckt1 = accept(ss); - - // Stream to read the data from the socket on the server - // side. - instr = readSckt1.getInputStream(); - - // and read the data - instr.read(dst); - - // confirming the read is correct - assertTrue(BytesUtil.bytesEqual(data, dst)); - - assertTrue(ss.getChannel() == null); - - /* - * Attempting to read more returns ZERO because there is - * nothing in the buffer and the connection is still open on - * the client side. - * - * Note: instr.read(buf) will BLOCK until the data is - * available, the EOF is detected, or an exception is - * thrown. - */ - assertEquals(0, instr.available()); - // assertEquals(0, instr.read(dst)); - - /* - * Now write some more data into the channel and *then* - * close it. 
- */ - cs1.write(ByteBuffer.wrap(data)); - - // close the client side of the socket - cs1.close(); - - // The server side of client connection is still open. - assertTrue(readSckt1.isConnected()); - assertFalse(readSckt1.isClosed()); - - /* - * Now try writing some more data. This should be disallowed - * since we closed the client side of the socket. - */ - try { - cs1.write(ByteBuffer.wrap(data)); - fail("Expected closed channel exception"); - } catch (ClosedChannelException e) { - // expected - } - - /* - * Since we closed the client side of the socket, when we - * try to read more data on the server side of the - * connection. The data that we already buffered is still - * available on the server side of the socket. - */ - { - // the already buffered data should be available. - final int rdlen = instr.read(dst); - assertEquals(DATA_LEN, rdlen); - assertTrue(BytesUtil.bytesEqual(data, dst)); - } - - /* - * We have drained the buffered data. There is no more - * buffered data and client side is closed, so an attempt to - * read more data on the server side of the socket will - * return EOF (-1). - */ - assertEquals(-1, instr.read(dst)); // read EOF - - // if so then should we explicitly close its socket? - readSckt1.close(); - assertTrue(readSckt1.isClosed()); - - /* - * Still reports EOF after the accepted server socket is - * closed. - */ - assertEquals(-1, instr.read(dst)); - - assertFalse(ss.isClosed()); - assertTrue(ss.getChannel() == null); - - } finally { - cs1.close(); - } - - // failing to read from original stream - final int nrlen = instr.read(dst); - assertEquals(-1, nrlen); - } - - /* - * Now open a new client Socket and connect to the server. - */ - final SocketChannel cs2 = SocketChannel.open(); - try { - - // connect to the server socket again. 
- final boolean immediate2 = cs2.connect(serverAddr); - if (!immediate2) { - if (!cs2.finishConnect()) { - fail("Did not connect?"); - } - } - - // Now server should accept the new client connection - final Socket s2 = accept(ss); - - // Client writes to the SocketChannel - final int wlen = cs2.write(ByteBuffer.wrap(data)); - assertEquals(DATA_LEN, wlen); // verify data written. - - // but succeeding to read from the new Socket - final InputStream instr2 = s2.getInputStream(); - instr2.read(dst); - assertTrue(BytesUtil.bytesEqual(data, dst)); - - /* - * Question: Can a downstream close be detected upstream? - * - * Answer: No. Closing the server socket does not tell the - * client that the socket was closed. - */ - { - // close server side input stream. - instr2.close(); - - // but the client still thinks its connected. - assertTrue(cs2.isOpen()); - - // Does the client believe it is still open after a brief - // sleep? - Thread.sleep(1000); - assertTrue(cs2.isOpen()); // yes. - - // close server stocket. - s2.close(); - - // client still thinks it is connected after closing server - // socket. - assertTrue(cs2.isOpen()); - - // Does the client believe it is still open after a brief - // sleep? - Thread.sleep(1000); - assertTrue(cs2.isOpen()); // yes. - - } - - /* - * Now write some more to the socket. We have closed the - * accepted connection on the server socket. Our observations - * show that the 1st write succeeds. The second write then fails - * with 'IOException: "Broken pipe"' - * - * The server socket is large (256k). We are not filling it up, - * but the 2nd write always fails. Further, the client never - * believes that the connection is closed until the 2nd write, - */ - { - final int writeSize = 1; - int nwritesOk = 0; - long nbytesReceived = 0L; - while (true) { - try { - // write a payload. - final int wlen2 = cs2.write(ByteBuffer.wrap(data, - 0, writeSize)); - // if write succeeds, should have written all bytes. 
- assertEquals(writeSize, wlen2); - nwritesOk++; - nbytesReceived += wlen2; - // does the client think the connection is still open? - assertTrue(cs2.isOpen()); // yes. - Thread.sleep(1000); - assertTrue(cs2.isOpen()); // yes. - } catch (IOException ex) { - if (log.isInfoEnabled()) - log.info("Expected exception: nwritesOk=" - + nwritesOk + ", nbytesReceived=" - + nbytesReceived + ", ex=" + ex); - break; - } - } - } - - /* - * Having closed the input, without a new connect request we - * should not be able to accept the new write since the data - * were written on a different client connection. - */ - try { - final Socket s3 = accept(ss); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected - } - - } finally { - cs2.close(); - } - - } finally { - ss.close(); - } - - } - - /** - * Confirms that multiple clients can communicate with same Server - * - * @throws IOException - */ - public void testMultipleClients() throws IOException { - - // The payload size that we will use. - final int DATA_LEN = 200; - final Random r = new Random(); - final byte[] data = new byte[DATA_LEN]; - r.nextBytes(data); - - final int nclients = 10; - - final ArrayList<SocketChannel> clients = new ArrayList<SocketChannel>(); - final ArrayList<Socket> sockets = new ArrayList<Socket>(); - - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - final ServerSocket ss = new ServerSocket(); - try { - - // bind the ServerSocket to the specified port. - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - - final int receiveBufferSize = ss.getReceiveBufferSize(); - - // Make sure that we have enough room to receive all client writes - // before draining any of them. - assertTrue(DATA_LEN * nclients <= receiveBufferSize); - - assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { - - @Override - public Void call() throws Exception { - - for (int c = 0; c < nclients; c++) { - - // client connects to server. 
- final SocketChannel cs = SocketChannel.open(); - cs.connect(serverAddr); - clients.add(cs); - - // accept connection on server. - sockets.add(ss.accept()); - - // write to each SocketChannel (after connect/accept) - cs.write(ByteBuffer.wrap(data)); - } - - return null; - - } - - }); - - /* - * Now read from all Sockets accepted on the server. - * - * Note: This is a simple loop, not a parallel read. The same buffer - * is reused on each iteration. - */ - { - - final byte[] dst = new byte[DATA_LEN]; - - for (Socket s : sockets) { - - assertFalse(s.isClosed()); - - final InputStream instr = s.getInputStream(); - - assertFalse(-1 == instr.read(dst)); // doesn't return -1 - - assertTrue(BytesUtil.bytesEqual(data, dst)); - - // Close each Socket to ensure it is different - s.close(); - - assertTrue(s.isClosed()); - - } - - } - - } finally { - - // ensure client side connections are closed. - for (SocketChannel ch : clients) { - if (ch != null) - ch.close(); - } - - // ensure server side connections are closed. - for (Socket s : sockets) { - if (s != null) - s.close(); - } - - // close the server socket. - ss.close(); - - } - - } - - /** wrap the ServerSocket accept with a timeout check. */ - private Socket accept(final ServerSocket ss) { - - final AtomicReference<Socket> av = new AtomicReference<Socket>(); - - assertNoTimeout(1, TimeUnit.SECONDS, new Callable<Void>() { - - @Override - public Void call() throws Exception { - - av.set(ss.accept()); - - return null; - } - }); - - return av.get(); - } - - /** - * Fail the test if the {@link Callable} completes before the specified - * timeout. - * - * @param timeout - * @param unit - * @param callable - */ - private void assertTimeout(final long timeout, final TimeUnit unit, - final Callable<Void> callable) { - final ExecutorService es = Executors.newSingleThreadExecutor(); - final Future<Void> ret = es.submit(callable); - final long begin = System.currentTimeMillis(); - try { - // await Future with timeout. 
- ret.get(timeout, unit); - final long elapsed = System.currentTimeMillis() - begin; - fail("Expected timeout: elapsed=" + elapsed + "ms, timeout=" - + timeout + " " + unit); - } catch (TimeoutException e) { - // that is expected - final long elapsed = System.currentTimeMillis() - begin; - if (log.isInfoEnabled()) - log.info("timeout after " + elapsed + "ms"); - return; - } catch (Exception e) { - final long elapsed = System.currentTimeMillis() - begin; - fail("Expected timeout: elapsed=" + elapsed + ", timeout=" - + timeout + " " + unit, e); - } finally { - log.warn("Cancelling task - should interrupt accept()"); - ret.cancel(true/* mayInterruptIfRunning */); - es.shutdown(); - } - } - - /** - * Throws {@link AssertionFailedError} if the {@link Callable} does not - * succeed within the timeout. - * - * @param timeout - * @param unit - * @param callable - * - * @throws AssertionFailedError - * if the {@link Callable} does not succeed within the timeout. - * @throws AssertionFailedError - * if the {@link Callable} fails. - */ - private void assertNoTimeout(final long timeout, final TimeUnit unit, - final Callable<Void> callable) { - final ExecutorService es = Executors.newSingleThreadExecutor(); - try { - final Future<Void> ret = es.submit(callable); - ret.get(timeout, unit); - } catch (TimeoutException e) { - fail("Unexpected timeout"); - } catch (Exception e) { - fail("Unexpected Exception", e); - } finally { - es.shutdown(); - } - } - - /** - * Task writes the data on the client {@link SocketChannel}. 
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- */
-    private static class WriteBufferTask implements Callable<Void> {
-
-        final private ByteBuffer buf;
-        final private SocketChannel cs;
-
-        public WriteBufferTask(final SocketChannel cs, final ByteBuffer buf) {
-            this.cs = cs;
-            this.buf = buf;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            cs.write(buf);
-            return null;
-        }
-
-    }
-
-}
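The socket semantics that TestSocketsDirect documents above — data written before a client-side close remains readable on the server, after which read() reports EOF (-1) — can be reproduced outside the bigdata code base. The following is a minimal standalone sketch using only the JDK; the class and variable names are mine for illustration and do not come from the commit.

```java
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * Minimal illustration of the half-close semantics probed by
 * TestSocketsDirect: bytes written before the client closes its side
 * are still delivered to the server, and only then does read() see EOF.
 */
public class HalfCloseDemo {

    public static void main(String[] args) throws Exception {

        // Bind a server socket to an ephemeral port chosen by the OS.
        final ServerSocket ss = new ServerSocket(0);
        final InetSocketAddress addr = new InetSocketAddress(
                "127.0.0.1", ss.getLocalPort());

        final byte[] data = new byte[] { 1, 2, 3, 4 };

        // Connect (blocking mode), write, then close the client side.
        final SocketChannel cs = SocketChannel.open();
        cs.connect(addr);
        cs.write(ByteBuffer.wrap(data));
        cs.close();

        // The server can still accept and drain the buffered bytes.
        final Socket s = ss.accept();
        final InputStream in = s.getInputStream();
        final byte[] dst = new byte[data.length];
        int n = 0;
        while (n < dst.length) {
            final int rdlen = in.read(dst, n, dst.length - n);
            if (rdlen == -1)
                break;
            n += rdlen;
        }
        System.out.println("read=" + n);        // read=4

        // Buffered data drained and peer closed: EOF (-1).
        System.out.println("eof=" + in.read()); // eof=-1

        s.close();
        ss.close();
    }
}
```

This mirrors the assertions in testDirectSockets(): connect-then-close is accepted normally, buffered writes survive the close, and EOF is only observed once the buffer is drained.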
From: <tho...@us...> - 2013-12-12 23:40:21
Revision: 7637
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7637&view=rev
Author:   thompsonbry
Date:     2013-12-12 23:40:15 +0000 (Thu, 12 Dec 2013)

Log Message:
-----------
Added the "just kills" test suite (TestHA3JustKills). This still needs work, but it can go into CI for this branch.

Modified Paths:
--------------
    branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java

Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java
===================================================================
--- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java	2013-12-12 23:38:47 UTC (rev 7636)
+++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java	2013-12-12 23:40:15 UTC (rev 7637)
@@ -86,6 +86,9 @@
         // HA3 test suite focusing on changing the leader.
         suite.addTestSuite(TestHA3ChangeLeader.class);
 
+        // HA3 test suite focusing on sudden kills.
+        suite.addTestSuite(TestHA3JustKills.class);
+
         // HA3 snapshot policy test suite.
         suite.addTestSuite(TestHA3SnapshotPolicy.class);
         suite.addTestSuite(TestHA3SnapshotPolicy2.class);
From: <tho...@us...> - 2013-12-12 23:38:58
Revision: 7636 http://bigdata.svn.sourceforge.net/bigdata/?rev=7636&view=rev Author: thompsonbry Date: 2013-12-12 23:38:47 +0000 (Thu, 12 Dec 2013) Log Message: ----------- Sync to Martyn and CI on #724 (write replication pipeline resynchronization). We have incorporated logic to drain to the marker in the replication protocol. The marker concept has been refactored. There is now an IHASendState that captures original and potentially routing information for the payload. This has been raised into the HAPipelineGlue interface. We still need to bring in the typed exception handling for forceRemoveService() invocations and examine QuorumPipelineImpl for possible lock contention issues around this resync protocol. Removed dead test suite from build.xml Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 
branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/MockQuorumMember.java branches/MGC_1_3_0/build.xml Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/AbstractHASendAndReceiveTestCase.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -37,6 +37,7 @@ import com.bigdata.ha.msg.IHALogRootBlocksRequest; import com.bigdata.ha.msg.IHALogRootBlocksResponse; import com.bigdata.ha.msg.IHARebuildRequest; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASendStoreResponse; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -128,14 +129,17 @@ * A request for an HALog (optional). This is only non-null when * historical {@link WriteCache} blocks are being replayed down * the write pipeline in order to synchronize a service. + * @param snd + * Metadata about the state of the sender and potentially the + * routing of the payload along the write replication pipeline. * @param msg * The metadata. * * @return The {@link Future} which will become available once the buffer * transfer is complete. 
*/ - Future<Void> receiveAndReplicate(IHASyncRequest req, IHAWriteMessage msg) - throws IOException; + Future<Void> receiveAndReplicate(IHASyncRequest req, IHASendState snd, + IHAWriteMessage msg) throws IOException; /** * Request metadata about the current write set from the quorum leader. Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -33,6 +33,7 @@ import java.util.concurrent.Future; import com.bigdata.ha.halog.HALogWriter; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.io.writecache.WriteCache; @@ -85,11 +86,14 @@ * A synchronization request (optional). This is only non-null * when historical {@link WriteCache} blocks are being replayed * down the write pipeline in order to synchronize a service. + * @param snd + * Metadata about the state of the sender and potentially the + * routing of the payload along the write replication pipeline. * @param msg * The RMI metadata about the payload. 
*/ - Future<Void> receiveAndReplicate(IHASyncRequest req, IHAWriteMessage msg) - throws IOException; + Future<Void> receiveAndReplicate(IHASyncRequest req, IHASendState snd, + IHAWriteMessage msg) throws IOException; /* * Note: Method removed since it does not appear necessary to let this Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -41,15 +41,17 @@ import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; -import com.bigdata.ha.msg.HAWriteMessageBase; -import com.bigdata.ha.msg.IHALogRequest; +import com.bigdata.ha.msg.HAMessageWrapper; +import com.bigdata.ha.msg.HASendState; import com.bigdata.ha.msg.IHAMessage; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.ha.pipeline.HAReceiveService; @@ -190,7 +192,7 @@ * href="https://sourceforge.net/apps/trac/bigdata/ticket/724">HA wire * pulling and sudden kill testing</a>. */ - private final int RETRY_SLEEP = 100; //200; // 50; // milliseconds. + private final int RETRY_SLEEP = 30; //200; // 50; // milliseconds. /** * Once this timeout is elapsed, retrySend() will fail. @@ -207,12 +209,12 @@ /** * The {@link QuorumMember}. */ - protected final QuorumMember<S> member; + private final QuorumMember<S> member; /** * The service {@link UUID} for the {@link QuorumMember}. 
*/ - protected final UUID serviceId; + private final UUID serviceId; /** * Lock managing the various mutable aspects of the pipeline state. @@ -244,6 +246,11 @@ private final InnerEventHandler innerEventHandler = new InnerEventHandler(); /** + * One up message identifier. + */ + private final AtomicLong messageId = new AtomicLong(0L); + + /** * Core implementation of the handler for the various events. Always run * while holding the {@link #lock}. * @@ -851,15 +858,19 @@ public void callback(final HAMessageWrapper msg, final ByteBuffer data) throws Exception { // delegate handling of write cache blocks. - handleReplicatedWrite(msg.req, msg.msg, data); + handleReplicatedWrite(msg.getHASyncRequest(), + (IHAWriteMessage) msg + .getHAWriteMessage(), data); } @Override public void incReceive(final HAMessageWrapper msg, final int nreads, final int rdlen, final int rem) throws Exception { // delegate handling of incremental receive notify. - QuorumPipelineImpl.this.incReceive(msg.req, - msg.msg, nreads, rdlen, rem); + QuorumPipelineImpl.this.incReceive(// + msg.getHASyncRequest(), + (IHAWriteMessage) msg.getHAWriteMessage(), // + nreads, rdlen, rem); } }); // Start the receive service - will not return until service is @@ -1170,36 +1181,17 @@ /* * End of QuorumStateChangeListener. */ - - /** - * Glue class wraps the {@link IHAWriteMessage} and the - * {@link IHALogRequest} message and exposes the requires {@link IHAMessage} - * interface to the {@link HAReceiveService}. This class is never persisted. - * It just let's us handshake with the {@link HAReceiveService} and get back - * out the original {@link IHAWriteMessage} as well as the optional - * {@link IHALogRequest} message. 
- * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private static class HAMessageWrapper extends HAWriteMessageBase { - private static final long serialVersionUID = 1L; + private IHASendState newSendState() { - final IHASyncRequest req; - final IHAWriteMessage msg; - - public HAMessageWrapper(final IHASyncRequest req, - final IHAWriteMessage msg) { + final Quorum<?, ?> quorum = member.getQuorum(); - // Use size and checksum from real IHAWriteMessage. - super(msg.getSize(),msg.getChk()); - - this.req = req; // MAY be null; - this.msg = msg; - - } + final IHASendState snd = new HASendState(messageId.incrementAndGet(), + serviceId/* originalSenderId */, serviceId/* senderId */, + quorum.token(), quorum.replicationFactor()); + return snd; + } /* @@ -1214,7 +1206,8 @@ lock(); try { - ft = new FutureTask<Void>(new RobustReplicateTask(req, msg, b)); + ft = new FutureTask<Void>(new RobustReplicateTask(req, + newSendState(), msg, b)); } finally { @@ -1243,6 +1236,11 @@ private final IHASyncRequest req; /** + * Metadata about the state of the sender for this message. + */ + private final IHASendState snd; + + /** * The {@link IHAWriteMessage}. */ private final IHAWriteMessage msg; @@ -1265,10 +1263,14 @@ private final long quorumToken; public RobustReplicateTask(final IHASyncRequest req, - final IHAWriteMessage msg, final ByteBuffer b) { + final IHASendState snd, final IHAWriteMessage msg, + final ByteBuffer b) { // Note: [req] MAY be null. 
+ if (snd == null) + throw new IllegalArgumentException(); + if (msg == null) throw new IllegalArgumentException(); @@ -1277,6 +1279,8 @@ this.req = req; + this.snd = snd; + this.msg = msg; this.b = b; @@ -1467,6 +1471,7 @@ * at com.bigdata.ha.pipeline.HAReceiveService.run(HAReceiveService.java:431) * </pre> */ + @Override public Void call() throws Exception { final long beginNanos = System.nanoTime(); @@ -1549,7 +1554,7 @@ final ByteBuffer b = this.b.duplicate(); - new SendBufferTask<S>(member, quorumToken, req, msg, b, + new SendBufferTask<S>(member, quorumToken, req, snd, msg, b, downstream, sendService, sendLock).call(); return; @@ -1674,6 +1679,7 @@ private final QuorumMember<S> member; private final long token; // member MUST remain leader for token. private final IHASyncRequest req; + private final IHASendState snd; private final IHAWriteMessage msg; private final ByteBuffer b; private final PipelineState<S> downstream; @@ -1681,13 +1687,15 @@ private final Lock sendLock; public SendBufferTask(final QuorumMember<S> member, final long token, - final IHASyncRequest req, final IHAWriteMessage msg, - final ByteBuffer b, final PipelineState<S> downstream, + final IHASyncRequest req, final IHASendState snd, + final IHAWriteMessage msg, final ByteBuffer b, + final PipelineState<S> downstream, final HASendService sendService, final Lock sendLock) { this.member = member; this.token = token; this.req = req; // Note: MAY be null. + this.snd = snd; this.msg = msg; this.b = b; this.downstream = downstream; @@ -1696,6 +1704,7 @@ } + @Override public Void call() throws Exception { /* @@ -1723,13 +1732,13 @@ ExecutionException, IOException { // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b); + final Future<Void> futSnd = sendService.send(b, snd.getMarker()); try { // Get Future for receive outcome on the remote service (RMI). 
final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + .receiveAndReplicate(req, snd, msg); try { @@ -1780,7 +1789,8 @@ @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, - final IHAWriteMessage msg) throws IOException { + final IHASendState snd, final IHAWriteMessage msg) + throws IOException { /* * FIXME We should probably pass the quorum token through from the @@ -1837,7 +1847,7 @@ */ ft = new FutureTask<Void>(new ReceiveTask<S>(member, token, - req, msg, b, receiveService)); + req, snd, msg, b, receiveService)); // try { // @@ -1862,7 +1872,8 @@ */ ft = new FutureTask<Void>(new ReceiveAndReplicateTask<S>( - member, token, req, msg, b, downstream, receiveService)); + member, token, req, snd, msg, b, downstream, + receiveService)); } @@ -1891,6 +1902,7 @@ private final QuorumMember<S> member; private final long token; private final IHASyncRequest req; + private final IHASendState snd; private final IHAWriteMessage msg; private final ByteBuffer b; private final HAReceiveService<HAMessageWrapper> receiveService; @@ -1898,6 +1910,7 @@ public ReceiveTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, + final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final HAReceiveService<HAMessageWrapper> receiveService ) { @@ -1905,16 +1918,18 @@ this.member = member; this.token = token; this.req = req; // Note: MAY be null. + this.snd = snd; this.msg = msg; this.b = b; this.receiveService = receiveService; } + @Override public Void call() throws Exception { - // wrap the messages together. + // wrap the messages together. final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, msg); + req, snd, msg); // Get Future for send() outcome on local service. 
final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, @@ -1957,6 +1972,7 @@ private final QuorumMember<S> member; private final long token; private final IHASyncRequest req; + private final IHASendState snd; private final IHAWriteMessage msg; private final ByteBuffer b; private final PipelineState<S> downstream; @@ -1964,6 +1980,7 @@ public ReceiveAndReplicateTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, + final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, final HAReceiveService<HAMessageWrapper> receiveService) { @@ -1971,17 +1988,19 @@ this.member = member; this.token = token; this.req = req; // Note: MAY be null. + this.snd = snd; this.msg = msg; this.b = b; this.downstream = downstream; this.receiveService = receiveService; } + @Override public Void call() throws Exception { // wrap the messages together. final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, msg); + req, snd, msg); // Get Future for send() outcome on local service. final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, @@ -1992,7 +2011,7 @@ // Get future for receive outcome on the remote // service. 
final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + .receiveAndReplicate(req, snd, msg); try { Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -36,6 +36,7 @@ import org.apache.log4j.Logger; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.journal.AbstractJournal; @@ -233,10 +234,11 @@ @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, - final IHAWriteMessage msg) throws IOException { + final IHASendState snd, final IHAWriteMessage msg) + throws IOException { + + return pipelineImpl.receiveAndReplicate(req, snd, msg); - return pipelineImpl.receiveAndReplicate(req, msg); - } @Override Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,84 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.msg; + +import com.bigdata.ha.pipeline.HAReceiveService; + +/** + * Glue class wraps the {@link IHAWriteMessage} and the {@link IHALogRequest} + * message and exposes the required {@link IHAMessage} interface to the + * {@link HAReceiveService}. This class is never persisted. It just lets us + * handshake with the {@link HAReceiveService} and get back out the original + * {@link IHAWriteMessage} as well as the optional {@link IHALogRequest} + * message. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class HAMessageWrapper extends HAWriteMessageBase implements + IHAMessageWrapper { + + private static final long serialVersionUID = 1L; + + private final IHASyncRequest req; + private final IHASendState snd; + private final IHAWriteMessageBase msg; + + public HAMessageWrapper(final IHASyncRequest req, final IHASendState snd, + final IHAWriteMessageBase msg) { + + // Use size and checksum from real IHAWriteMessage. + super(msg.getSize(), msg.getChk()); + + this.req = req; // MAY be null; + this.snd = snd; + this.msg = msg; + + } + + @Override + public IHASyncRequest getHASyncRequest() { + return req; + } + + @Override + public IHASendState getHASendState() { + return snd; + } + + @Override + public IHAWriteMessageBase getHAWriteMessage() { + return msg; + } + + /** + * Return the {@link IHASendState#getMarker()} iff there is an associated + * {@link IHASendState} and otherwise <code>null</code>. + */ + public byte[] getMarker() { + + return snd == null ?
null : snd.getMarker(); + + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,245 @@ +package com.bigdata.ha.msg; + +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; + +import com.bigdata.io.DataOutputBuffer; +import com.bigdata.rawstore.Bytes; + +public class HASendState implements IHASendState, Externalizable { + +// private static final Logger log = Logger.getLogger(HASendState.class); + + private static final long serialVersionUID = 1; + + private long messageId; + private UUID originalSenderId; + private UUID senderId; + private long token; + private int replicationFactor; + + /** + * De-serialization constructor. 
+ */ + public HASendState() { + + } + + public HASendState(final long messageId, final UUID originalSenderId, + final UUID senderId, final long token, final int replicationFactor) { + + if (originalSenderId == null) + throw new IllegalArgumentException(); + + if (senderId == null) + throw new IllegalArgumentException(); + + if (replicationFactor <= 0) + throw new IllegalArgumentException(); + + this.messageId = messageId; + this.originalSenderId = originalSenderId; + this.senderId = senderId; + this.token = token; + this.replicationFactor = replicationFactor; + + } + + @Override + public long getMessageId() { + + return messageId; + + } + + @Override + public UUID getOriginalSenderId() { + + return originalSenderId; + } + + @Override + public UUID getSenderId() { + + return senderId; + } + + @Override + public long getQuorumToken() { + + return token; + + } + + @Override + public int getReplicationFactor() { + + return replicationFactor; + + } + + @Override + public byte[] getMarker() { + + final byte[] a = new byte[MAGIC_SIZE + currentVersionLen]; + + final DataOutputBuffer dob = new DataOutputBuffer(0/* len */, a); + + try { + + dob.writeLong(MAGIC); + + writeExternal2(dob); + + } catch (IOException e) { + + throw new RuntimeException(e); + + } + + return a; + + } + + @Override + public String toString() { + + return super.toString() + "{messageId=" + messageId + + ",originalSenderId=" + originalSenderId + ",senderId=" + + senderId + ",token=" + token + ", replicationFactor=" + + replicationFactor + "}"; + + } + + @Override + public boolean equals(final Object obj) { + + if (this == obj) + return true; + + if (!(obj instanceof IHASendState)) + return false; + + final IHASendState t = (IHASendState) obj; + + return messageId == t.getMessageId() + && originalSenderId.equals(t.getOriginalSenderId()) + && senderId.equals(t.getSenderId()) && token == t.getQuorumToken() + && replicationFactor == t.getReplicationFactor(); + + } + + @Override + public int hashCode() { + 
+ // based on the messageId and the hashCode of the senderId + return ((int) (messageId ^ (messageId >>> 32))) + senderId.hashCode(); + } + + /** + * Magic data only included in the marker. + */ + private static final long MAGIC = 0x13759f98e8363caeL; + private static final int MAGIC_SIZE = Bytes.SIZEOF_LONG; + + private static final transient short VERSION0 = 0x0; + private static final transient int VERSION0_LEN = // + Bytes.SIZEOF_LONG + // messageId + Bytes.SIZEOF_UUID + // originalSenderId + Bytes.SIZEOF_UUID + // senderId + Bytes.SIZEOF_LONG + // token + Bytes.SIZEOF_INT // replicationFactor + ; + + private static final transient short currentVersion = VERSION0; + private static final transient int currentVersionLen = VERSION0_LEN; + + @Override + public void readExternal(final ObjectInput in) throws IOException, + ClassNotFoundException { + + final short version = in.readShort(); + + if (version != VERSION0) + throw new RuntimeException("Bad version for serialization"); + + messageId = in.readLong(); + + originalSenderId = new UUID(// + in.readLong(), /* MSB */ + in.readLong() /* LSB */ + ); + + senderId = new UUID(// + in.readLong(), /* MSB */ + in.readLong() /* LSB */ + ); + + token = in.readLong(); + + replicationFactor = in.readInt(); + + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + + writeExternal2(out); + + } + + private void writeExternal2(final DataOutput out) throws IOException { + + out.writeShort(currentVersion); + + out.writeLong(messageId); + + out.writeLong(originalSenderId.getMostSignificantBits()); + out.writeLong(originalSenderId.getLeastSignificantBits()); + + out.writeLong(senderId.getMostSignificantBits()); + out.writeLong(senderId.getLeastSignificantBits()); + + out.writeLong(token); + + out.writeInt(replicationFactor); + + } + + // static final private int MARKER_SIZE = 8; + // + // /** + // * Unique marker generation with JVM wide random number generator. 
+ // * + // * @return A "pretty unique" marker. + // */ + // private byte[] genMarker() { + // + // final byte[] token = new byte[MARKER_SIZE]; + // + // while (!unique1(token)) { + // r.nextBytes(token); + // } + // + // return token; + // } + // + // /** + // * Checks that the first byte is not repeated in the remaining bytes, this + // * simplifies search for the token in the input stream. + // */ + // static private boolean unique1(final byte[] bytes) { + // final byte b = bytes[0]; + // for (int t = 1; t < bytes.length; t++) { + // if (bytes[t] == b) + // return false; + // } + // + // return true; + // } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -159,6 +159,7 @@ return compressorKey; } + @Override public String toString() { return getClass().getName() // @@ -347,7 +348,9 @@ * @return */ public static boolean isDataCompressed() { - return compressData; + + return compressData; + } @Override @@ -375,6 +378,7 @@ } + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { @@ -414,6 +418,7 @@ firstOffset = in.readLong(); } + @Override public void writeExternal(final ObjectOutput out) throws IOException { super.writeExternal(out); if (currentVersion >= VERSION1 && uuid != null) { @@ -469,6 +474,7 @@ // return compressor.compress(buffer); // } + @Override public ByteBuffer expand(final ByteBuffer buffer) { final String compressorKey = getCompressorKey(); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java =================================================================== --- 
branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -63,15 +63,15 @@ * The Alder32 checksum of the bytes to be transfered. */ public HAWriteMessageBase(final int sze, final int chk) { - + if (sze <= 0) throw new IllegalArgumentException(); this.sze = sze; this.chk = chk; - - } + + } /** * Deserialization constructor. @@ -97,7 +97,8 @@ return chk; } - + + @Override public String toString() { return super.toString() + "{size=" + sze + ",chksum=" + chk + "}"; @@ -131,6 +132,7 @@ private static final transient short currentVersion = VERSION0; + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { @@ -145,6 +147,7 @@ } + @Override public void writeExternal(final ObjectOutput out) throws IOException { out.writeShort(currentVersion); @@ -152,7 +155,7 @@ out.writeInt(sze); out.writeInt(chk); - + } } Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,59 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.msg; + +import com.bigdata.ha.pipeline.HAReceiveService; + +/** + * Glue interface wraps the {@link IHALogRequest}, {@link IHASendState}, and + * {@link IHAWriteMessage} interfaces and exposes the required {@link IHAMessage} + * interface to the {@link HAReceiveService}. This class is never persisted (it + * does NOT get written into the HALog files). It just lets us handshake with + * the {@link HAReceiveService} and get back out the original + * {@link IHAWriteMessage} as well as the optional {@link IHALogRequest} + * message. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHAMessageWrapper { + + /** + * Return the optional {@link IHASyncRequest}. When available, this provides + * information about the service request that resulted in the transmission + * of the payload along the pipeline. + */ + IHASyncRequest getHASyncRequest(); + + /** + * Return information about the state of the sending service. + */ + IHASendState getHASendState(); + + /** + * Return the message that describes the payload that will be replicated + * along the pipeline. + */ + IHAWriteMessageBase getHAWriteMessage(); + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,71 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi...
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +package com.bigdata.ha.msg; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Interface for the state of the sender of an {@link IHAMessage}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHASendState extends Serializable { + + /** + * A unique (one-up) message sequence identifier for the messages from the + * sender. This identifier may be used to verify that the bytes available + * from the replication stream are associated with the designated payload. + */ + long getMessageId(); + + /** + * The {@link UUID} of the originating service. This may be used to verify + * that a message was sourced from the expected quorum leader. + */ + UUID getOriginalSenderId(); + + /** + * The {@link UUID} of the sending service. This may be used to verify that + * a message was sourced from the expected upstream service. + */ + UUID getSenderId(); + + /** + * The current quorum token on the sender. + */ + long getQuorumToken(); + + /** + * The current replication factor on the sender. + */ + int getReplicationFactor(); + + /** + * A byte[] marker that must prefix the message payload, needed to skip + * stale data from failed read tasks.
+ */ + byte[] getMarker(); + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -51,7 +51,9 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.QuorumPipelineImpl; +import com.bigdata.ha.msg.HAMessageWrapper; import com.bigdata.ha.msg.IHAMessage; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -73,7 +75,7 @@ * @author Martyn Cutcher * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ -public class HAReceiveService<M extends IHAWriteMessageBase> extends Thread { +public class HAReceiveService<M extends HAMessageWrapper> extends Thread { private static final Logger log = Logger .getLogger(HAReceiveService.class); @@ -225,6 +227,7 @@ /* * Note: toString() implementation is non-blocking. */ + @Override public String toString() { return super.toString() + "{addrSelf=" + addrSelf + ", addrNext=" @@ -725,7 +728,7 @@ * <p> * report the #of payloads. */ - static private class ReadTask<M extends IHAWriteMessageBase> implements + static private class ReadTask<M extends HAMessageWrapper> implements Callable<Void> { private final ServerSocketChannel server; @@ -983,18 +986,25 @@ private void doReceiveAndReplicate(final Client client) throws Exception { - /** - * The first cause if downstream replication fails. We make a note - * of this first cause, continue to drain the payload, and then - * rethrow the first cause once the payload has been fully drained. - * This is necessary to ensure that the socket channel does not have - * partial data remaining from an undrained payload. 
- * - * @see <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" - * > HA wire pulling and sure kill testing </a> - */ - Throwable downstreamFirstCause = null; +// /** +// * The first cause if downstream replication fails. We make a note +// * of this first cause, continue to drain the payload, and then +// * rethrow the first cause once the payload has been fully drained. +// * This is necessary to ensure that the socket channel does not have +// * partial data remaining from an undrained payload. +// * +// * @see <a +// * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" +// * > HA wire pulling and sure kill testing </a> +// * +// * Note: It appears that attempting to drain the +// * payload is risky since there are a variety of ways in which +// * the process might be terminated. It seems to be safer to +// * drain the socket channel until we reach a marker that gives +// * us confidence that we are at the payload for the message +// * that is being processed. +// */ +// Throwable downstreamFirstCause = null; /* * We should now have parameters ready in the WriteMessage and can @@ -1012,6 +1022,9 @@ // for debug retain number of low level reads int reads = 0; + final DrainToMarkerUtil drainUtil = message.getHASendState() != null ? new DrainToMarkerUtil( + message.getHASendState().getMarker(), client) : null; + while (rem > 0 && !EOS) { // block up to the timeout. @@ -1070,9 +1083,16 @@ while (iter.hasNext()) { + // Check for termination. 
+ client.checkFirstCause(); + iter.next(); iter.remove(); + if (!drainUtil.foundMarker()) { + continue; + } + final int rdlen = client.client.read(localBuffer); if (log.isTraceEnabled()) @@ -1098,17 +1118,17 @@ callback.incReceive(message, reads, rdlen, rem); } - if (downstreamFirstCause == null) { - try { +// if (downstreamFirstCause == null) { +// try { forwardReceivedBytes(client, rdlen); - } catch (ExecutionException ex) { - log.error( - "Downstream replication failure" - + ": will drain payload and then rethrow exception: rootCause=" - + ex, ex); - downstreamFirstCause = ex; - } - } +// } catch (ExecutionException ex) { +// log.error( +// "Downstream replication failure" +// + ": will drain payload and then rethrow exception: rootCause=" +// + ex, ex); +// downstreamFirstCause = ex; +// } +// } } // while(itr.hasNext()) @@ -1134,28 +1154,31 @@ + ", actual=" + (int) chk.getValue()); } - if (downstreamFirstCause != null) { - - /** - * Replication to the downstream service failed. The payload has - * been fully drained. This ensures that we do not leave part of - * the payload in the upstream socket channel. We now wrap and - * rethrow the root cause of the downstream failure. The leader - * will handle this by forcing the remove of the downstream - * service and then re-replicating the payload. - * - * @see <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" - * > HA wire pulling and sure kill testing </a> - */ +// if (downstreamFirstCause != null) { +// +// /** +// * Replication to the downstream service failed. The payload has +// * been fully drained. This ensures that we do not leave part of +// * the payload in the upstream socket channel. We now wrap and +// * rethrow the root cause of the downstream failure. The leader +// * will handle this by forcing the remove of the downstream +// * service and then re-replicating the payload. 
+// * +// * @see <a +// * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" +// * > HA wire pulling and sure kill testing </a> +// */ +// +// throw new RuntimeException( +// "Downstream replication failure: msg=" + message +// + ", ex=" + downstreamFirstCause, +// downstreamFirstCause); +// +// } + + // Check for termination. + client.checkFirstCause(); - throw new RuntimeException( - "Downstream replication failure: msg=" + message - + ", ex=" + downstreamFirstCause, - downstreamFirstCause); - - } - if (callback != null) { /* @@ -1173,7 +1196,7 @@ } // call() /** - * Now forward the most recent transfer bytes downstream. + * Forward the most recent transfer bytes downstream. * <p> * * Note: [addrNext] is final. If the downstream address is changed, then @@ -1229,107 +1252,123 @@ } // Check for termination. client.checkFirstCause(); - // Send and await Future. - sendService.send(out).get(); + /* + * Send and await Future. If this is the first chunk of a + * payload and a marker exists, then send the marker as + * well. + */ + sendService + .send(out, + out.position() == 0 + && message.getHASendState() != null ? message + .getHASendState().getMarker() + : null).get(); } break; // break out of the inner while loop. } // while(true) } - -// private void ack(final Client client) throws IOException { -// -// if (log.isTraceEnabled()) -// log.trace("Will ACK"); -// -// ack(client.client, HASendService.ACK); -// -// if (log.isTraceEnabled()) -// log.trace("Did ACK"); -// -// } -// -// private void nack(final Client client) throws IOException { -// -// if (log.isTraceEnabled()) -// log.trace("Will NACK"); -// -// ack(client.client, HASendService.NACK); -// -// if (log.isTraceEnabled()) -// log.trace("Did NACK"); -// -// } -// -// /** -// * ACK/NACK the payload. -// * -// * @param client -// * @throws IOException -// */ -// private void ack(final SocketChannel client, final byte ret) -// throws IOException { -// -// // FIXME optimize. 
-// final ByteBuffer b = ByteBuffer.wrap(new byte[] { ret /* ACK */}); -// -// // The #of bytes to transfer. -// final int remaining = b.remaining(); -// -//// if (log.isTraceEnabled()) -//// log.trace("Will send " + remaining + " bytes"); -// -//// try { -// -// int nwritten = 0; -// -// while (nwritten < remaining) { -// -// /* -// * Write the data. Depending on the channel, will either -// * block or write as many bytes as can be written -// * immediately (this latter is true for socket channels in a -// * non-blocking mode). IF it blocks, should block until -// * finished or until this thread is interrupted, e.g., by -// * shutting down the thread pool on which it is running. -// * -// * Note: If the SocketChannel is closed by an interrupt, -// * then the send request for the [data] payload will fail. -// * However, the SocketChannel will be automatically reopened -// * for the next request (unless the HASendService has been -// * terminated). -// */ -// -// final int nbytes = client.write(b); -// -// nwritten += nbytes; -// -//// if (log.isTraceEnabled()) -//// log.trace("Sent " + nbytes + " bytes with " + nwritten -//// + " of out " + remaining + " written so far"); -// -// } -// return; -// -//// while (client.isOpen()) { -//// -//// if (client.write(b) > 0) { -//// -//// // Sent (N)ACK byte. -//// return; -//// -//// } -//// -//// } -// -//// // channel is closed. -//// throw new AsynchronousCloseException(); -// -// } - + } // class ReadTask /** + * Helper class to drain bytes from the upstream socket until we encounter a + * marker in the stream that immediately precedes the desired payload.
+ * + * @author <a href="mailto:mar...@us...">Martyn + * Cutcher</a> + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > HA + * wire pulling and sure kill testing </a> + */ + static private class DrainToMarkerUtil { + + final private byte[] marker; + final private byte[] markerBuffer; + final private ByteBuffer markerBB; + final private Client client; + + private int markerIndex = 0; + private int nmarkerreads = 0; + private int nmarkerbytematches = 0; + + DrainToMarkerUtil(final byte[] marker, final Client client) { + this.marker = marker; + this.markerBuffer = marker == null ? null : new byte[marker.length]; + this.markerBB = marker == null ? null : ByteBuffer + .wrap(markerBuffer); + this.client = client; + + if (log.isDebugEnabled()) + log.debug("Receive token: " + BytesUtil.toHexString(marker)); + + } + + /** + * Note that the logic for finding the token bytes depends on the first + * byte in the token being unique! + * <p> + * We have to be careful not to read beyond the token, since that would + * complicate the reading into the localBuffer. + * <p> + * This is optimized for the normal case where the marker is read + * directly from the next bytes in the stream. In the worst case this + * could read large amounts of data only a few bytes at a time, but in + * practice that is not a significant overhead.
+ */ + boolean foundMarker() throws IOException { + + if (log.isDebugEnabled()) + log.debug("Looking for token, " + BytesUtil.toHexString(marker) + + ", reads: " + nmarkerreads); + + while (markerIndex < marker.length) { + + final int remtok = marker.length - markerIndex; + markerBB.limit(remtok); + markerBB.position(0); + + final int rdLen = client.client.read(markerBB); + for (int i = 0; i < rdLen; i++) { + if (markerBuffer[i] != marker[markerIndex]) { + if (nmarkerreads < 2) + log.warn("TOKEN MISMATCH"); + markerIndex = 0; + if (markerBuffer[i] == marker[markerIndex]) { + markerIndex++; + } + } else { + markerIndex++; + nmarkerbytematches++; + } + } + + nmarkerreads++; + if (nmarkerreads % 10000 == 0) { + if (log.isDebugEnabled()) + log.debug("...still looking, reads: " + nmarkerreads); + } + + } + + if (markerIndex != marker.length) { // not sufficient data ready + if (log.isDebugEnabled()) + log.debug("Not found token yet!"); + return false; + } else { + if (log.isDebugEnabled()) + log.debug("Found token after " + nmarkerreads + + " token reads and " + nmarkerbytematches + + " byte matches"); + + return true; + } + + } + + } + + /** * Receive data into the caller's buffer as described by the caller's * message. * @@ -1408,21 +1447,6 @@ public interface IHAReceiveCallback<M extends IHAWriteMessageBase> { /** - * Hook invoked once a buffer has been received. - * - * @param msg - * The message. - * @param data - * The buffer containing the data. The position() will be - * ZERO (0). The limit() will be the #of bytes available. The - * implementation MAY have side effects on the buffer state - * (position, limit, etc). - * - * @throws Exception - */ - void callback(M msg, ByteBuffer data) throws Exception; - - /** * Notify that some payload bytes have been incrementally received for * an {@link IHAMessage}. This is invoked each time some data has been * read from the upstream socket. 
@@ -1441,6 +1465,22 @@ * @throws Exception */ void incReceive(M msg, int nreads, int rdlen, int rem) throws Exception; + + /** + * Hook invoked once a buffer has been received. + * + * @param msg + * The message. + * @param data + * The buffer containing the data. The position() will be + * ZERO (0). The limit() will be the #of bytes available. The + * implementation MAY have side effects on the buffer state + * (position, limit, etc). + * + * @throws Exception + */ + void callback(M msg, ByteBuffer data) throws Exception; + } /** Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -287,6 +287,10 @@ * * @param buffer * The buffer. + * @param marker + * A marker that will be used to prefix the payload for the + * message in the write replication socket stream. The marker is + * used to ensure synchronization when reading on the stream. * * @return The {@link Future} which can be used to await the outcome of this * operation. @@ -301,8 +305,8 @@ * @todo throws IOException if the {@link SocketChannel} was not open and * could not be opened. */ - public Future<Void> send(final ByteBuffer buffer) { - + public Future<Void> send(final ByteBuffer buffer, final byte[] marker) { + if (buffer == null) throw new IllegalArgumentException(); @@ -320,10 +324,9 @@ // reopenChannel(); - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer())); + return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer(), marker)); } - /** * A series of timeouts used when we need to re-open the * {@link SocketChannel}. @@ -422,13 +425,17 @@ * * @param buffer * The buffer whose data are to be sent. 
+ * @param marker + * A marker that will be used to prefix the payload for the + * message in the write replication socket stream. The marker is + * used to ensure synchronization when reading on the stream. * * @return The task which will send the data to the configured * {@link InetSocketAddress}. */ - protected Callable<Void> newIncSendTask(final ByteBuffer buffer) { + protected Callable<Void> newIncSendTask(final ByteBuffer buffer, final byte[] marker) { - return new IncSendTask(buffer); + return new IncSendTask(buffer, marker); } @@ -485,25 +492,21 @@ */ protected /*static*/ class IncSendTask implements Callable<Void> { -// private final SocketChannel socketChannel; - private final ByteBuffer data; + private final ByteBuffer data; + private final byte[] marker; - public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data) { + public IncSendTask(final ByteBuffer data, final byte[] marker) { -// if (socketChannel == null) -// throw new IllegalArgumentException(); + if (data == null) + throw new IllegalArgumentException(); - if (data == null) - throw new IllegalArgumentException(); + this.data = data; + this.marker = marker; + } -// this.socketChannel = socketChannel; - - this.data = data; + @Override + public Void call() throws Exception { - } - - public Void call() throws Exception { - // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); @@ -521,10 +524,22 @@ try { - int nwritten = 0; + int nmarker = 0; // #of marker bytes written. + int nwritten = 0; // #of payload bytes written. + + final ByteBuffer markerBB = marker != null ? ByteBuffer + .wrap(marker) : null; while (nwritten < remaining) { + + if (marker != null && nmarker < marker.length) { + + nmarker += socketChannel.write(markerBB); + continue; + + } + /* * Write the data. 
Depending on the channel, will either * block or write as many bytes as can be written Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -132,6 +132,7 @@ import com.bigdata.ha.msg.IHARemoteRebuildRequest; import com.bigdata.ha.msg.IHARootBlockRequest; import com.bigdata.ha.msg.IHARootBlockResponse; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASendStoreResponse; import com.bigdata.ha.msg.IHASnapshotDigestRequest; import ... [truncated message content] |
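The commit above frames each replicated payload with a per-message marker: on the send side, IncSendTask writes the marker bytes into the socket before the payload, and on the receive side DrainToMarkerUtil discards stream bytes until the marker is found, skipping stale data left over from a failed read task. The following is a minimal, self-contained sketch of that framing idea; the class and method names are illustrative only and are not part of the bigdata codebase.

```java
import java.util.Arrays;

/**
 * Illustrative sketch (not bigdata code) of marker framing for stream
 * re-synchronization: the sender writes a marker in front of each payload,
 * and the receiver discards bytes until it sees the marker.
 */
public class MarkerFramingDemo {

    /** Sender side: prefix the payload with the marker (cf. IncSendTask). */
    static byte[] frame(final byte[] marker, final byte[] payload) {
        final byte[] framed = new byte[marker.length + payload.length];
        System.arraycopy(marker, 0, framed, 0, marker.length);
        System.arraycopy(payload, 0, framed, marker.length, payload.length);
        return framed;
    }

    /**
     * Receiver side (cf. DrainToMarkerUtil): scan the stream for the marker
     * and return the bytes that follow it, or null if the marker was never
     * seen. Like the original, the restart logic only re-tests the current
     * byte against marker[0], so it relies on the marker's first byte not
     * recurring inside the marker.
     */
    static byte[] drainToMarker(final byte[] stream, final byte[] marker) {
        int matched = 0; // #of marker bytes matched so far
        int i = 0;
        while (i < stream.length && matched < marker.length) {
            final byte b = stream[i++];
            if (b == marker[matched]) {
                matched++; // still inside a candidate marker
            } else {
                // mismatch: restart, re-testing this byte against marker[0]
                matched = (b == marker[0]) ? 1 : 0;
            }
        }
        if (matched < marker.length)
            return null; // stream exhausted before the marker appeared
        // Everything after the marker is the payload.
        return Arrays.copyOfRange(stream, i, stream.length);
    }

    public static void main(final String[] args) {
        final byte[] marker = { 0x7e, 0x11, 0x22, 0x33 };
        final byte[] payload = { 5, 6, 7 };
        // Simulate stale bytes (including a partial marker) ahead of the frame.
        final byte[] stale = { 1, 2, 0x7e, 9 };
        final byte[] wire = frame(stale, frame(marker, payload));
        System.out.println(Arrays.toString(drainToMarker(wire, marker))); // [5, 6, 7]
    }
}
```

Note that the real implementation also embeds a MAGIC long and the serialized HASendState fields in the marker (see HASendState.getMarker()), so a match carries enough context to validate which message the following payload belongs to.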
From: <tho...@us...> - 2013-12-11 16:31:39
Revision: 7635 http://bigdata.svn.sourceforge.net/bigdata/?rev=7635&view=rev Author: thompsonbry Date: 2013-12-11 16:31:29 +0000 (Wed, 11 Dec 2013) Log Message: ----------- Added unit tests: - Add test where we kill B after the KB create and then do the large load. - Add test where we kill C after the KB create and then do the large load. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-12-11 16:17:47 UTC (rev 7634) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-12-11 16:31:29 UTC (rev 7635) @@ -233,7 +233,7 @@ */ private ServiceListener serviceListenerA = null, serviceListenerB = null; - protected ServiceListener serviceListenerC = null; + private ServiceListener serviceListenerC = null; private LookupDiscoveryManager lookupDiscoveryManager = null; @@ -268,6 +268,10 @@ return zookeeper; } + /** + * Return the negotiated zookeeper session timeout in milliseconds (if + * available) and otherwise the requested zookeeper session timeout. 
+ */ protected int getZKSessionTimeout() { final ZooKeeper zookeeper = this.zookeeper; if (zookeeper != null) { Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-11 16:17:47 UTC (rev 7634) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-11 16:31:29 UTC (rev 7635) @@ -419,4 +419,96 @@ } + /** + * Test where we start A+B+C in strict sequence. Once we observe that all + * services have gone through the initial KB create, we do a sudden kill of + * B. We then start the live load. This test explores what happens when A is + * not yet aware that B is dead when the UPDATE operation starts. + * + * @throws Exception + */ + public void testABC_awaitKBCreate_killB_LiveLoadRemainsMet() + throws Exception { + + // enforce join order + final ABC startup = new ABC(true /* sequential */); + + final long token = awaitFullyMetQuorum(); + + // await the initial KB commit on all services. + awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + + kill(startup.serverB); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + awaitPipeline(getZKSessionTimeout() + 5000, TimeUnit.MILLISECONDS, + new HAGlue[] { startup.serverA, startup.serverC }); + + // also check members and joined + awaitMembers(new HAGlue[] { startup.serverA, startup.serverC }); + awaitJoined(new HAGlue[] { startup.serverA, startup.serverC }); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. 
+ ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + /** + * Test where we start A+B+C in strict sequence. Once we observe that all + * services have gone through the initial KB create, we do a sudden kill of + * C. We then start the live load. This test explores what happens when A + * and B are not yet aware that C is dead when the UPDATE operation starts. + * + * @throws Exception + */ + public void testABC_awaitKBCreate_killC_LiveLoadRemainsMet() + throws Exception { + + // enforce join order + final ABC startup = new ABC(true /* sequential */); + + final long token = awaitFullyMetQuorum(); + + // await the initial KB commit on all services. + awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + + kill(startup.serverC); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + awaitPipeline(getZKSessionTimeout() + 5000, TimeUnit.MILLISECONDS, + new HAGlue[] { startup.serverA, startup.serverB }); + + // also check members and joined + awaitMembers(new HAGlue[] { startup.serverA, startup.serverB }); + awaitJoined(new HAGlue[] { startup.serverA, startup.serverB }); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
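Both new tests in revision 7635 follow the same concurrency pattern: wrap the long-running LOAD in a FutureTask, submit it to an executor, and await the result with a timeout so a wedged pipeline fails the test instead of hanging it forever. The sketch below reduces that pattern to plain java.util.concurrent; largeLoadTask is a hypothetical stand-in for the suite's LargeLoadTask, which needs a live HA cluster.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class LoadWithTimeout {

    /** Hypothetical stand-in for LargeLoadTask: simulate a long LOAD. */
    static Callable<Long> largeLoadTask(final long token) {
        return () -> {
            Thread.sleep(100); // simulate replication work
            return token;      // the quorum token the load ran against
        };
    }

    /**
     * Submit the load and await it with a timeout, mirroring
     * ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS) in the tests.
     */
    public static long runLoad(final long token, final long timeoutMillis)
            throws Exception {
        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            final FutureTask<Long> ft = new FutureTask<>(largeLoadTask(token));
            executorService.submit(ft);
            // Await LOAD, but with a timeout.
            return ft.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runLoad(42L, 5000L)); // prints 42
    }
}
```

In the real tests the asserted value is the quorum token, which must be unchanged after the kill to prove the same quorum remained met throughout the load.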
From: <tho...@us...> - 2013-12-11 16:17:55
|
Revision: 7634 http://bigdata.svn.sourceforge.net/bigdata/?rev=7634&view=rev Author: thompsonbry Date: 2013-12-11 16:17:47 +0000 (Wed, 11 Dec 2013) Log Message: ----------- Modified HAReceiveService to always drain down the payload for the current message if there is a problem with re-replicating the payload to the downstream service. Added code to expose the HAGlue implementation object - this is the object that gets exported. This makes it possible to reach into the HAGlueTestImpl object in some unit tests. See #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -983,6 +983,19 @@ private void doReceiveAndReplicate(final Client client) throws Exception { + /** + * The first cause if downstream replication fails. We make a note + * of this first cause, continue to drain the payload, and then + * rethrow the first cause once the payload has been fully drained. + * This is necessary to ensure that the socket channel does not have + * partial data remaining from an undrained payload. 
+ * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA wire pulling and sure kill testing </a> + */ + Throwable downstreamFirstCause = null; + /* * We should now have parameters ready in the WriteMessage and can * begin transferring data from the stream to the writeCache. @@ -1085,65 +1098,22 @@ callback.incReceive(message, reads, rdlen, rem); } - /* - * Now forward the most recent transfer bytes downstream - * - * @todo Since the downstream writes are against a blocking - * mode channel, the receiver on this node runs in sync with - * the receiver on the downstream node. In fact, those - * processes could be decoupled with a bit more effort and - * are only required to synchronize by the end of each - * received payload. - * - * Note: [addrNext] is final. If the downstream address is - * changed, then the ReadTask is interrupted using its - * Future and the WriteCacheService will handle the error by - * retransmitting the current cache block. - * - * The rdlen is checked for non zero to avoid an - * IllegalArgumentException. - * - * Note: loop since addrNext might change asynchronously. - */ - while(true) { - if (rdlen != 0 && addrNextRef.get() != null) { - if (log.isTraceEnabled()) - log.trace("Incremental send of " + rdlen + " bytes"); - final ByteBuffer out = localBuffer.asReadOnlyBuffer(); - out.position(localBuffer.position() - rdlen); - out.limit(localBuffer.position()); - synchronized (sendService) { - /* - * Note: Code block is synchronized on [downstream] - * to make the decision to start the HASendService - * that relays to [addrNext] atomic. The - * HASendService uses [synchronized] for its public - * methods so we can coordinate this lock with its - * synchronization API. - */ - if (!sendService.isRunning()) { - /* - * Prepare send service for incremental - * transfers to the specified address. - */ - // Check for termination. - client.checkFirstCause(); - // Note: use then current addrNext! 
- sendService.start(addrNextRef.get()); - continue; - } + if (downstreamFirstCause == null) { + try { + forwardReceivedBytes(client, rdlen); + } catch (ExecutionException ex) { + log.error( + "Downstream replication failure" + + ": will drain payload and then rethrow exception: rootCause=" + + ex, ex); + downstreamFirstCause = ex; } - // Check for termination. - client.checkFirstCause(); - // Send and await Future. - sendService.send(out).get(); - } - break; } - } - } // while( rem > 0 ) + } // while(itr.hasNext()) + } // while( rem > 0 && !EOS ) + if (localBuffer.position() != message.getSize()) throw new IOException("Receive length error: localBuffer.pos=" + localBuffer.position() + ", message.size=" @@ -1164,12 +1134,109 @@ + ", actual=" + (int) chk.getValue()); } + if (downstreamFirstCause != null) { + + /** + * Replication to the downstream service failed. The payload has + * been fully drained. This ensures that we do not leave part of + * the payload in the upstream socket channel. We now wrap and + * rethrow the root cause of the downstream failure. The leader + * will handle this by forcing the remove of the downstream + * service and then re-replicating the payload. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA wire pulling and sure kill testing </a> + */ + + throw new RuntimeException( + "Downstream replication failure: msg=" + message + + ", ex=" + downstreamFirstCause, + downstreamFirstCause); + + } + if (callback != null) { + + /* + * The message was received and (if there is a downstream + * service) successfully replicated to the downstream service. + * We now invoke the callback to given this service an + * opportunity to handle the message and the fully received + * payload. + */ + callback.callback(message, localBuffer); + } } // call() + /** + * Now forward the most recent transfer bytes downstream. + * <p> + * + * Note: [addrNext] is final. 
If the downstream address is changed, then + * the {@link ReadTask} is interrupted using its {@link Future} and the + * WriteCacheService on the leader will handle the error by + * retransmitting the current cache block. + * + * The rdlen is checked for non zero to avoid an + * IllegalArgumentException. + * + * Note: loop since addrNext might change asynchronously. + * + * @throws ExecutionException + * @throws InterruptedException + * + * @todo Since the downstream writes are against a blocking mode + * channel, the receiver on this node runs in sync with the + * receiver on the downstream node. In fact, those processes could + * be decoupled with a bit more effort and are only required to + * synchronize by the end of each received payload. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > + * HA wire pulling and sure kill testing </a> + */ + private void forwardReceivedBytes(final Client client, final int rdlen) + throws InterruptedException, ExecutionException { + while (true) { + if (rdlen != 0 && addrNextRef.get() != null) { + if (log.isTraceEnabled()) + log.trace("Incremental send of " + rdlen + " bytes"); + final ByteBuffer out = localBuffer.asReadOnlyBuffer(); + out.position(localBuffer.position() - rdlen); + out.limit(localBuffer.position()); + synchronized (sendService) { + /* + * Note: Code block is synchronized on [downstream] to + * make the decision to start the HASendService that + * relays to [addrNext] atomic. The HASendService uses + * [synchronized] for its public methods so we can + * coordinate this lock with its synchronization API. + */ + if (!sendService.isRunning()) { + /* + * Prepare send service for incremental transfers to + * the specified address. + */ + // Check for termination. + client.checkFirstCause(); + // Note: use then current addrNext! + sendService.start(addrNextRef.get()); + continue; + } + } + // Check for termination. + client.checkFirstCause(); + // Send and await Future. 
+ sendService.send(out).get(); + } + break; // break out of the inner while loop. + } // while(true) + + } + // private void ack(final Client client) throws IOException { // // if (log.isTraceEnabled()) Modified: branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -311,12 +311,12 @@ /** * The service implementation object. */ - protected Remote impl; + private Remote impl; /** * The exported proxy for the service implementation object. */ - protected Remote proxy; + private Remote proxy; /** * The name of the host on which the server is running. @@ -349,6 +349,8 @@ /** * The exported proxy for the service implementation object. + * + * @see #getRemoteImpl() */ public Remote getProxy() { @@ -357,6 +359,17 @@ } /** + * The service implementation object. + * + * @see #getProxy() + */ + public Remote getRemoteImpl() { + + return impl; + + } + + /** * Return the assigned {@link ServiceID}. If this is a new service and the * {@link ServiceUUID} was not specified in the {@link Configuration} then * the {@link ServiceID} will be <code>null</code> until it has been Modified: branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -1949,12 +1949,13 @@ * @return an object that implements whatever administration interfaces * are appropriate for the particular service. 
*/ + @Override public Object getAdmin() throws RemoteException { if (log.isInfoEnabled()) log.info("serviceID=" + server.getServiceID()); - return server.proxy; + return server.getProxy(); } Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import net.jini.config.Configuration; import net.jini.config.ConfigurationException; @@ -89,8 +90,8 @@ import com.bigdata.journal.ITx; import com.bigdata.journal.StoreState; import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService; +import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService.IHAProgressListener; import com.bigdata.journal.jini.ha.HAJournalServer.RunStateEnum; -import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService.IHAProgressListener; import com.bigdata.quorum.AsynchronousQuorumCloseException; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.zk.ZKQuorumImpl; @@ -125,6 +126,16 @@ return new HAGlueTestImpl(serviceId); } + + /** + * The service implementation object that gets exported (not the proxy, but + * the thing that gets exported as the {@link HAGlueTest} proxy). 
+ */ + public HAGlueTestImpl getRemoteImpl() { + + return (HAGlueTestImpl) getHAJournalServer().getRemoteImpl(); + + } /** * Utility accessible for HAGlueTest methods and public static for @@ -328,7 +339,21 @@ * Supports consistency checking between HA services */ public StoreState getStoreState() throws IOException; - + + /** + * Gets and clears a root cause that was set on the remote service. This + * is used to inspect the root cause when an RMI method is interrupted + * in the local JVM. Since the RMI was interrupted, we can not observe + * the outcome or root cause of the associated failure on the remote + * service. However, test glue can explicitly set that root cause such + * that it can then be reported using this method. + */ + public Throwable getAndClearLastRootCause() throws IOException; + + /** + * Variant that does not clear out the last root cause. + */ + public Throwable getLastRootCause() throws IOException; } /** @@ -437,7 +462,7 @@ * * @see HAJournal.HAGlueService */ - private class HAGlueTestImpl extends HAJournal.HAGlueService + class HAGlueTestImpl extends HAJournal.HAGlueService implements HAGlue, HAGlueTest, RemoteDestroyAdmin { /** @@ -479,7 +504,7 @@ false); private final AtomicLong nextTimestamp = new AtomicLong(-1L); - + private HAGlueTestImpl(final UUID serviceId) { super(serviceId); @@ -1350,7 +1375,45 @@ .set(listener); } + + @Override + public Throwable getAndClearLastRootCause() throws IOException { + + final Throwable t = lastRootCause.getAndSet(null/* newValue */); + + if (t != null) + log.warn("lastRootCause: " + t, t); + + return t; + + } + @Override + public Throwable getLastRootCause() throws IOException { + + final Throwable t = lastRootCause.get(); + + if (t != null) + log.warn("lastRootCause: " + t, t); + + return t; + + } + public void setLastRootCause(final Throwable t) { + + if (log.isInfoEnabled()) + log.info("Setting lastRootCause: " + t); + + lastRootCause.set(t); + + } + + /** + * @see 
HAGlueTest#getAndClearLastRootCause() + */ + private AtomicReference<Throwable> lastRootCause = new AtomicReference<Throwable>(); + + } // class HAGlueTestImpl /** Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-11 15:45:11 UTC (rev 7633) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-11 16:17:47 UTC (rev 7634) @@ -47,149 +47,148 @@ */ public class TestHA3JustKills extends AbstractHA3JournalServerTestCase { - - /** - * {@inheritDoc} - * <p> - * Note: This overrides some {@link Configuration} values for the - * {@link HAJournalServer} in order to establish conditions suitable for - * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. - */ - @Override - protected String[] getOverrides() { - - return new String[]{ + /** + * {@inheritDoc} + * <p> + * Note: This overrides some {@link Configuration} values for the + * {@link HAJournalServer} in order to establish conditions suitable for + * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. 
+ */ + @Override + protected String[] getOverrides() { + + return new String[]{ // "com.bigdata.journal.HAJournal.properties=" +TestHA3JournalServer.getTestHAJournalProperties(com.bigdata.journal.HAJournal.properties), - "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", - "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", + "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", + "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", // "com.bigdata.journal.jini.ha.HAJournalServer.HAJournalClass=\""+HAJournalTest.class.getName()+"\"", - "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", - }; - - } - - public TestHA3JustKills() { - } + "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + }; + + } + + public TestHA3JustKills() { + } - public TestHA3JustKills(String name) { - super(name); - } + public TestHA3JustKills(String name) { + super(name); + } - /** - * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start - * a long running LOAD. While the LOAD is running, sure kill C (the last - * follower). Verify that the LOAD completes successfully with the remaining - * services (A+B). - */ - public void testABC_LiveLoadRemainsMet_kill_C() throws Exception { + /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill C (the last + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+B). 
+ */ + public void testABC_LiveLoadRemainsMet_kill_C() throws Exception { - // enforce join order - final ABC startup = new ABC(true /*sequential*/); - - final long token = awaitFullyMetQuorum(); - - // start concurrent task loads that continue until fully met - final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( - token)); + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); - executorService.submit(ft); + executorService.submit(ft); - // allow load head start - Thread.sleep(300/* ms */); + // allow load head start + Thread.sleep(300/* ms */); - // Verify load is still running. - assertFalse(ft.isDone()); - - // Dump Zookeeper - log.warn("ZOOKEEPER\n" + dumpZoo()); - - kill(startup.serverC); - - // FIXME: in the face of no implemented error propagation we can explicitly - // tell the leader to remove the killed service! - startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverC); + + // FIXME: in the face of no implemented error propagation we can explicitly + // tell the leader to remove the killed service! 
+ startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); - awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); + awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); - awaitMembers(new HAGlue[] {startup.serverA, startup.serverB}); - awaitJoined(new HAGlue[] {startup.serverA, startup.serverB}); + awaitMembers(new HAGlue[] {startup.serverA, startup.serverB}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverB}); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); - - // Await LOAD, but with a timeout. - ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); - } - - public void testStressABC_LiveLoadRemainsMet_kill_C() throws Exception { - for (int i = 0; i < 5; i++) { - try { - testABC_LiveLoadRemainsMet_kill_C(); - } catch (Throwable t) { - fail("Run " + i, t); - } finally { - Thread.sleep(1000); - destroyAll(); - } - } - } + } + + public void _testStressABC_LiveLoadRemainsMet_kill_C() throws Exception { + for (int i = 0; i < 5; i++) { + try { + testABC_LiveLoadRemainsMet_kill_C(); + } catch (Throwable t) { + fail("Run " + i, t); + } finally { + Thread.sleep(1000); + destroyAll(); + } + } + } - /** - * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start - * a long running LOAD. 
While the LOAD is running, sure kill B (the first - * follower). Verify that the LOAD completes successfully with the remaining - * services (A+C), after the leader re-orders the pipeline. - */ - public void testABC_LiveLoadRemainsMet_kill_B() throws Exception { + /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill B (the first + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+C), after the leader re-orders the pipeline. + */ + public void testABC_LiveLoadRemainsMet_kill_B() throws Exception { - // enforce join order - final ABC startup = new ABC(true /*sequential*/); - - final long token = awaitFullyMetQuorum(); - - // start concurrent task loads that continue until fully met - final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( - token)); + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); - executorService.submit(ft); + executorService.submit(ft); - // allow load head start - Thread.sleep(1000/* ms */); + // allow load head start + Thread.sleep(1000/* ms */); - // Verify load is still running. - assertFalse(ft.isDone()); - - // Dump Zookeeper - log.warn("ZOOKEEPER\n" + dumpZoo()); - - kill(startup.serverB); - - // FIXME: temporary call to explicitly remove the service prior to correct protocol - startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); + // Verify load is still running. 
+ assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverB); + + // FIXME: temporary call to explicitly remove the service prior to correct protocol + startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); - awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); - - // also check members and joined - awaitMembers(new HAGlue[] {startup.serverA, startup.serverC}); - awaitJoined(new HAGlue[] {startup.serverA, startup.serverC}); + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // also check members and joined + awaitMembers(new HAGlue[] {startup.serverA, startup.serverC}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverC}); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); - - // Await LOAD, but with a timeout. - ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); - // token must remain unchanged to indicate same quorum - assertEquals(token, awaitMetQuorum()); + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); - } + } /** * Base class for sure kill of a process when write replication reaches a @@ -243,8 +242,13 @@ log.error("msg=" + msg + ", nreads=" + nreads + ", rdlen=" + rdlen + ", rem=" + rem); - // Note: This is the *opening* root block counter. - if (msg.getCommitCounter() == 1L && nreads > 1) { + /* + * Note: This is the *opening* root block counter. The process will + * be killed as soon as it has received the first chunk of data for + * the payload of the first replicated write cache block. 
+ */ + + if (msg.getCommitCounter() == 1L) { sureKill(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
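The core change in revision 7634 is a drain-then-rethrow discipline: when forwarding to the downstream service fails mid-payload, the receiver records the first cause, keeps reading until the current payload is fully consumed (so no partial data is left in the upstream socket channel), and only then rethrows. A minimal self-contained sketch of that discipline, with a hypothetical Downstream interface standing in for the HASendService relay:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class DrainThenRethrow {

    /** Hypothetical downstream sink; throwing simulates a dead follower. */
    interface Downstream {
        void forward(byte[] b, int len) throws IOException;
    }

    /**
     * Read exactly [size] bytes of payload from the upstream stream. If a
     * downstream forward fails, note the FIRST cause, continue draining the
     * payload, then wrap and rethrow once the payload is fully consumed.
     */
    public static int receiveAndReplicate(final InputStream upstream,
            final int size, final Downstream downstream) throws IOException {
        Throwable downstreamFirstCause = null;
        final byte[] buf = new byte[8];
        int received = 0;
        while (received < size) {
            final int rdlen = upstream.read(buf, 0,
                    Math.min(buf.length, size - received));
            if (rdlen < 0)
                throw new IOException("EOF before payload drained");
            received += rdlen;
            if (downstreamFirstCause == null) {
                try {
                    downstream.forward(buf, rdlen);
                } catch (IOException ex) {
                    downstreamFirstCause = ex; // note it, keep draining
                }
            }
        }
        if (downstreamFirstCause != null)
            throw new IOException("Downstream replication failure",
                    downstreamFirstCause);
        return received;
    }

    public static void main(String[] args) throws IOException {
        final InputStream in = new ByteArrayInputStream(new byte[32]);
        System.out.println(receiveAndReplicate(in, 32, (b, n) -> {})); // prints 32
    }
}
```

The design point is that the exception is deferred, not swallowed: the leader still sees the failure and can remove the dead follower and re-replicate, but the upstream channel stays aligned on a payload boundary.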
From: <mar...@us...> - 2013-12-11 15:45:17
|
Revision: 7633 http://bigdata.svn.sourceforge.net/bigdata/?rev=7633&view=rev Author: martyncutcher Date: 2013-12-11 15:45:11 +0000 (Wed, 11 Dec 2013) Log Message: ----------- Refactor to use TokenDrain class to encapsulate token processing Modified Paths: -------------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 14:32:56 UTC (rev 7632) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 15:45:11 UTC (rev 7633) @@ -1026,6 +1026,82 @@ } // call. + class TokenDrain { + final byte[] token; + final byte[] tokenBuffer; + final ByteBuffer tokenBB; + final Client client; + + int tokenIndex = 0; + int ntokenreads = 0; + int ntokenbytematches = 0; + + TokenDrain(final byte[] token, final Client client) { + this.token = token; + this.tokenBuffer = token == null ? null : new byte[token.length]; + this.tokenBB = token == null ? null : ByteBuffer.wrap(tokenBuffer); + this.client = client; + + if (log.isDebugEnabled()) + log.debug("Receive token: " + BytesUtil.toHexString(token)); + + } + + boolean foundToken() throws IOException { + + if (log.isDebugEnabled()) + log.debug("Looking for token, reads: " + ntokenreads); + // Note that the logic for finding the token bytes depends on the + // first byte in the token being unique! + // We have to be a bit clever to be sure we do not read beyond the + // token and therefore complicate the reading into the localBuffer. + // This is optimized for the normal case where the key token is read + // as the next bytes from the stream. 
In the worst case scenario this + // could read large amounts of data only a few bytes at a time, however + // this is not in reality a significant overhead. + while (tokenIndex < token.length ) { + final int remtok = token.length - tokenIndex; + tokenBB.limit(remtok); + tokenBB.position(0); + + final int rdLen = client.client.read(tokenBB); + for (int i = 0; i < rdLen; i++) { + if (tokenBuffer[i] != token[tokenIndex]) { + if (ntokenreads < 2) + log.warn("TOKEN MISMATCH"); + tokenIndex = 0; + if (tokenBuffer[i] == token[tokenIndex]) { + tokenIndex++; + } + } else { + tokenIndex++; + ntokenbytematches++; + } + } + + ntokenreads++; + if (ntokenreads % 10000 == 0) { + if (log.isDebugEnabled()) + log.debug("...still looking, reads: " + ntokenreads); + } + + } + + if (tokenIndex != token.length) { // not sufficient data ready + if (log.isDebugEnabled()) + log.debug("Not found token yet!"); + return false; + } else { + if (log.isDebugEnabled()) + log.debug("Found token after " + ntokenreads + " token reads and " + ntokenbytematches + " byte matches"); + + return true; + } + + } + + } + private void doReceiveAndReplicate(final Client client) throws Exception { @@ -1047,19 +1123,10 @@ // for debug retain number of low level reads int reads = 0; - // setup token values to search for any provided token prefix - final byte[] token = message.getToken(); + final TokenDrain tokenDrain = new TokenDrain(message.getToken(), client); - boolean foundStart = token == null; // if null then not able to check - int tokenIndex = 0; - final byte[] tokenBuffer = token == null ? null : new byte[token.length]; - final ByteBuffer tokenBB = token == null ? null : ByteBuffer.wrap(tokenBuffer); - - if (log.isDebugEnabled()) - log.debug("Receive token: " + BytesUtil.toHexString(token)); + while (rem > 0 && !EOS) { - while (rem > 0 && !EOS) { - // block up to the timeout. 
final int nkeys = client.clientSelector .select(selectorTimeout/* ms */); @@ -1115,62 +1182,13 @@ final Iterator<SelectionKey> iter = keys.iterator(); - int ntokenreads = 0; - int ntokenbytematches = 0; - while (iter.hasNext()) { iter.next(); iter.remove(); - if (!foundStart) { - if (log.isDebugEnabled()) - log.debug("Looking for token, reads: " + ntokenreads); - // Note that the logic for finding the token bytes depends on the - // first byte in the token being unique! - // We have to be a bit clever to be sure we do not read beyond the - // token and therefore complicate the reading into the localBuffer. - // This is optimized for the normal case where the key token is read - // as the next bytes from the stream. In the worst case scenario this - // could read large amounts of data only a few bytes at a time, however - // this is not in reality a significant overhead. - while (tokenIndex < token.length ) { - final int remtok = token.length - tokenIndex; - tokenBB.limit(remtok); - tokenBB.position(0); - - final int rdLen = client.client.read(tokenBB); - for (int i = 0; i < rdLen; i++) { - if (tokenBuffer[i] != token[tokenIndex]) { - if (ntokenreads < 2) - log.warn("TOKEN MISMATCH"); - tokenIndex = 0; - if (tokenBuffer[i] == token[tokenIndex]) { - tokenIndex++; - } - } else { - tokenIndex++; - ntokenbytematches++; - } - } - - ntokenreads++; - if (ntokenreads % 10000 == 0) { - if (log.isDebugEnabled()) - log.debug("...still looking, reads: " + ntokenreads); - } - - foundStart = tokenIndex == token.length; - } - - if (!foundStart) { // not sufficient data ready - // if (log.isDebugEnabled()) - log.warn("Not found token yet!"); - continue; - } else { - if (log.isDebugEnabled()) - log.debug("Found token after " + ntokenreads + " token reads and " + ntokenbytematches + " byte matches"); - } + if (!tokenDrain.foundToken()) { + continue; } final int rdlen = client.client.read(localBuffer); This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site. |
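The token-scan loop that the diff above extracts into the `TokenDrain` helper can be illustrated with a minimal standalone sketch. This is not the actual bigdata class — it reads from an in-memory stream rather than a non-blocking `SocketChannel`, and the class and method names are hypothetical — but it shows the same strategy: consume bytes one at a time until the token prefix is matched, and on a mismatch restart the match, re-checking the offending byte against the first token byte. As the original comment notes, that reset logic is only safe because the first byte of the token is assumed not to recur inside the token.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Minimal sketch of the token-scan loop refactored into TokenDrain above.
 * Consumes bytes from the stream until the token prefix has been matched,
 * leaving the stream positioned on the first payload byte.
 */
public class TokenScan {

    /** @return true iff the full token was found before end of stream. */
    static boolean drainToToken(final InputStream in, final byte[] token)
            throws IOException {
        int tokenIndex = 0;
        int b;
        while (tokenIndex < token.length && (b = in.read()) != -1) {
            if ((byte) b == token[tokenIndex]) {
                tokenIndex++; // matched the next token byte.
            } else {
                // Mismatch: restart, but re-check this byte as a possible
                // first token byte. This is valid only because the first
                // byte is assumed to be unique within the token.
                tokenIndex = ((byte) b == token[0]) ? 1 : 0;
            }
        }
        return tokenIndex == token.length;
    }

    public static void main(String[] args) throws IOException {
        final byte[] token = { 'T', 'O', 'K' };
        final InputStream in = new ByteArrayInputStream(
                "garbage...TOKpayload".getBytes());
        System.out.println(drainToToken(in, token)); // true: token found
        System.out.println((char) in.read());        // 'p': first payload byte
    }
}
```

The production code additionally has to cope with partial reads from a non-blocking socket (hence the `ntokenreads` bookkeeping and the `tokenBB.limit(remtok)` guard against reading past the token), but the matching logic is the same.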
From: <tho...@us...> - 2013-12-11 14:33:03
|
Revision: 7632 http://bigdata.svn.sourceforge.net/bigdata/?rev=7632&view=rev Author: thompsonbry Date: 2013-12-11 14:32:56 +0000 (Wed, 11 Dec 2013) Log Message: ----------- javadoc - linking the changes to ticket #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-11 13:28:48 UTC (rev 7631) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-11 14:32:56 UTC (rev 7632) @@ -2343,6 +2343,9 @@ * that will force them to fail rather than block forever. This will * then force the service into an error state if its QuorumActor can not * carry out the requested action within a specified timeout. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > + * HA wire pulling and sure kill testing </a> */ @Override final public void forceRemoveService(final UUID psid) { Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-12-11 13:28:48 UTC (rev 7631) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-12-11 14:32:56 UTC (rev 7632) @@ -219,6 +219,9 @@ * * @param serviceId * The UUID of the service to be removed. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > HA + * wire pulling and sure kill testing </a> */ public void forceRemoveService(UUID serviceId); |
From: <tho...@us...> - 2013-12-11 13:28:57
|
Revision: 7631 http://bigdata.svn.sourceforge.net/bigdata/?rev=7631&view=rev Author: thompsonbry Date: 2013-12-11 13:28:48 +0000 (Wed, 11 Dec 2013) Log Message: ----------- Modified forceRemoveService() to use the following order: - serviceLeave - withdrawVote - pipelineRemove - memberRemove This is the reverse of the order in which the service establishes itself. It is important to do the service leave first in case the remote service was not truly dead. @see #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-10 21:36:50 UTC (rev 7630) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-11 13:28:48 UTC (rev 7631) @@ -2358,10 +2358,18 @@ protected void doAction() throws InterruptedException { log.warn("Forcing remove of service" + ": thisService=" + serviceId + ", otherServiceId=" + psid); - doMemberRemove(psid); + /** + * Note: The JOINED[] entry MUST be removed first in case the + * service is not truely dead. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA wire pulling and sure kill testing </a> + */ + doServiceLeave(psid); doWithdrawVote(psid); doPipelineRemove(psid); - doServiceLeave(psid); + doMemberRemove(psid); } } |
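The invariant behind r7631 — tear a service down in the exact reverse of the order in which it was set up — can be guaranteed mechanically by recording setup steps on a stack. The sketch below uses hypothetical step names and is not the bigdata quorum API; it only illustrates why popping a stack makes the service leave (the last setup step) the first thing undone.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Sketch of the reverse-order teardown invariant. Setup steps are recorded
 * on a stack as they run; a forced teardown pops them, so the step taken
 * last while joining is always the step undone first.
 */
public class ReverseTeardown {

    final Deque<String> undoStack = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void setupStep(final String name) {
        log.add("do:" + name);
        undoStack.push(name); // remember in setup order.
    }

    void forceRemove() {
        // Undo in strict reverse order: the join is undone first, so a
        // service that is not truly dead loses its joined status before
        // anything else is withdrawn.
        while (!undoStack.isEmpty()) {
            log.add("undo:" + undoStack.pop());
        }
    }

    public static void main(String[] args) {
        final ReverseTeardown q = new ReverseTeardown();
        q.setupStep("memberAdd");
        q.setupStep("pipelineAdd");
        q.setupStep("castVote");
        q.setupStep("serviceJoin");
        q.forceRemove();
        System.out.println(q.log);
    }
}
```

The patch above hard-codes this ordering (serviceLeave, withdrawVote, pipelineRemove, memberRemove) rather than tracking it dynamically, but the effect is the same: teardown mirrors setup.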
From: <tho...@us...> - 2013-12-10 21:37:05
|
Revision: 7630 http://bigdata.svn.sourceforge.net/bigdata/?rev=7630&view=rev Author: thompsonbry Date: 2013-12-10 21:36:50 +0000 (Tue, 10 Dec 2013) Log Message: ----------- Sync to Martyn on #779. I have added a listener protocol for the incremental write replication and sufficient hooks that we can force the failure of the write replication protocol by intercepting it at some desired point between the receive and the replicate of bytes in a payload. I added a sure kill for A+B+C where B is killed during a large load when it has received two chunks of data for the large load. This test passes. I did a similar test where C is killed. This test fails - it gets to the end of the test and then discovers that the LOAD failed. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Removed Paths: ------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java 
branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestJournalHA.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -847,11 +847,20 @@ // Setup the receive service. receiveService = new HAReceiveService<HAMessageWrapper>(addrSelf, addrNext, new IHAReceiveCallback<HAMessageWrapper>() { + @Override public void callback(final HAMessageWrapper msg, final ByteBuffer data) throws Exception { // delegate handling of write cache blocks. handleReplicatedWrite(msg.req, msg.msg, data); } + @Override + public void incReceive(final HAMessageWrapper msg, + final int nreads, final int rdlen, + final int rem) throws Exception { + // delegate handling of incremental receive notify. + QuorumPipelineImpl.this.incReceive(msg.req, + msg.msg, nreads, rdlen, rem); + } }); // Start the receive service - will not return until service is // running @@ -2056,6 +2065,27 @@ final IHAWriteMessage msg, final ByteBuffer data) throws Exception; /** + * Notify that some payload bytes have been incrementally received for an + * {@link IHAMessage}. + * + * @param msg + * The message. + * @param nreads + * The number of reads performed against the upstream socket for + * this message. + * @param rdlen + * The number of bytes read from the socket in this read. + * @param rem + * The number of bytes remaining before the payload has been + * fully read. 
+ * + * @throws Exception + */ + abstract protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, final int rdlen, + final int rem) throws Exception; + + /** * A utility class that bundles together the Internet address and port at which * the downstream service will accept and relay cache blocks for the write * pipeline and the remote interface which is used to communicate with that Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -109,6 +109,15 @@ } @Override + protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, + final int rdlen, final int rem) throws Exception { + + QuorumServiceBase.this.incReceive(req, msg, nreads, rdlen, rem); + + } + + @Override protected long getRetrySendTimeoutNanos() { return QuorumServiceBase.this.getRetrySendTimeoutNanos(); @@ -262,8 +271,31 @@ */ abstract protected void handleReplicatedWrite(IHASyncRequest req, IHAWriteMessage msg, ByteBuffer data) throws Exception; - + /** + * Core implementation of callback for monitoring progress of replicated + * writes. + * + * @param req + * The synchronization request (optional). When non- + * <code>null</code> the message and payload are historical data. + * When <code>null</code> they are live data. + * @param msg + * Metadata about a buffer containing data replicated to this + * node. + * @param rdlen + * The number of bytes read from the socket in this read. + * @param rem + * The number of bytes remaining before the payload has been + * fully read. 
+ * + * @throws Exception + */ + abstract protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, final int rdlen, + final int rem) throws Exception; + + /** * {@inheritDoc} * <p> * Note: The default implementation is a NOP. Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -52,6 +52,7 @@ import org.apache.log4j.Logger; import com.bigdata.ha.QuorumPipelineImpl; +import com.bigdata.ha.msg.IHAMessage; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.ha.msg.IHAWriteMessageBase; @@ -1079,6 +1080,11 @@ rem -= rdlen; + if (callback != null) { + // notify of incremental read. + callback.incReceive(message, reads, rdlen, rem); + } + /* * Now forward the most recent transfer bytes downstream * @@ -1349,6 +1355,25 @@ */ void callback(M msg, ByteBuffer data) throws Exception; + /** + * Notify that some payload bytes have been incrementally received for + * an {@link IHAMessage}. This is invoked each time some data has been + * read from the upstream socket. + * + * @param msg + * The message. + * @param nreads + * The number of reads performed against the upstream socket + * for this message. + * @param rdlen + * The number of bytes read from the socket in this read. + * @param rem + * The number of bytes remaining before the payload has been + * fully read. 
+ * + * @throws Exception + */ + void incReceive(M msg, int nreads, int rdlen, int rem) throws Exception; } /** Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -131,10 +131,30 @@ * @see #start(InetSocketAddress) */ public HASendService() { + + this(true/* blocking */); + + } + + /** + * Note: This constructor is not exposed yet. We need to figure out whether + * to allow the configuration of the socket options and how to support that. + * + * @param blocking + */ + private HASendService(final boolean blocking) { + + this.blocking = blocking; - } + } /** + * <code>true</code> iff the client socket will be setup in a blocking mode. + * This is the historical behavior until at least Dec 10, 2013. + */ + private final boolean blocking; + + /** * Extended to ensure that the private executor service is always * terminated. */ @@ -422,22 +442,29 @@ * * @throws IOException */ - static private SocketChannel openChannel(final InetSocketAddress addr) + private SocketChannel openChannel(final InetSocketAddress addr) throws IOException { final SocketChannel socketChannel = SocketChannel.open(); try { - socketChannel.configureBlocking(true); + socketChannel.configureBlocking(blocking); if (log.isTraceEnabled()) log.trace("Connecting to " + addr); - socketChannel.connect(addr); + if (!socketChannel.connect(addr)) { + while (!socketChannel.finishConnect()) { + try { + Thread.sleep(10/* millis */); + } catch (InterruptedException e) { + // Propagate interrupt. 
+ Thread.currentThread().interrupt(); + } + } + } - socketChannel.finishConnect(); - } catch (IOException ex) { log.error(ex); @@ -520,7 +547,7 @@ * socket buffer exchange and the send() Future will report * success even through the application code on the receiver * could fail once it gets control back from select(). This - * twist can be a bit suprising. Therefore it is useful to + * twist can be a bit surprising. Therefore it is useful to * write tests with both small payloads (the data transfer * will succeed at the socket level even if the application * logic then fails the transfer) and for large payloads. Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -108,7 +108,6 @@ suite.addTest(com.bigdata.io.writecache.TestAll.suite()); suite.addTest( com.bigdata.journal.TestAll.suite() ); suite.addTest( com.bigdata.rwstore.TestAll.suite() ); - suite.addTest( com.bigdata.journal.ha.TestAll.suite() ); suite.addTest( com.bigdata.resources.TestAll.suite() ); suite.addTest( com.bigdata.relation.TestAll.suite() ); suite.addTest( com.bigdata.bop.TestAll.suite() ); Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -339,6 +339,13 @@ } @Override + protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, + final int rdlen, final int rem) throws Exception { + // NOP + } 
+ + @Override public UUID getStoreUUID() { return MyMockQuorumMember.this.getStoreUUID(); } @@ -380,7 +387,7 @@ MyMockQuorumMember.this.purgeHALogs(token); } - + }); } Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -35,7 +35,6 @@ import junit.framework.Test; import com.bigdata.io.DirectBufferPool; -import com.bigdata.journal.ha.TestHAWORMStrategy; import com.bigdata.rawstore.IRawStore; /** @@ -44,7 +43,7 @@ * to operation when caching is disabled. * <p> * Note: The HA journal requires that cache be enabled. However, the HA journal - * is tested by a different test suite. See {@link TestHAWORMStrategy}. + * is tested by a different test suite. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,626 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. 
- -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Oct 14, 2006 - */ - -package com.bigdata.journal.ha; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.rmi.Remote; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import junit.framework.TestCase; - -import com.bigdata.LRUNexus; -import com.bigdata.ha.HAGlue; -import com.bigdata.ha.QuorumService; -import com.bigdata.ha.QuorumServiceBase; -import com.bigdata.ha.msg.IHASyncRequest; -import com.bigdata.ha.msg.IHAWriteMessage; -import com.bigdata.journal.AbstractJournal; -import com.bigdata.journal.AbstractJournalTestCase; -import com.bigdata.journal.IRootBlockView; -import com.bigdata.journal.Journal; -import com.bigdata.journal.Options; -import com.bigdata.journal.ProxyTestCase; -import com.bigdata.quorum.MockQuorumFixture; -import com.bigdata.quorum.Quorum; -import com.bigdata.quorum.QuorumActor; - -/** - * <p> - * Abstract harness for testing under a variety of configurations. In order to - * test a specific configuration, create a concrete instance of this class. The - * configuration can be described using a mixture of a <code>.properties</code> - * file of the same name as the test class and custom code. - * </p> - * <p> - * When debugging from an IDE, it is very helpful to be able to run a single - * test case. 
You can do this, but you MUST define the property - * <code>testClass</code> as the name test class that has the logic required - * to instantiate and configure an appropriate object manager instance for the - * test. - * </p> - */ -abstract public class AbstractHAJournalTestCase - extends AbstractJournalTestCase -{ - - // - // Constructors. - // - - public AbstractHAJournalTestCase() {} - - public AbstractHAJournalTestCase(String name) {super(name);} - - //************************************************************ - //************************************************************ - //************************************************************ - - /** - * The replication factor for the quorum. This is initialized in - * {@link #setUp(ProxyTestCase)} so you can override it. The default is - * <code>3</code>, which is a highly available quorum. - */ - protected int k; - - /** - * The replication count (#of journals) for the quorum. This is initialized - * in {@link #setUp(ProxyTestCase)} so you can override it. The default is - * <code>3</code>, which is the same as the default replication factor - * {@link #k}. - */ - protected int replicationCount; - /** - * The fixture provides a mock of the distributed quorum state machine. - */ - protected MockQuorumFixture fixture = null; - /** - * The logical service identifier. - */ - protected String logicalServiceId = null; - /** - * The {@link Journal}s which are the members of the logical service. - */ - private Journal[] stores = null; - - /** - * Invoked from {@link TestCase#setUp()} for each test in the suite. 
- */ - public void setUp(ProxyTestCase testCase) throws Exception { - - super.setUp(testCase); - -// if(log.isInfoEnabled()) -// log.info("\n\n================:BEGIN:" + testCase.getName() -// + ":BEGIN:===================="); - - fixture = new MockQuorumFixture(); -// fixture.start(); - logicalServiceId = "logicalService_" + getName(); - - k = 3; - - replicationCount = 3; - - } - - /** - * Invoked from {@link TestCase#tearDown()} for each test in the suite. - */ - public void tearDown(ProxyTestCase testCase) throws Exception { - - if (stores != null) { - for (Journal store : stores) { - if (store != null) { - store.destroy(); - } - } - } - - if(fixture!=null) { - fixture.terminate(); - fixture = null; - } - - logicalServiceId = null; - - super.tearDown(testCase); - - } - - /** - * Note: Due to the manner in which the {@link MockQuorumManager} is - * initialized, the elements in {@link #stores} will be <code>null</code> - * initially. This should be Ok as long as the test gets fully setup before - * you start making requests of the {@link QuorumManager} or the - * {@link Quorum}. - */ - @Override - protected Journal getStore(final Properties properties) { - - stores = new Journal[replicationCount]; - - for (int i = 0; i < replicationCount; i++) { - - stores[i] = newJournal(properties); - - } - - /* - * FIXME It appears that it is necessary to start the QuorumFixture and - * then each Quorum *BEFORE* any quorum member takes an action, e.g., - * by doing a memberAdd(). That is likely a flaw in the QuorumFixture - * or the Quorum code. 
- */ - fixture.start(); - - for (int i = 0; i < replicationCount; i++) { - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = stores[i] - .getQuorum(); - final HAJournal jnl = (HAJournal) stores[i]; - final UUID serviceId = jnl.getUUID(); - quorum.start(newQuorumService(logicalServiceId, serviceId, - jnl.newHAGlue(serviceId), jnl)); - } - - for (int i = 0; i < replicationCount; i++) { - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = stores[i] - .getQuorum(); - final HAJournal jnl = (HAJournal) stores[i]; - /* - * Tell the actor to try and join the quorum. It will join iff our - * current root block can form a simple majority with the other - * services in the quorum. - */ - final QuorumActor<?, ?> actor = quorum.getActor(); - try { - - actor.memberAdd(); - fixture.awaitDeque(); - - actor.pipelineAdd(); - fixture.awaitDeque(); - - actor.castVote(jnl.getLastCommitTime()); - fixture.awaitDeque(); - - } catch (InterruptedException ex) { - - throw new RuntimeException(ex); - - } - } - - /* - * Initialize the master first. The followers will get their root blocks - * from the master. - */ -// fixture.join(stores[0]); -// fixture.join(stores[1]); -// fixture.join(stores[2]); -// -// stores[1].takeRootBlocksFromLeader(); -// stores[2].takeRootBlocksFromLeader(); - - /* - * @todo we probably have to return the service which joined as the - * leader from getStore(). 
- */ - final Quorum<HAGlue, QuorumService<HAGlue>> q = stores[0].getQuorum(); - - try { - - final long token = q.awaitQuorum(1L,TimeUnit.SECONDS); - assertEquals(token, stores[1].getQuorum().awaitQuorum(1L,TimeUnit.SECONDS)); - assertEquals(token, stores[2].getQuorum().awaitQuorum(1L,TimeUnit.SECONDS)); - - q.getClient().assertLeader(token); - - assertEquals(k, q.getMembers().length); - - } catch (TimeoutException ex) { -for(int i=0; i<3; i++)log.error("quorum["+i+"]:"+(stores[i].getQuorum()).toString()); - throw new RuntimeException(ex); - - } catch (InterruptedException ex) { - - throw new RuntimeException(ex); - - } - - // return the master. - return stores[0]; - - } - - protected Journal newJournal(final Properties properties) { - - /* - * Initialize the HA components. - */ - - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = newQuorum(); - - final HAJournal jnl = new HAJournal(properties, quorum); - -// /* -// * FIXME This probably should be a constant across the life cycle of the -// * service, in which case it needs to be elevated outside of this method -// * which is used both to open and re-open the journal. -// */ -// final UUID serviceId = UUID.randomUUID(); - - /* - * Set the client on the quorum. - * - * FIXME The client needs to manage the quorumToken and various other - * things. - */ -// quorum.start(newQuorumService(logicalServiceId, serviceId, jnl -// .newHAGlue(serviceId), jnl)); - -// // discard the current write set. -// abort(); -// -// // set the quorum object. -// this.quorum.set(quorum); -// -// // save off the current token (typically NO_QUORUM unless standalone). -// quorumToken = quorum.token(); - -// /* -// * Tell the actor to try and join the quorum. It will join iff our -// * current root block can form a simple majority with the other services -// * in the quorum. 
-// */ -// final QuorumActor<?, ?> actor = quorum.getActor(); -// try { -// -// actor.memberAdd(); -// fixture.awaitDeque(); -// -// actor.pipelineAdd(); -// fixture.awaitDeque(); -// -// actor.castVote(jnl.getLastCommitTime()); -// fixture.awaitDeque(); -// -// } catch (InterruptedException ex) { -// -// throw new RuntimeException(ex); -// -// } - - return jnl; - -// return new Journal(properties) { -// -// protected Quorum<HAGlue, QuorumService<HAGlue>> newQuorum() { -// -// return AbstractHAJournalTestCase.this.newQuorum(); -// -// }; -// -// }; - - } - - private static class HAJournal extends Journal { - - /** - * @param properties - */ - public HAJournal(Properties properties, - Quorum<HAGlue, QuorumService<HAGlue>> quorum) { - super(properties, quorum); - } - - /** - * {@inheritDoc} - * <p> - * Note: This uses a random port on the loopback address. - */ - @Override - public HAGlue newHAGlue(final UUID serviceId) { - - final InetSocketAddress writePipelineAddr; - try { - writePipelineAddr = new InetSocketAddress(getPort(0)); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - - return new HAGlueService(serviceId, writePipelineAddr); - - } - - /** - * Return an unused port. - * - * @param suggestedPort - * The suggested port. - * - * @return The suggested port, unless it is zero or already in use, in which - * case an unused port is returned. - * - * @throws IOException - */ - static protected int getPort(int suggestedPort) throws IOException { - ServerSocket openSocket; - try { - openSocket = new ServerSocket(suggestedPort); - } catch (BindException ex) { - // the port is busy, so look for a random open port - openSocket = new ServerSocket(0); - } - final int port = openSocket.getLocalPort(); - openSocket.close(); - return port; - } - - /** - * Extended implementation supports RMI. 
- */ - protected class HAGlueService extends BasicHA { - - protected HAGlueService(final UUID serviceId, - final InetSocketAddress writePipelineAddr) { - - super(serviceId, writePipelineAddr); - - } - - } - - } - - protected Quorum<HAGlue, QuorumService<HAGlue>> newQuorum() { - - return new MockQuorumFixture.MockQuorum<HAGlue, QuorumService<HAGlue>>( - k, fixture); - - }; - - /** - * Factory for the {@link QuorumService} implementation. - * - * @param remoteServiceImpl - * The object that implements the {@link Remote} interfaces - * supporting HA operations. - */ - protected QuorumServiceBase<HAGlue, AbstractJournal> newQuorumService( - final String logicalServiceId, - final UUID serviceId, final HAGlue remoteServiceImpl, - final AbstractJournal store) { - - return new QuorumServiceBase<HAGlue, AbstractJournal>(logicalServiceId, - serviceId, remoteServiceImpl, store) { - - /** - * Only the local service implementation object can be resolved. - */ - @Override - public HAGlue getService(final UUID serviceId) { - - return (HAGlue) fixture.getService(serviceId); - - } - - /** - * FIXME handle replicated writes. Probably just dump it on the - * jnl's WriteCacheService. Or maybe wrap it back up using a - * WriteCache and let that lay it down onto the disk. 
- */ - @Override - protected void handleReplicatedWrite(IHASyncRequest req, - IHAWriteMessage msg, ByteBuffer data) throws Exception { - - -// new WriteCache() { -// -// @Override -// protected boolean writeOnChannel(ByteBuffer buf, long firstOffset, -// Map<Long, RecordMetadata> recordMap, long nanos) -// throws InterruptedException, TimeoutException, IOException { -// // TODO Auto-generated method stub -// return false; -// } -// }; - - throw new UnsupportedOperationException(); - } - - @Override - public void installRootBlocks(IRootBlockView rootBlock0, - final IRootBlockView rootBlock1) { - throw new UnsupportedOperationException(); - } - -// @Override -// public void didMeet(final long token, final long commitCounter, -// final boolean isLeader) { -// throw new UnsupportedOperationException(); -// } - - @Override - public File getServiceDir() { - throw new UnsupportedOperationException(); - } - - @Override - public int getPID() { - throw new UnsupportedOperationException(); - } - - @Override - public void enterErrorState() { - // TODO Auto-generated method stub - } - - @Override - public void discardWriteSet() { - // TODO Auto-generated method stub - } - - @Override - protected long getRetrySendTimeoutNanos() { - return TimeUnit.MILLISECONDS.toNanos(100); // 100ms by default - } - - }; - - } - - /** - * Re-open the same backing store. - * - * @param store - * the existing store. - * - * @return A new store. - * - * @exception Throwable - * if the existing store is closed or if the store can not be - * re-opened, e.g., from failure to obtain a file lock, etc. - */ - @Override - protected Journal reopenStore(final Journal store) { - - if (stores[0] != store) - throw new AssertionError(); - - for (int i = 0; i < stores.length; i++) { - - Journal aStore = stores[i]; - - if (LRUNexus.INSTANCE != null) { - /* - * Drop the record cache for this store on reopen. 
This makes it - * easier to find errors related to a difference in the bytes on - * the disk versus the bytes in the record cache. - */ - LRUNexus.INSTANCE.deleteCache(aStore.getUUID()); - } - - // close the store. - aStore.close(); - - if (!aStore.isStable()) { - - throw new UnsupportedOperationException( - "The backing store is not stable"); - - } - - // Note: clone to avoid modifying!!! - final Properties properties = (Properties) getProperties().clone(); - - // Turn this off now since we want to re-open the same store. - properties.setProperty(Options.CREATE_TEMP_FILE, "false"); - - // The backing file that we need to re-open. - final File file = aStore.getFile(); - - if (file == null) - throw new AssertionError(); - - // Set the file property explicitly. - properties.setProperty(Options.FILE, file.toString()); - - stores[i] = newJournal(properties); - - } - - return stores[0]; - - } - -// /** -// * Begin to run as part of a highly available {@link Quorum}. -// * -// * @param newQuorum -// * The {@link Quorum}. -// */ -// public void joinQuorum(final Quorum<HAGlue, QuorumService<HAGlue>> quorum) { -// -// if (quorum == null) -// throw new IllegalArgumentException(); -// -// final WriteLock lock = _fieldReadWriteLock.writeLock(); -// -// lock.lock(); -// -// try { -// -// if (this.quorum.get() != null) { -// // Already running with some quorum. -// throw new IllegalStateException(); -// } -// -// // discard the current write set. -// abort(); -// -// // set the quorum object. -// this.quorum.set(quorum); -// -// // save off the current token (typically NO_QUORUM unless standalone). -// quorumToken = quorum.token(); -// -// /* -// * Tell the actor to try and join the quorum. It will join iff our -// * current root block can form a simple majority with the other -// * services in the quorum. 
-// */ -// final QuorumActor<?,?> actor = quorum.getActor(); -// actor.memberAdd(); -// actor.pipelineAdd(); -// actor.castVote(getRootBlockView().getLastCommitTime()); -// -// } catch (Throwable e) { -// -// throw new RuntimeException(e); -// -// } finally { -// -// lock.unlock(); -// -// } -// -// } - -} \ No newline at end of file Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt 2013-12-10 21:36:50 UTC (rev 7630) @@ -388,57 +388,12 @@ TODO: - - RWStore, write cache, write replication, standalone HA in - JOURNAL_HA_BRANCH. - - - The RW store needs to record delete blocks and needs a thread, - periodic task, or commit time task for releasing old commit - points. (Will be done with allocation block bitmap deltas, but - must retain session for resynchronization to be possible.) - - - WORM checksums. Historical WORM stores will not support this so - it needs to be captured by the WORM version number. - - - WORM integration with the write cache service. - - - WriteCacheService lock, sleep and possible gap and deadlock - issues. - - - Interrupted during sleep logs error. - - - WARN : 6938 main - com.bigdata.io.WriteCache.resetWith(WriteCache.java:1368): - Written WriteCache but with no records - - RW is always using 6 buffers. This must be a configuration option so we can stress test the WriteCacheService under heavy write loads and mixed write/read loads with lots of concurrency and only a few buffers. We need to do this to look for deadlocks. - - AbstractJournal: Modify to log the root block to be overwritten - during the commit protocol so we can potentially restore it - from the file. 
This is easier to do for the WORM and would - require a search of the appropriate allocation block's records - for the RW looking for anything which has the right magic value - and can also be interpreted as a RootBlockView (passes the - checksum, etc). - - - API to accept a pipeline update (watch on the children of a - znode for the logical journal) and to notify if you are no - longer the master (SessionExpiredException when you try some - zookeeper operation). Internally, the code has to handle the - join / leave. - - - API for replication and resynchronization writes. Slaves - should verify checksums as calculated by the master. - Differentiate between replication (ascending writes for the - WORM), resynchronization (delta to the end of the file for the - WORM), and write replacement (random write for both). - - - Journal must be aware of master/slave state and whether it is - caught up and can therefore support reads. - - Handle read errors by reading on a peer. Note that some file systems will retry MANY times, which could hang the caller (timed reads? how?). Consider doing replacement writes over @@ -448,96 +403,3 @@ HA. - Report read errors in support of decision making about failure. - -============================================================ - -During BSBM 2785 data load on laptop. 4/20/2010 - -java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.AssertionError: There are 2 outstanding permits (should be just one). 
- at com.bigdata.rdf.spo.SPORelation.insert(SPORelation.java:1826) - at com.bigdata.rdf.store.AbstractTripleStore.addStatements(AbstractTripleStore.java:3261) - at com.bigdata.rdf.rio.StatementBuffer.writeSPOs(StatementBuffer.java:989) - at com.bigdata.rdf.rio.StatementBuffer.addStatements(StatementBuffer.java:869) - at com.bigdata.rdf.rio.StatementBuffer.incrementalWrite(StatementBuffer.java:708) - at com.bigdata.rdf.rio.StatementBuffer.add(StatementBuffer.java:784) - at com.bigdata.rdf.rio.StatementBuffer.add(StatementBuffer.java:766) - at com.bigdata.rdf.sail.BigdataSail$BigdataSailConnection.addStatement(BigdataSail.java:1918) - at com.bigdata.rdf.sail.BigdataSail$BigdataSailConnection.addStatement(BigdataSail.java:1879) - at org.openrdf.repository.sail.SailRepositoryConnection.addWithoutCommit(SailRepositoryConnection.java:228) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:455) - at org.openrdf.repository.util.RDFInserter.handleStatement(RDFInserter.java:196) - at org.openrdf.rio.ntriples.NTriplesParser.parseTriple(NTriplesParser.java:260) - at org.openrdf.rio.ntriples.NTriplesParser.parse(NTriplesParser.java:170) - at org.openrdf.rio.ntriples.NTriplesParser.parse(NTriplesParser.java:112) - at org.openrdf.repository.base.RepositoryConnectionBase.addInputStreamOrReader(RepositoryConnectionBase.java:353) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:242) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:239) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:202) - at benchmark.bigdata.BigdataLoader.loadData(BigdataLoader.java:159) - at benchmark.bigdata.BigdataLoader.main(BigdataLoader.java:109) -Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: There are 2 outstanding permits (should be just one). 
- at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) - at java.util.concurrent.FutureTask.get(FutureTask.java:83) - at com.bigdata.rdf.spo.SPORelation.insert(SPORelation.java:1807) - ... 20 more -Caused by: java.lang.AssertionError: There are 2 outstanding permits (should be just one). - at com.bigdata.io.WriteCacheService.writeChk(WriteCacheService.java:834) - at com.bigdata.rwstore.RWStore.alloc(RWStore.java:854) - at com.bigdata.journal.RWStrategy.write(RWStrategy.java:201) - at com.bigdata.journal.AbstractJournal.write(AbstractJournal.java:2498) - at com.bigdata.btree.AbstractBTree.writeNodeOrLeaf(AbstractBTree.java:3664) - at com.bigdata.btree.AbstractBTree.writeNodeRecursive(AbstractBTree.java:3477) - at com.bigdata.btree.DefaultEvictionListener.evicted(DefaultEvictionListener.java:102) - at com.bigdata.btree.DefaultEvictionListener.evicted(DefaultEvictionListener.java:1) - at com.bigdata.cache.HardReferenceQueue.evict(HardReferenceQueue.java:226) - at com.bigdata.cache.HardReferenceQueue.beforeOffer(HardReferenceQueue.java:199) - at com.bigdata.cache.RingBuffer.add(RingBuffer.java:159) - at com.bigdata.cache.HardReferenceQueue.add(HardReferenceQueue.java:176) - at com.bigdata.btree.AbstractBTree.doTouch(AbstractBTree.java:3365) - at com.bigdata.btree.AbstractBTree.touch(AbstractBTree.java:3331) - at com.bigdata.btree.AbstractNode.<init>(AbstractNode.java:297) - at com.bigdata.btree.AbstractNode.<init>(AbstractNode.java:333) - at com.bigdata.btree.Leaf.<init>(Leaf.java:345) - at com.bigdata.btree.AbstractNode.copyOnWrite(AbstractNode.java:492) - at com.bigdata.btree.AbstractNode.copyOnWrite(AbstractNode.java:417) - at com.bigdata.btree.Leaf.insert(Leaf.java:490) - at com.bigdata.btree.Node.insert(Node.java:900) - at com.bigdata.btree.Node.insert(Node.java:900) - at com.bigdata.btree.Node.insert(Node.java:900) - at com.bigdata.btree.AbstractBTree.insert(AbstractBTree.java:2006) - at 
com.bigdata.btree.AbstractBTree.insert(AbstractBTree.java:1950) - at com.bigdata.rdf.spo.SPOIndexWriteProc.apply(SPOIndexWriteProc.java:247) - at com.bigdata.btree.UnisolatedReadWriteIndex.submit(UnisolatedReadWriteIndex.java:796) - at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:329) - at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:1) - at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) - at java.util.concurrent.FutureTask.run(FutureTask.java:138) - at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) - at java.lang.Thread.run(Thread.java:619) -ERROR: 37844 com.bigdata.rwstore.RWWriteCacheService1 com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:307): java.nio.channels.ClosedByInterruptException -java.nio.channels.ClosedByInterruptException - at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) - at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:653) - at com.bigdata.io.FileChannelUtility.writeAll(FileChannelUtility.java:402) - at com.bigdata.io.WriteCache$FileChannelScatteredWriteCache.writeOnChannel(WriteCache.java:1313) - at com.bigdata.io.WriteCache.flushAndReset(WriteCache.java:745) - at com.bigdata.io.WriteCache.flush(WriteCache.java:658) - at com.bigdata.io.WriteCache.flush(WriteCache.java:604) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:285) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:1) - at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) - at java.util.concurrent.FutureTask.run(FutureTask.java:138) - at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) - at java.lang.Thread.run(Thread.java:619) -ERROR: 37844 
com.bigdata.rwstore.RWWriteCacheService1 com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:307): java.lang.InterruptedException: sleep interrupted -java.lang.InterruptedException: sleep interrupted - at java.lang.Thread.sleep(Native Method) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:271) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:1) - at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) - at java.util.concurrent.FutureTask.run(FutureTask.java:138) - at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) - at java.lang.Thread.run(Thread.java:619) Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,74 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Oct 14, 2006 - */ - -package com.bigdata.journal.ha; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -/** - * Runs all tests for all journal implementations. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class TestAll extends TestCase { - - /** - * - */ - public TestAll() { - } - - /** - * @param arg0 - */ - public TestAll(String arg0) { - super(arg0); - } - - /** - * Returns a test that will run each of the implementation specific test - * suites in turn. - */ - public static Test suite() - { - - final TestSuite suite = new TestSuite("journal/HA"); - - // HA test suite for the WORM strategy. - suite.addTest(TestHAWORMStrategy.suite()); - - // @todo HA test suite for the RW strategy. -// suite.addTest(TestHARWStrategy.suite()); - - return suite; - - } - -} Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,378 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. 
- -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Oct 14, 2006 - */ - -package com.bigdata.journal.ha; - -import java.io.File; -import java.io.IOException; -import java.util.Properties; - -import junit.extensions.proxy.ProxyTestSuite; -import junit.framework.Test; - -import com.bigdata.io.DirectBufferPool; -import com.bigdata.io.writecache.WriteCacheService; -import com.bigdata.journal.AbstractInterruptsTestCase; -import com.bigdata.journal.AbstractMRMWTestCase; -import com.bigdata.journal.AbstractMROWTestCase; -import com.bigdata.journal.AbstractRestartSafeTestCase; -import com.bigdata.journal.BufferMode; -import com.bigdata.journal.Journal; -import com.bigdata.journal.Options; -import com.bigdata.journal.TestJournalBasics; -import com.bigdata.journal.WORMStrategy; -import com.bigdata.rawstore.IRawStore; - -/** - * Test suite for highly available {@link WORMStrategy} journals. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class TestHAWORMStrategy extends AbstractHAJournalTestCase { - - public TestHAWORMStrategy() { - super(); - } - - public TestHAWORMStrategy(String name) { - super(name); - } - - public static Test suite() { - - final TestHAWORMStrategy delegate = new TestHAWORMStrategy(); // !!!! THIS CLASS !!!! - - /* - * Use a proxy test suite and specify the delegate. - */ - - final ProxyTestSuite suite = new ProxyTestSuite(delegate, - "WORM HA Journal Test Suite"); - - /* - * List any non-proxied tests (typically bootstrapping tests). - */ - -// /* -// * HA bootstrap test (non-proxied). 
-// */ -// suite.addTestSuite(TestHAJournalBootstrap.class); - - /* - * Proxied test suites. - */ - - // tests defined by this class. - suite.addTestSuite(TestHAWORMStrategy.class); - - // test suite for the IRawStore api. - suite.addTestSuite(TestRawStore.class); - - // test suite for handling asynchronous close of the file channel. - suite.addTestSuite(TestInterrupts.class); - - // test suite for MROW correctness. - suite.addTestSuite(TestMROW.class); - - // test suite for MRMW correctness. - suite.addTestSuite(TestMRMW.class); - - /* - * Pickup the basic journal test suite. This is a proxied test suite, so - * all the tests will run with the configuration specified in this test - * class and its optional .properties file. - */ - suite.addTest(TestJournalBasics.suite()); - - /* - * Pickup the HA journal test suite. - */ - suite.addTest(TestJournalHA.suite()); - - return suite; - - } - - public Properties getProperties() { - - final Properties properties = super.getProperties(); - - properties.setProperty(Journal.Options.COLLECT_PLATFORM_STATISTICS, - "false"); - - properties.setProperty(Journal.Options.COLLECT_QUEUE_STATISTICS, - "false"); - - properties.setProperty(Journal.Options.HTTPD_PORT, "-1"/* none */); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM.toString()); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - return properties; - - } - - /** - * Verify normal operation and basic assumptions when creating a new journal - * using {@link BufferMode#DiskWORM}. 
- * - * @throws IOException - */ - public void test_create_disk01() throws IOException { - - final Properties properties = getProperties(); - - final Journal journal = new Journal(properties); - - try { - - final WORMStrategy bufferStrategy = (WORMStrategy) journal - .getBufferStrategy(); - - assertTrue("isStable", bufferStrategy.isStable()); - assertFalse("isFullyBuffered", bufferStrategy.isFullyBuffered()); - // assertEquals(Options.FILE, properties.getProperty(Options.FILE), - // bufferStrategy.file.toString()); - assertEquals(Options.INITIAL_EXTENT, Long - .parseLong(Options.DEFAULT_INITIAL_EXTENT), bufferStrategy - .getInitialExtent()); - assertEquals(Options.MAXIMUM_EXTENT, - 0L/* soft limit for disk mode */, bufferStrategy - .getMaximumExtent()); - assertNotNull("raf", bufferStrategy.getRandomAccessFile()); - assertEquals(Options.BUFFER_MODE, BufferMode.DiskWORM, bufferStrategy - .getBufferMode()); - - } finally { - - journal.destroy(); - - } - - } - - /** - * Unit test verifies that {@link Options#CREATE} may be used to initialize - * a journal on a newly created empty file. - * - * @throws IOException - */ - public void test_create_emptyFile() throws IOException { - - final File file = File.createTempFile(getName(), Options.JNL); - - final Properties properties = new Properties(); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM.toString()); - - properties.setProperty(Options.FILE, file.toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - final Journal journal = new Journal(properties); - - try { - - assertEquals(file, journal.getFile()); - - } finally { - - journal.destroy(); - - } - - } - - /** - * Test suite integration for {@link AbstractRestartSafeTestCase}. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestRawStore extends AbstractRestartSafeTestCase { - - public TestRawStore() { - super(); - } - - public TestRawStore(String name) { - super(name); - } - - protected BufferMode getBufferMode() { - - return BufferMode.DiskWORM; - - } - - } - - /** - * Test suite integration for {@link AbstractInterruptsTestCase}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestInterrupts extends AbstractInterruptsTestCase { - - public TestInterrupts() { - super(); - } - - public TestInterrupts(String name) { - super(name); - } - - protected IRawStore getStore() { - - final Properties properties = getProperties(); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM - .toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - return new Journal(properties);//.getBufferStrategy(); - - } - - } - - /** - * Test suite integration for {@link AbstractMROWTestCase}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestMROW extends AbstractMROWTestCase { - - public TestMROW() { - super(); - } - - public TestMROW(String name) { - super(name); - } - - protected IRawStore getStore() { - - final Properties properties = getProperties(); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM - .toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - return new Journal(properties);//.getBufferStrategy(); - - } - - } - - /** - * Test suite integration for {@link AbstractMRMWTestCase}. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestMRMW extends AbstractMRMWTestCase { - - public TestMRMW() { - super(); - } - - public TestMRMW(String name) { - super(name); - } - - protected IRawStore getStore() { - - final Properties properties = getProperties(); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM - .toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - /* - * The following two properties are dialed way down in order to - * raise the probability that we will observe the following error - * during this test. - * - * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6371642 - * - * FIXME We should make the MRMW test harder and focus on - * interleaving concurrent extensions of the backing store for both - * WORM and R/W stores. - */ - - // Note: Use a relatively small initial extent. - properties.setProperty(Options.INITIAL_EXTENT, "" - + DirectBufferPool.INSTANCE.getBufferCapacity() * 1); - - // Note: Use a relatively small extension each time. - properties.setProperty(Options.MINIMUM_EXTENSION, - "" + (long) (DirectBufferPool.INSTANCE - .getBufferCapacity() * 1.1)); - - return new Journal(properties);//.getBufferStrategy(); - - } - - } - - /** - * Note: HA requires the use of the write cache. It is the - * {@link WriteCacheService} which provides the write replication mechanism - * for HA. 
- */ - private static final boolean writeCacheEnabled = true; - -} Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,370 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a cop... [truncated message content] |
From: <tho...@us...> - 2013-12-09 20:53:45
Revision: 7629 http://bigdata.svn.sourceforge.net/bigdata/?rev=7629&view=rev Author: thompsonbry Date: 2013-12-09 20:53:38 +0000 (Mon, 09 Dec 2013) Log Message: ----------- Modified to ONLY run in the SIDS mode pending fix by MP. {{{ java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: context bound, but not quads or sids: < TermId(7B), TermId(5U), com.bigdata.rdf.internal.impl.literal.LiteralExtensionIV@25889b2f, TermId(8B) : Explicit > at com.bigdata.rdf.spo.SPORelation.insert(SPORelation.java:2234) at com.bigdata.rdf.store.AbstractTripleStore.addStatements(AbstractTripleStore.java:3931) at com.bigdata.rdf.rio.StatementBuffer.writeSPOs(StatementBuffer.java:1113) at com.bigdata.rdf.rio.StatementBuffer.addStatements(StatementBuffer.java:977) at com.bigdata.rdf.rio.StatementBuffer.incrementalWrite(StatementBuffer.java:800) at com.bigdata.rdf.rio.StatementBuffer.flush(StatementBuffer.java:400) at com.bigdata.rdf.rio.TestStatementBuffer.test_reificationDoneRight_enabled(TestStatementBuffer.java:523) Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: context bound, but not quads or sids: < TermId(7B), TermId(5U), com.bigdata.rdf.internal.impl.literal.LiteralExtensionIV@25889b2f, TermId(8B) : Explicit > at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) at java.util.concurrent.FutureTask.get(FutureTask.java:83) at com.bigdata.rdf.spo.SPORelation.logFuture(SPORelation.java:2260) at com.bigdata.rdf.spo.SPORelation.insert(SPORelation.java:2215) Caused by: java.lang.IllegalArgumentException: context bound, but not quads or sids: < TermId(7B), TermId(5U), com.bigdata.rdf.internal.impl.literal.LiteralExtensionIV@25889b2f, TermId(8B) : Explicit > at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:275) at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:68) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at 
java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) }}} See #526. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java 2013-12-09 17:47:42 UTC (rev 7628) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestStatementBuffer.java 2013-12-09 20:53:38 UTC (rev 7629) @@ -451,9 +451,15 @@ return; } - if (store.isQuads() || store.isStatementIdentifiers()) { - /* - * Disabled. + if (!store.isStatementIdentifiers()) { + /** + * Disabled. FIXME This should be ON for TRIPLES or QUADS. It + * only works in the SIDS mode right now. The root cause is + * + * <pre> + * Caused by: java.lang.IllegalArgumentException: context bound, but not quads or sids: < TermId(7B), TermId(5U), com.bigdata.rdf.internal.impl.literal.LiteralExtensionIV@25889b2f, TermId(8B) : Explicit > + * at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:275) + * </pre> */ return; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-09 17:47:49
Revision: 7628 http://bigdata.svn.sourceforge.net/bigdata/?rev=7628&view=rev Author: thompsonbry Date: 2013-12-09 17:47:42 +0000 (Mon, 09 Dec 2013) Log Message: ----------- sync to martyn Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 17:36:22 UTC (rev 7627) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 17:47:42 UTC (rev 7628) @@ -78,10 +78,22 @@ final ServerSocket ss1 = new ServerSocket(); try { + // bind the ServerSocket to the specified port. ss1.bind(serverAddr1); assertTrue(ss1.getChannel() == null); + /* + * Without a new connect request we should not be able to accept() a + * new connection. + */ + try { + accept(ss1); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + // Now the first Client SocketChannel final SocketChannel cs1 = SocketChannel.open(); try { @@ -112,6 +124,17 @@ cs2.close(); } + /* + * Without a new connect request we should not be able to accept() a + * new connection. + */ + try { + accept(ss1); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + } finally { ss1.close(); @@ -388,7 +411,8 @@ /* * Having closed the input, without a new connect request we - * should not be able to accept the new write. + * should not be able to accept the new write since the data + * were written on a different client connection. 
*/ try { final Socket s3 = accept(ss); @@ -413,55 +437,105 @@ * @throws IOException */ public void testMultipleClients() throws IOException { - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - final ServerSocket ss = new ServerSocket(); - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - + // The payload size that we will use. + final int DATA_LEN = 200; + final Random r = new Random(); + final byte[] data = new byte[DATA_LEN]; + r.nextBytes(data); + final int nclients = 10; - + final ArrayList<SocketChannel> clients = new ArrayList<SocketChannel>(); final ArrayList<Socket> sockets = new ArrayList<Socket>(); - - final Random r = new Random(); - final byte[] data = new byte[200]; - r.nextBytes(data); - assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { - @Override - public Void call() throws Exception { - for (int c = 0; c < nclients; c++) { - final SocketChannel cs = SocketChannel.open(); - cs.connect(serverAddr); - - clients.add(cs); - sockets.add(ss.accept()); - - // write to each SocketChannel (after connect/accept) - cs.write(ByteBuffer.wrap(data)); - } - return null; - } - - }); - - // Now read from all Sockets accepted on the server - final byte[] dst = new byte[200]; - for (Socket s : sockets) { - assertFalse(s.isClosed()); - - final InputStream instr = s.getInputStream(); + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + final ServerSocket ss = new ServerSocket(); + try { + + // bind the ServerSocket to the specified port. + ss.bind(serverAddr); - assertFalse(-1 == instr.read(dst)); // doesn't return -1 + assertTrue(ss.getChannel() == null); + + final int receiveBufferSize = ss.getReceiveBufferSize(); + + // Make sure that we have enough room to receive all client writes + // before draining any of them. 
+ assertTrue(DATA_LEN * nclients <= receiveBufferSize); - assertTrue(BytesUtil.bytesEqual(data, dst)); + assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + + for (int c = 0; c < nclients; c++) { + + // client connects to server. + final SocketChannel cs = SocketChannel.open(); + cs.connect(serverAddr); + clients.add(cs); + + // accept connection on server. + sockets.add(ss.accept()); + + // write to each SocketChannel (after connect/accept) + cs.write(ByteBuffer.wrap(data)); + } + + return null; + + } + + }); + + /* + * Now read from all Sockets accepted on the server. + * + * Note: This is a simple loop, not a parallel read. The same buffer + * is reused on each iteration. + */ + { + + final byte[] dst = new byte[DATA_LEN]; + + for (Socket s : sockets) { + + assertFalse(s.isClosed()); + + final InputStream instr = s.getInputStream(); + + assertFalse(-1 == instr.read(dst)); // doesn't return -1 + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + // Close each Socket to ensure it is different + s.close(); + + assertTrue(s.isClosed()); + + } - // Close each Socket to ensure it is different - s.close(); + } + + } finally { + + // ensure client side connections are closed. + for (SocketChannel ch : clients) { + if (ch != null) + ch.close(); + } - assertTrue(s.isClosed()); + // ensure server side connections are closed. + for (Socket s : sockets) { + if (s != null) + s.close(); + } + + // close the server socket. + ss.close(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
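[Editor's note] The `testMultipleClients` change above relies on a standard TCP property: several clients can connect and write before the server drains any of them, provided the combined payloads fit within the server-side receive buffer (which the revised test now checks via `getReceiveBufferSize()`). The following is a minimal, self-contained sketch of that behavior using only JDK classes — it is not part of the bigdata test harness, and the class name, payload size, and client count are illustrative.

```java
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MultiClientSketch { // illustrative name, not from the project
    public static void main(String[] args) throws Exception {
        final byte[] payload = new byte[200];
        Arrays.fill(payload, (byte) 7);
        final int nclients = 4;

        try (ServerSocket ss = new ServerSocket()) {
            // Bind to an ephemeral loopback port.
            ss.bind(new InetSocketAddress("127.0.0.1", 0));

            // All pending client writes must fit in the receive buffer,
            // otherwise a client write could block before the server reads.
            System.out.println("fits="
                    + (payload.length * nclients <= ss.getReceiveBufferSize()));

            final List<SocketChannel> clients = new ArrayList<>();
            final List<Socket> accepted = new ArrayList<>();
            for (int i = 0; i < nclients; i++) {
                final SocketChannel cs = SocketChannel.open();
                cs.connect(new InetSocketAddress("127.0.0.1", ss.getLocalPort()));
                clients.add(cs);
                accepted.add(ss.accept());
                // Write BEFORE the server drains any connection.
                cs.write(ByteBuffer.wrap(payload));
            }

            // Only now drain each accepted connection and verify the bytes.
            int ok = 0;
            final byte[] dst = new byte[payload.length];
            for (Socket s : accepted) {
                final InputStream in = s.getInputStream();
                int off = 0;
                while (off < dst.length) {
                    final int n = in.read(dst, off, dst.length - off);
                    if (n < 0) break;
                    off += n;
                }
                if (Arrays.equals(payload, dst)) ok++;
                s.close();
            }
            for (SocketChannel cs : clients) cs.close();
            System.out.println("ok=" + ok);
        }
    }
}
```

The single-threaded connect/write loop followed by a separate drain loop mirrors the structure of the revised test; the actual test additionally closes the sockets in a `finally` block, as the diff shows.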
From: <tho...@us...> - 2013-12-09 17:36:29
Revision: 7627 http://bigdata.svn.sourceforge.net/bigdata/?rev=7627&view=rev Author: thompsonbry Date: 2013-12-09 17:36:22 +0000 (Mon, 09 Dec 2013) Log Message: ----------- Sync to martyn on low-level socket behavior test suite. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 16:21:57 UTC (rev 7626) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 17:36:22 UTC (rev 7627) @@ -46,6 +46,12 @@ import com.bigdata.btree.BytesUtil; import com.bigdata.io.TestCase3; +/** + * Test suite for basic socket behaviors. + * + * @author <a href="mailto:mar...@us...">Martyn Cutcher</a> + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ public class TestSocketsDirect extends TestCase3 { public TestSocketsDirect() { @@ -56,6 +62,65 @@ } /** + * Simple test of connecting to a server socket and the failure to connect + * to a port not associated with a server socket. + * + * @throws IOException + */ + public void testDirectSockets_exceptionIfPortNotOpen() throws IOException { + + // Get two socket addressses. We will open a service on one and try to + // connect to the unused one on the other port. + final InetSocketAddress serverAddr1 = new InetSocketAddress(getPort(0)); + final InetSocketAddress serverAddr2 = new InetSocketAddress(getPort(0)); + + // First our ServerSocket + final ServerSocket ss1 = new ServerSocket(); + try { + + ss1.bind(serverAddr1); + + assertTrue(ss1.getChannel() == null); + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + /* + * Note: true if connection made. false if connection in + * progress. 
+ */ + final boolean immediate1 = cs1.connect(serverAddr1); + if (!immediate1) { + // Did not connect immediately, so finish connect now. + if (!cs1.finishConnect()) { + fail("Did not connect."); + } + } + } finally { + cs1.close(); + } + + // Now the second Client SocketChannel + final SocketChannel cs2 = SocketChannel.open(); + try { + cs2.connect(serverAddr2); + fail("Expecting " + IOException.class); + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Ignoring expected exception: " + ex); + } finally { + cs2.close(); + } + + } finally { + + ss1.close(); + + } + + } + + /** * The use of threaded tasks in the send/receive service makes it difficult to * observe the socket state changes. * @@ -65,127 +130,279 @@ * ...with an accept followed by a read() of -1 on the returned Socket stream. * * @throws IOException + * @throws InterruptedException */ - public void testDirectSockets() throws IOException { + public void testDirectSockets() throws IOException, InterruptedException { - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + // The payload size that we will use. + final int DATA_LEN = 200; - // First our ServerSocket - final ServerSocket ss = new ServerSocket(); - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - - // Now the first Client SocketChannel - final SocketChannel cs1 = SocketChannel.open(); - - final boolean immediate1 = cs1.connect(serverAddr); - assertTrue("Expected immediate local connection", immediate1); - final Random r = new Random(); - final byte[] data = new byte[200]; + final byte[] data = new byte[DATA_LEN]; r.nextBytes(data); + final byte[] dst = new byte[DATA_LEN]; - final ByteBuffer src = ByteBuffer.wrap(data); + // The server side receive buffer size (once we open the server socket).
+ int receiveBufferSize = -1; - // Write some data - cs1.write(src); - - final byte[] dst = new byte[200]; + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - // Accept the client connection (after connect and write) - final Socket readSckt1 = accept(ss); - - InputStream instr = readSckt1.getInputStream(); - - // and read the data - instr.read(dst); - - // confirming the read is correct - assertTrue(BytesUtil.bytesEqual(data, dst)); - - assertTrue(ss.getChannel() == null); + // First our ServerSocket + final ServerSocket ss = new ServerSocket(); + try { - // now write some more data into the channel and then close it - cs1.write(ByteBuffer.wrap(data)); - - // close the client socket - cs1.close(); - - assertTrue(readSckt1.isConnected()); - assertFalse(readSckt1.isClosed()); - - // Now try writing some more data - try { - cs1.write(ByteBuffer.wrap(data)); - fail("Expected closed channel exception"); - } catch (ClosedChannelException e) { - // expected - } - - // the old stream should be closed - try { - final int rdlen = instr.read(dst); // should be closed - assertTrue(rdlen == 200); - assertTrue(BytesUtil.bytesEqual(data, dst)); + assertTrue(ss.getChannel() == null); + + // bind the server socket to the port. + ss.bind(serverAddr); + + assertTrue(ss.getChannel() == null); - assertTrue(instr.read(dst) == -1); // read EOF - } catch (Exception e) { - fail("not expected"); - } - - // if so then should we explicitly close its socket? - readSckt1.close(); - assertTrue(readSckt1.isClosed()); - - assertFalse(ss.isClosed()); - assertTrue(ss.getChannel() == null); - - // Now open a new client Socket and connect to the server - - final SocketChannel cs2 = SocketChannel.open(); - final boolean immediate2 = cs2.connect(serverAddr); - - assertTrue("Expected immediate local connection", immediate2); - - // Now we should be able to accept the new connection - final Socket s2 = accept(ss); - - // ...
write to the SocketChannel - final int wlen = cs2.write(ByteBuffer.wrap(data)); - - assertTrue(wlen == data.length); + // figure out the receive buffer size on the server socket. + receiveBufferSize = ss.getReceiveBufferSize(); - // failing to read from original stream - final int nrlen = instr.read(dst); - assertTrue(nrlen == -1); + if (log.isInfoEnabled()) + log.info("receiveBufferSize=" + receiveBufferSize + + ", payloadSize=" + DATA_LEN); + + if (receiveBufferSize < DATA_LEN) { - // but succeeding to read from the new Socket - final InputStream instr2 = s2.getInputStream(); - instr2.read(dst); - - assertTrue(BytesUtil.bytesEqual(data, dst)); - - // Can a downstream close be detected upstream? - instr2.close(); - - assertTrue(cs2.isOpen()); // Not after closing input stream - - s2.close(); - - assertTrue(cs2.isOpen()); // Nor after closing the socket - - // now write some more to the socket - final int wlen2 = cs2.write(ByteBuffer.wrap(data)); - assertTrue(wlen2 == data.length); - - // having closed the input, without a new connect request - // we should not be able to accept the new write - try { - final Socket s3 = accept(ss); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected + fail("Service socket receive buffer is smaller than test payload size: receiveBufferSize=" + + receiveBufferSize + ", payloadSize=" + DATA_LEN); + + } + + /* + * InputStream for server side of socket connection - set below and + * then reused outside of the try/finally block. + */ + InputStream instr = null; + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate1 = cs1.connect(serverAddr); + if (!immediate1) { + if (!cs1.finishConnect()) { + fail("Did not connect?"); + } + } + + assertTrue(ss.getChannel() == null); + + /* + * We are connected.
+ */ + + final ByteBuffer src = ByteBuffer.wrap(data); + + // Write some data on the client socket. + cs1.write(src); + + /* + * Accept client's connection on server (after connect and + * write). + */ + final Socket readSckt1 = accept(ss); + + // Stream to read the data from the socket on the server side. + instr = readSckt1.getInputStream(); + + // and read the data + instr.read(dst); + + // confirming the read is correct + assertTrue(BytesUtil.bytesEqual(data, dst)); + + assertTrue(ss.getChannel() == null); + + /* + * Attempting to read more returns ZERO because there is nothing + * in the buffer and the connection is still open on the client + * side. + * + * Note: instr.read(buf) will BLOCK until the data is available, + * the EOF is detected, or an exception is thrown. + */ + assertEquals(0, instr.available()); +// assertEquals(0, instr.read(dst)); + + /* + * Now write some more data into the channel and *then* close + * it. + */ + cs1.write(ByteBuffer.wrap(data)); + + // close the client side of the socket + cs1.close(); + + // The server side of client connection is still open. + assertTrue(readSckt1.isConnected()); + assertFalse(readSckt1.isClosed()); + + /* + * Now try writing some more data. This should be disallowed + * since we closed the client side of the socket. + */ + try { + cs1.write(ByteBuffer.wrap(data)); + fail("Expected closed channel exception"); + } catch (ClosedChannelException e) { + // expected + } + + /* + * Since we closed the client side of the socket, we now try to + * read more data on the server side of the connection. The data + * that we already buffered is still available on the server + * side of the socket. + */ + { + // the already buffered data should be available. + final int rdlen = instr.read(dst); + assertEquals(DATA_LEN, rdlen); + assertTrue(BytesUtil.bytesEqual(data, dst)); + } + + /* + * We have drained the buffered data.
There is no more buffered + data and client side is closed, so an attempt to read more + data on the server side of the socket will return EOF (-1). + */ + assertEquals(-1, instr.read(dst)); // read EOF + + // if so then should we explicitly close its socket? + readSckt1.close(); + assertTrue(readSckt1.isClosed()); + + assertFalse(ss.isClosed()); + assertTrue(ss.getChannel() == null); + + } finally { + cs1.close(); + } + + /* + * Now open a new client Socket and connect to the server. + */ + final SocketChannel cs2 = SocketChannel.open(); + try { + + // connect to the server socket again. + final boolean immediate2 = cs2.connect(serverAddr); + if (!immediate2) { + if (!cs2.finishConnect()) { + fail("Did not connect?"); + } + } + + // Now server should accept the new client connection + final Socket s2 = accept(ss); + + // Client writes to the SocketChannel + final int wlen = cs2.write(ByteBuffer.wrap(data)); + assertEquals(DATA_LEN, wlen); // verify data written. + + // failing to read from original stream + final int nrlen = instr.read(dst); + assertEquals(-1, nrlen); + + // but succeeding to read from the new Socket + final InputStream instr2 = s2.getInputStream(); + instr2.read(dst); + assertTrue(BytesUtil.bytesEqual(data, dst)); + + /* + * Question: Can a downstream close be detected upstream? + * + * Answer: No. Closing the server socket does not tell the + * client that the socket was closed. + */ + { + // close server side input stream. + instr2.close(); + + // but the client still thinks it is connected. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + + // close server socket. + s2.close(); + + // client still thinks it is connected after closing server + // socket. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes.
+ + } + + /* + * Now write some more to the socket. We have closed the + * accepted connection on the server socket. Our observations + * show that the 1st write succeeds. The second write then fails + * with 'IOException: "Broken pipe"'. + * + * The server socket is large (256k). We are not filling it up, + * but the 2nd write always fails. Further, the client never + * believes that the connection is closed until the 2nd write. + */ + { + final int writeSize = 1; + int nwritesOk = 0; + long nbytesReceived = 0L; + while (true) { + try { + // write a payload. + final int wlen2 = cs2.write(ByteBuffer.wrap(data, + 0, writeSize)); + // if write succeeds, should have written all bytes. + assertEquals(writeSize, wlen2); + nwritesOk++; + nbytesReceived += wlen2; + // does the client think the connection is still open? + assertTrue(cs2.isOpen()); // yes. + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Expected exception: nwritesOk=" + + nwritesOk + ", nbytesReceived=" + + nbytesReceived + ", ex=" + ex); + break; + } + } + } + + /* + * Having closed the input, without a new connect request we + * should not be able to accept the new write. + */ + try { + final Socket s3 = accept(ss); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + + } finally { + cs2.close(); + } + + } finally { + ss.close(); } } @@ -249,19 +466,22 @@ } - // wrap the ServerSocket accept with a timeout check - Socket accept(final ServerSocket ss) { + /** Wrap the ServerSocket accept with a timeout check.
*/ + private Socket accept(final ServerSocket ss) { + final AtomicReference<Socket> av = new AtomicReference<Socket>(); + assertNoTimeout(1, TimeUnit.SECONDS, new Callable<Void>() { @Override public Void call() throws Exception { - + av.set(ss.accept()); - + return null; - }}); - + } + }); + return av.get(); } @@ -283,7 +503,21 @@ } } - private void assertNoTimeout(long timeout, TimeUnit unit, Callable<Void> callable) { + /** + * Throws {@link AssertionFailedError} if the {@link Callable} does not + * succeed within the timeout. + * + * @param timeout + * @param unit + * @param callable + * + * @throws AssertionFailedError + * if the {@link Callable} does not succeed within the timeout. + * @throws AssertionFailedError + * if the {@link Callable} fails. + */ + private void assertNoTimeout(final long timeout, final TimeUnit unit, + final Callable<Void> callable) { final ExecutorService es = Executors.newSingleThreadExecutor(); try { final Future<Void> ret = es.submit(callable);
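For readers skimming the archive, the first test added by this commit boils down to a simple claim: a blocking SocketChannel.connect() against a port with no listener fails with an IOException (typically java.net.ConnectException: Connection refused). The standalone sketch below reproduces just that failure path outside the bigdata test harness; it is not part of the commit, and the harness helpers (getPort(), TestCase3, the logger) are not used. The ephemeral-port trick is an assumption of the sketch and carries a small race if another process binds the port between the close and the connect.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SocketChannel;

public class ConnectRefusedSketch {

    public static void main(String[] args) throws IOException {
        // Bind to an ephemeral port, then close the server socket so that
        // the port is known (modulo a small race) to have no listener.
        final int unusedPort;
        try (ServerSocket ss = new ServerSocket(0)) {
            unusedPort = ss.getLocalPort();
        }

        boolean refused = false;
        try (SocketChannel cs = SocketChannel.open()) {
            // Blocking mode (the default): connect() either completes the
            // connection or throws.
            cs.connect(new InetSocketAddress("127.0.0.1", unusedPort));
        } catch (IOException ex) {
            // Typically java.net.ConnectException: "Connection refused".
            refused = true;
        }
        System.out.println("refused=" + refused);
    }
}
```

Unlike the committed test, which allocates two addresses and keeps a live server bound to the first, this sketch only exercises the failure path; in blocking mode the success path is simply connect() returning true.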