From: <tho...@us...> - 2013-10-30 19:19:49
Revision: 7501
http://bigdata.svn.sourceforge.net/bigdata/?rev=7501&view=rev
Author: thompsonbry
Date: 2013-10-30 19:19:39 +0000 (Wed, 30 Oct 2013)

Log Message:
-----------
I have reviewed and revised the following aspects of the commit and 2-phase commit protocol:

- Journal: cancellation of the Future in the GATHER consensus protocol is no longer logged as an ERROR. This is just a data race.

- HAJournalServer: enterErrorState() is now written to be defensive. It will not throw anything, even if the HAQuorumService is not running (quorum.terminate()).

- HAJournalServer.HAQuorumServer.start()/terminate() now use an AtomicBoolean to guard those methods and to make enterErrorState() a NOP if the HAQuorumService is currently in either of those methods.

- HAJournalServer.logRootBlock() no longer has the isJoinedService boolean. This method is only called in the 2-phase commit logic, and the 2-phase commit is only executed for services that were joined with the met quorum as of the atomic decision point in commitNow().

- commitNow(): refactored to use a CommitState object, pushing the different things that are being done down into methods on that object. The error handling for the two-phase commit logic was simplified to increase understandability.

- The Prepare2Phase and Commit2Phase tasks were simplified. The core code was pushed down into private inner methods. This makes it easier to analyze the error handling code paths.

- TestDumpJournal: added coverage of some more conditions, looking to replicate an error observed on the HA3 cluster. I was not able to replicate the problem. It may have been related to allocator recycling or an abnormal failure mode on bigdata17, as per this ticket:

{{{
ERROR: 2885231 2013-10-28 16:05:28,194 qtp230584058-49 com.bigdata.rdf.sail.webapp.StatusServlet.doGet(StatusServlet.java:863): java.lang.RuntimeException: java.lang.ClassCastException: com.bigdata.btree.BTree cannot be cast to com.bigdata.journal.Name2Addr
java.lang.RuntimeException: java.lang.ClassCastException: com.bigdata.btree.BTree cannot be cast to com.bigdata.journal.Name2Addr
    at com.bigdata.journal.DumpJournal.dumpNamedIndicesMetadata(DumpJournal.java:726)
    at com.bigdata.journal.DumpJournal.dumpJournal(DumpJournal.java:599)
    at com.bigdata.rdf.sail.webapp.StatusServlet.doGet(StatusServlet.java:485)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:534)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:475)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:929)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:403)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:864)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117)
    at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:47)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:114)
    at org.eclipse.jetty.server.Server.handle(Server.java:352)
    at org.eclipse.jetty.server.HttpConnection.handleRequest(HttpConnection.java:596)
    at org.eclipse.jetty.server.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:1051)
    at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:590)
    at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:212)
    at org.eclipse.jetty.server.HttpConnection.handle(HttpConnection.java:426)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:508)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.access$000(SelectChannelEndPoint.java:34)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:40)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:451)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.ClassCastException: com.bigdata.btree.BTree cannot be cast to com.bigdata.journal.Name2Addr
    at com.bigdata.journal.AbstractJournal.getIndexWithCommitRecord(AbstractJournal.java:4751)
    at com.bigdata.journal.DumpJournal.dumpNamedIndicesMetadata(DumpJournal.java:706)
    ... 23 more
}}}

TODO:

- Review abort2Phase(). For some invocation contexts, this should be restricted to services that were joined with the met quorum and that voted YES for the PREPARE message.

- QuorumCommitImpl: I have reviewed these methods, including the cancellation of remote futures and the error paths. Modified the code to use the local executor service to submit RMI requests in parallel and simplified the code paths for interrupt and error handling for RMI requests or for failures to locate the proxy for a service. However, the new code causes some problems with the HA CI test suite and is being withheld while I investigate those issues in more depth.

I have run through the HA CI test suite, the RWStore test suite, the WORM test suite, and TestBigdataSailWithQuads. All is green. See #760

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/Journal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-10-30 19:11:05 UTC (rev 7500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-10-30 19:19:39 UTC (rev 7501)
@@ -160,19 +160,21 @@ * onto the {@link HALogWriter}. * <p> * Note: This method is ONLY invoked as part of the 2-phase commit protocol. - * Therefore, it ONLY applies to the live HALog file. A service is + * Therefore, this method ONLY applies to the live HALog file. A service is atomically either joined with the met quorum at a 2-phase commit point or - * not joined. This information is passed through from the 2-phase prepare - * in the <i>isJoinedService</i> argument. + * not joined. The PREPARE and COMMIT messages are ONLY generated for + * services that were joined with the met quorum as of that atomic decision + * point in the commit protocol. Therefore, this method is never called for + * a service that was not joined as of that atomic decision point. * - * @param isJoinedService - * <code>true</code> iff the service was joined with the met - * quorum at the atomic decision point in the 2-phase commit - * protocol. * @param rootBlock * The root block for the commit point that was just achieved.
*/ - void logRootBlock(final boolean isJoinedService, +// * @param isJoinedService +// * <code>true</code> iff the service was joined with the met +// * quorum at the atomic decision point in the 2-phase commit +// * protocol. + void logRootBlock(//final boolean isJoinedService, final IRootBlockView rootBlock) throws IOException; /** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-10-30 19:11:05 UTC (rev 7500) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-10-30 19:19:39 UTC (rev 7501) @@ -145,10 +145,10 @@ } @Override - public void logRootBlock(final boolean isJoinedService, + public void logRootBlock(//final boolean isJoinedService, final IRootBlockView rootBlock) throws IOException { - QuorumServiceBase.this.logRootBlock(isJoinedService, rootBlock); + QuorumServiceBase.this.logRootBlock(/*isJoinedService,*/ rootBlock); } @@ -294,7 +294,7 @@ * Note: The default implementation is a NOP. */ @Override - public void logRootBlock(final boolean isJoinedService, + public void logRootBlock(//final boolean isJoinedService, final IRootBlockView rootBlock) throws IOException { // NOP Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-30 19:11:05 UTC (rev 7500) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-30 19:19:39 UTC (rev 7501) @@ -144,6 +144,7 @@ import com.bigdata.io.IDataRecord; import com.bigdata.io.IDataRecordAccess; import com.bigdata.io.SerializerUtil; +import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.journal.Name2Addr.Entry; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.mdi.JournalMetadata; @@ -164,6 +165,7 @@ import com.bigdata.rwstore.IAllocationManager; import com.bigdata.rwstore.IHistoryManager; import com.bigdata.rwstore.IRWStrategy; +import com.bigdata.rwstore.RWStore; import com.bigdata.rwstore.sector.MemStrategy; import com.bigdata.rwstore.sector.MemoryManager; import com.bigdata.service.AbstractHATransactionService; @@ -2765,23 +2767,25 @@ */ _bufferStrategy.abort(); - /* - * Discard hard references to any indices. The Name2Addr reference - * will also be discarded below. This should be sufficient to ensure - * that any index requested by the methods on the AbstractJournal - * will be re-read from disk using the commit record which we - * re-load below. This is necessary in order to discard any - * checkpoints that may have been written on indices since the last - * commit. - * - * FIXME Verify this is not required. Historical index references - * should not be discarded on abort as they remain valid. Discarding - * them admits the possibility of a non-canonicalizing cache for the - * historical indices since an existing historical index reference - * will continue to be held but a new copy of the index will be - * loaded on the next request if we clear the cache here. - */ + /* + * The Name2Addr reference will be discarded below. This should be + * sufficient to ensure that any index requested by the methods on + * the AbstractJournal will be re-read from disk using the commit + * record which we re-load below. 
This is necessary in order to + * discard any checkpoints that may have been written on indices + * since the last commit (dirty indices that have not been + * checkpointed to the disk are discarded when we discard + * Name2Addr). + * + * Note: Historical index references should NOT be discarded on + * abort as they remain valid. Discarding them admits the + * possibility of a non-canonicalizing cache for the historical + * indices since an existing historical index reference will + * continue to be held but a new copy of the index will be loaded on + * the next request if we clear the cache here. + */ // historicalIndexCache.clear(); + // discard the commit record and re-read from the store. _commitRecord = _getCommitRecord(); @@ -3030,165 +3034,192 @@ } - /** - * An atomic commit is performed by directing each registered - * {@link ICommitter} to flush its state onto the store using - * {@link ICommitter#handleCommit(long)}. The address returned by that - * method is the address from which the {@link ICommitter} may be reloaded - * (and its previous address if its state has not changed). That address is - * saved in the {@link ICommitRecord} under the index for which that - * committer was {@link #registerCommitter(int, ICommitter) registered}. We - * then force the data to stable store, update the root block, and force the - * root block and the file metadata to stable store. - * <p> - * Note: Each invocation of this method MUST use a distinct - * <i>commitTime</i> and the commitTimes MUST be monotonically increasing. - * These guarantees support both the database version history mechanisms and - * the High Availability mechanisms. - * - * @param commitTime - * The commit time either of a transaction or of an unisolated - * commit. Note that when mixing isolated and unisolated commits - * you MUST use the same {@link ITimestampService} for both - * purposes. - * - * @return The timestamp assigned to the commit record -or- 0L if there were - * no data to commit. - */ - // Note: Overridden by StoreManager (DataService). - protected long commitNow(final long commitTime) { - - final WriteLock lock = _fieldReadWriteLock.writeLock(); + /** + * Class to which we attach all of the little pieces of state during + * {@link AbstractJournal#commitNow(long)}. + * <p> + * The non-final fields in this class are laid directly below the method + * which set those fields. The methods in the class are laid out in the + * top-to-bottom order in which they are executed by commitNow(). + */ + static private class CommitState { + + /** + * The timestamp at which the commit began. + */ + private final long beginNanos; - lock.lock(); + /** + * The backing store. + */ + private final AbstractJournal store; - try { - - assertOpen(); + /** + * The backing {@link IBufferStrategy} for the {@link #store}. + */ + private final IBufferStrategy _bufferStrategy; + + /** + * The quorum iff HA and <code>null</code> otherwise. + */ + private final Quorum<HAGlue, QuorumService<HAGlue>> quorum; - final long beginNanos = System.nanoTime(); + /** + * Local HA service implementation (non-Remote) and <code>null</code> if + * not in an HA mode.. + */ + private final QuorumService<HAGlue> quorumService; + + /** + * The commit time either of a transaction or of an unisolated commit. + * Note that when mixing isolated and unisolated commits you MUST use + * the same {@link ITimestampService} for both purposes. 
+ */ + private final long commitTime; + + /** + * The current root block on the journal as of the start of the commit + * protocol. + */ + private final IRootBlockView old; - // #of bytes on the journal as of the previous commit point. - final long byteCountBefore = _rootBlock.getNextOffset(); + /** + * The quorum token associated with this commit point. + */ + private final long commitToken; + + /** The #of bytes on the journal as of the previous commit point. */ + private final long byteCountBefore; + + /** + * The commit counter that will be assigned to the new commit point. + */ + private final long newCommitCounter; + + /** + * + * @param store + * The backing store. + * @param commitTime + * The commit time either of a transaction or of an + * unisolated commit. Note that when mixing isolated and + * unisolated commits you MUST use the same + * {@link ITimestampService} for both purposes. + */ + public CommitState(final AbstractJournal store, final long commitTime) { - if (log.isInfoEnabled()) - log.info("commitTime=" + commitTime); + if (store == null) + throw new IllegalArgumentException(); + + this.beginNanos = System.nanoTime(); - assertCommitTimeAdvances(commitTime); + this.store = store; - final IRootBlockView old = _rootBlock; + this.commitTime = commitTime; - final long newCommitCounter = old.getCommitCounter() + 1; + this._bufferStrategy = store._bufferStrategy; - /* - * First, run each of the committers accumulating the updated root - * addresses in an array. In general, these are btrees and they may - * have dirty nodes or leaves that needs to be evicted onto the - * store. The first time through, any newly created btrees will have - * dirty empty roots (the btree code does not optimize away an empty - * root at this time). However, subsequent commits without - * intervening data written on the store should not cause any - * committers to update their root address. + // Note: null if not HA. + this.quorum = store.quorum; + + /* + * Local HA service implementation (non-Remote). * - * Note: This also checkpoints the deferred free block list. - */ - final long[] rootAddrs = notifyCommitters(commitTime); + * Note: getClient() throws IllegalStateException if quorum exists + * and is not not running. + */ + this.quorumService = quorum == null ? null : quorum.getClient(); - /* - * See if anything has been written on the store since the last - * commit. - */ - if (!_bufferStrategy.requiresCommit(_rootBlock)) { + this.old = store._rootBlock; - /* - * No data was written onto the store so the commit can not - * achieve any useful purpose. - */ + // #of bytes on the journal as of the previous commit point. + this.byteCountBefore = store._rootBlock.getNextOffset(); - if (log.isInfoEnabled()) - log.info("Nothing to commit"); + this.newCommitCounter = old.getCommitCounter() + 1; - return 0L; - } - + this.commitToken = store.quorumToken; + + store.assertCommitTimeAdvances(commitTime); + + } + + /** + * Notify {@link ICommitter}s to flush out application data. This sets + * the {@link #rootAddrs} for the {@link ICommitRecord}. + * + * @return <code>true</code> if the store is dirty and the commit should + * proceed and <code>false</code> otherwise. + */ + private boolean notifyCommitters() { + /* - * Explicitly call the RootBlockCommitter + * First, run each of the committers accumulating the updated root + * addresses in an array. In general, these are btrees and they may + * have dirty nodes or leaves that needs to be evicted onto the + * store. 
The first time through, any newly created btrees will have + * dirty empty roots (the btree code does not optimize away an empty + * root at this time). However, subsequent commits without + * intervening data written on the store should not cause any + * committers to update their root address. + * + * Note: This also checkpoints the deferred free block list. */ - rootAddrs[PREV_ROOTBLOCK] = this.m_rootBlockCommitter - .handleCommit(commitTime); + rootAddrs = store.notifyCommitters(commitTime); - // Local HA service implementation (non-Remote). - final QuorumService<HAGlue> quorumService = quorum == null ? null - : quorum.getClient(); + /* + * See if anything has been written on the store since the last + * commit. + */ + if (!_bufferStrategy.requiresCommit(store._rootBlock)) { - final IJoinedAndNonJoinedServices gatherJoinedAndNonJoinedServices; - final IHANotifyReleaseTimeResponse consensusReleaseTime; - if ((_bufferStrategy instanceof IHABufferStrategy) - && quorum != null && quorum.isHighlyAvailable()) { - - /** - * CRITICAL SECTION. We need obtain a distributed consensus for - * the services joined with the met quorum concerning the - * earliest commit point that is pinned by the combination of - * the active transactions and the minReleaseAge on the TXS. New - * transaction starts during this critical section will block - * (on the leader or the folllower) unless they are guaranteed - * to be allowable, e.g., based on the current minReleaseAge, - * the new tx would read from the most recent commit point, the - * new tx would ready from a commit point that is already pinned - * by an active transaction on that node, etc. + /* + * Will not do commit. * - * Note: Lock makes this section MUTEX with awaitServiceJoin(). - * - * @see <a href= - * "https://docs.google.com/document/d/14FO2yJFv_7uc5N0tvYboU-H6XbLEFpvu-G8RhAzvxrk/edit?pli=1#" - * > HA TXS Design Document </a> - * - * @see <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/623" - * > HA TXS / TXS Bottleneck </a> + * Note: No data was written onto the store so the commit can + * not achieve any useful purpose. */ - _gatherLock.lock(); + return false; - try { - - // Atomic decision point for GATHER re joined services. - gatherJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices( - quorum); - - // Run the GATHER protocol. - consensusReleaseTime = ((AbstractHATransactionService) getLocalTransactionManager() - .getTransactionService()) - .updateReleaseTimeConsensus(newCommitCounter, - commitTime, - gatherJoinedAndNonJoinedServices.getJoinedServiceIds(), - getHAReleaseTimeConsensusTimeout(), - TimeUnit.MILLISECONDS); + } + + /* + * Explicitly call the RootBlockCommitter + * + * Note: This logs the current root block and set the address of + * that root block in the as a root address in the commitRecord. + * This is of potential use solely in disaster recovery scenarios + * where your root blocks are toast, but good root blocks can be + * found elsewhere in the file. Once you find a root block, you can + * get the commitRecordIndex and then find earlier root blocks using + * that root addr. Or you can just scan the file looking for valid + * root blocks and then use the most recent one that you can find. + */ + rootAddrs[PREV_ROOTBLOCK] = store.m_rootBlockCommitter + .handleCommit(commitTime); - } catch (Exception ex) { + // Will do commit. + return true; - log.error(ex, ex); - - // Wrap and rethrow. - throw new RuntimeException(ex); - - } finally { + } - _gatherLock.unlock(); - - } - - } else { - - /* - * Not HA. 
Did not do GATHER. - */ - - gatherJoinedAndNonJoinedServices = null; - consensusReleaseTime = null; - - } // if (HA) do GATHER + /** + * The new root addresses for the {@link ICommitRecord}. + * + * @see #notifyCommitters() + */ + private long[] rootAddrs; + + /** + * Write out the {@link ICommitRecord}, noting the + * {@link #commitRecordAddr}, add the {@link ICommitRecord} to the + * {@link CommitRecordIndex}. Finally, checkpoint the + * {@link CommitRecordIndex} setting the {@link #commitRecordIndexAddr}. + * <p> + * Note: This is also responsible for recycling the deferred frees for + * {@link IHistoryManager} backends. + */ + private void writeCommitRecord() { /* * Before flushing the commitRecordIndex we need to check for @@ -3202,23 +3233,15 @@ */ if (_bufferStrategy instanceof IHistoryManager) { - ((IHistoryManager) _bufferStrategy).checkDeferredFrees(this); + ((IHistoryManager) _bufferStrategy) + .checkDeferredFrees(store); } - /* - * Write the commit record onto the store. - * - * @todo Modify to log the current root block and set the address of - * that root block in the commitRecord. This will be of use solely - * in disaster recovery scenarios where your root blocks are toast, - * but good root blocks can be found elsewhere in the file. - */ - final ICommitRecord commitRecord = new CommitRecord(commitTime, newCommitCounter, rootAddrs); - final long commitRecordAddr = write(ByteBuffer + this.commitRecordAddr = store.write(ByteBuffer .wrap(CommitRecordSerializer.INSTANCE .serialize(commitRecord))); @@ -3226,8 +3249,8 @@ * Add the commit record to an index so that we can recover * historical states efficiently. */ - _commitRecordIndex.add(commitRecordAddr, commitRecord); - + store._commitRecordIndex.add(commitRecordAddr, commitRecord); + /* * Flush the commit record index to the store and stash the address * of its metadata record in the root block. @@ -3239,22 +3262,484 @@ * CommitRecordIndex before we can flush the CommitRecordIndex to * the store. */ - final long commitRecordIndexAddr = _commitRecordIndex + commitRecordIndexAddr = store._commitRecordIndex .writeCheckpoint(); - final long commitToken = quorumToken; + } + + /** + * The address of the {@link ICommitRecord}. + * + * @see #writeCommitRecord() + */ + private long commitRecordAddr; + + /** + * The address of the {@link CommitRecordIndex} once it has been + * checkpointed against the backing store. + * <p> + * Note: The address of the root of the {@link CommitRecordIndex} needs + * to go right into the {@link IRootBlockView}. We are unable to place + * it into the {@link ICommitRecord} since we need to serialize the + * {@link ICommitRecord}, get its address, and add the entry to the + * {@link CommitRecordIndex} before we can flush the + * {@link CommitRecordIndex} to the store. + * + * @see #writeCommitRecord() + */ + private long commitRecordIndexAddr; + + /** + * Call commit on {@link IBufferStrategy} prior to creating the new + * {@link IRootBlockView}. This will flush the {@link WriteCacheService} + * . For HA, that ensures that the write set has been replicated to the + * followers. + * <p> + * Note: required for {@link RWStore} since the metaBits allocations are + * not made until commit, leading to invalid addresses for recent store + * allocations. + * <p> + * Note: After this, we do not write anything on the backing store other + * than the root block. The rest of this code is dedicated to creating a + * properly formed root block. For a non-HA deployment, we just lay down + * the root block. 
For an HA deployment, we do a 2-phase commit. + * <p> + * Note: In HA, the followers lay down the replicated writes + * synchronously. Thus, they are guaranteed to be on local storage by + * the time the leader finishes WriteCacheService.flush(). This does not + * create much latency because the WriteCacheService drains the + * dirtyList in a seperate thread. + */ + private void flushWriteSet() { + + _bufferStrategy.commit(); + + } + + /** + * Create the new root block. + */ + private void newRootBlock() { + + /* + * The next offset at which user data would be written. Calculated, + * after commit! + */ + final long nextOffset = _bufferStrategy.getNextOffset(); + + final long blockSequence; + if (_bufferStrategy instanceof IHABufferStrategy) { + + // always available for HA. + blockSequence = ((IHABufferStrategy) _bufferStrategy) + .getBlockSequence(); + + } else { + + blockSequence = old.getBlockSequence(); + + } + + /* + * Update the firstCommitTime the first time a transaction commits + * and the lastCommitTime each time a transaction commits (these are + * commit timestamps of isolated or unisolated transactions). + */ + + final long firstCommitTime = (old.getFirstCommitTime() == 0L ? commitTime + : old.getFirstCommitTime()); + + final long priorCommitTime = old.getLastCommitTime(); + + if (priorCommitTime != 0L) { + + /* + * This is a local sanity check to make sure that the commit + * timestamps are strictly increasing. An error will be reported + * if the commit time for the current (un)isolated transaction + * is not strictly greater than the last commit time on the + * store as read back from the current root block. + */ + + assertPriorCommitTimeAdvances(commitTime, priorCommitTime); + + } + + final long lastCommitTime = commitTime; + final long metaStartAddr = _bufferStrategy.getMetaStartAddr(); + final long metaBitsAddr = _bufferStrategy.getMetaBitsAddr(); + + // Create the new root block. + newRootBlock = new RootBlockView(!old.isRootBlock0(), + old.getOffsetBits(), nextOffset, firstCommitTime, + lastCommitTime, newCommitCounter, commitRecordAddr, + commitRecordIndexAddr, + old.getUUID(), // + blockSequence, + commitToken,// + metaStartAddr, metaBitsAddr, old.getStoreType(), + old.getCreateTime(), old.getCloseTime(), old.getVersion(), + store.checker); + + } + + /** + * The new {@link IRootBlockView}. + * + * @see #newRootBlock() + */ + private IRootBlockView newRootBlock; + + + /** + * Run the GATHER consensus protocol (iff HA). + */ + private void gatherPhase() { + + /* + * If not HA, do not do GATHER. + */ + + if (!(_bufferStrategy instanceof IHABufferStrategy)) + return; + + if (quorum == null) + return; + + if (!quorum.isHighlyAvailable()) + return; + + /** + * CRITICAL SECTION. We need obtain a distributed consensus for the + * services joined with the met quorum concerning the earliest + * commit point that is pinned by the combination of the active + * transactions and the minReleaseAge on the TXS. New transaction + * starts during this critical section will block (on the leader or + * the folllower) unless they are guaranteed to be allowable, e.g., + * based on the current minReleaseAge, the new tx would read from + * the most recent commit point, the new tx would ready from a + * commit point that is already pinned by an active transaction on + * that node, etc. + * + * Note: Lock makes this section MUTEX with awaitServiceJoin(). 
+ * + * @see <a href= + * "https://docs.google.com/document/d/14FO2yJFv_7uc5N0tvYboU-H6XbLEFpvu-G8RhAzvxrk/edit?pli=1#" + * > HA TXS Design Document </a> + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/623" + * > HA TXS / TXS Bottleneck </a> + */ + + store._gatherLock.lock(); + + try { + + // Atomic decision point for GATHER re joined services. + gatherJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices( + quorum); + + // Run the GATHER protocol. + consensusReleaseTime = ((AbstractHATransactionService) store + .getLocalTransactionManager().getTransactionService()) + .updateReleaseTimeConsensus(newCommitCounter, + commitTime, gatherJoinedAndNonJoinedServices + .getJoinedServiceIds(), store + .getHAReleaseTimeConsensusTimeout(), + TimeUnit.MILLISECONDS); + + } catch (Exception ex) { + + log.error(ex, ex); + + // Wrap and rethrow. + throw new RuntimeException(ex); + + } finally { + + store._gatherLock.unlock(); + + } + + } + + /** + * Set by {@link #gatherPhase()} IFF HA. + */ + private IJoinedAndNonJoinedServices gatherJoinedAndNonJoinedServices = null; + /** + * Set by {@link #gatherPhase()} IFF HA. + */ + private IHANotifyReleaseTimeResponse consensusReleaseTime = null; + + /** + * Simple (non-HA) commit. + */ + private void commitSimple() { + + /* + * Force application data to stable storage _before_ + * we update the root blocks. This option guarantees + * that the application data is stable on the disk + * before the atomic commit. Some operating systems + * and/or file systems may otherwise choose an + * ordered write with the consequence that the root + * blocks are laid down on the disk before the + * application data and a hard failure could result + * in the loss of application data addressed by the + * new root blocks (data loss on restart). + * + * Note: We do not force the file metadata to disk. + * If that is done, it will be done by a force() + * after we write the root block on the disk. + */ + if (store.doubleSync) { + + _bufferStrategy.force(false/* metadata */); + + } + + // write the root block on to the backing store. + _bufferStrategy.writeRootBlock(newRootBlock, store.forceOnCommit); + + if (_bufferStrategy instanceof IRWStrategy) { + + /* + * Now the root blocks are down we can commit any transient + * state. + */ + + ((IRWStrategy) _bufferStrategy).postCommit(); + + } + + // set the new root block. + store._rootBlock = newRootBlock; + + // reload the commit record from the new root block. + store._commitRecord = store._getCommitRecord(); + + if (txLog.isInfoEnabled()) + txLog.info("COMMIT: commitTime=" + commitTime); + + } + + /** + * HA mode commit (2-phase commit). + * <p> + * Note: We need to make an atomic decision here regarding whether a + * service is joined with the met quorum or not. This information will + * be propagated through the HA 2-phase prepare message so services will + * know how they must intepret the 2-phase prepare(), commit(), and + * abort() requests. The atomic decision is necessary in order to + * enforce a consistent role on a services that is resynchronizing and + * which might vote to join the quorum and enter the quorum + * asynchronously with respect to this decision point. + * + * TODO If necessary, we could also explicitly provide the zk version + * metadata for the znode that is the parent of the joined services. + * However, we would need an expanded interface to get that metadata + * from zookeeper out of the Quorum. 
+ */ + private void commitHA() { + + try { + + if(!prepare2Phase()) { + + // PREPARE rejected. + throw new QuorumException("PREPARE rejected: nyes=" + + prepareResponse.getYesCount() + ", replicationFactor=" + + prepareResponse.replicationFactor()); + + } + + commitRequest = new CommitRequest(prepareRequest, + prepareResponse); + + quorumService.commit2Phase(commitRequest); + + } catch (Throwable e) { + + try { + /* + * Something went wrong. Any services that were in the + * pipeline could have a dirty write set. Services that + * voted NO will have already discarded their dirty write + * set. We issue an abort2Phase() to tell the other services + * to discard the dirty write set as well. + * + * TODO We only need to issue the 2-phase abort against + * those services that (a) were joined with the met quorum; + * and (b) voted YES in response to the PREPARE message (any + * service that voted NO already discarded its dirty write + * set). + * + * TODO The service should only do the 2-phase abort if the + * commitToken and commitCounter are valid. If the quorum + * breaks, then the services will move into the Error state + * and will do a local abort as part of that transition. + */ + quorumService.abort2Phase(commitToken); + } catch (Throwable t) { + log.warn(t, t); + } + + if (commitRequest != null) { + /* + * The quorum voted to commit, but something went wrong. + * + * This forces the leader to fail over. The quorum can then + * meet up again around a new consensus. + * + * Note: It is possible that a new consensus can not be + * formed. The 2-phase commit protocol does not handle some + * cases of compound failure. For example, consider the case + * where the HA cluster is running with bare majority of + * services. All services that are active vote YES, but one + * of those services fails before processing the COMMIT + * message. The quorum will not meet again unless a new + * consensus can be formed from the services that remain up + * and the services that were not running. The services that + * were not running will be at an earlier commit point so + * they can not form a consensus with the services that + * remain up. Further, there can not be a majority among the + * services that were not running (except in the edge case + * where the failed commit was the first commit since the + * last commit on one of the services that was down at the + * start of that failed commit). Situations of this type + * require operator intervention. E.g., explicitly rollback + * the database or copy HALog files from one machine to + * another such that it will apply those HALog files on + * restart and form a consensus with the other services. + */ + quorumService.enterErrorState(); + } + + // Re-throw the root cause exception. + throw new RuntimeException(e); + + } + + } + // Fields set by the method above. + private CommitRequest commitRequest; + + /** + * Return <code>true</code> iff the 2-phase PREPARE votes to COMMIT. + * + * @throws IOException + * @throws TimeoutException + * @throws InterruptedException + */ + private boolean prepare2Phase() throws InterruptedException, + TimeoutException, IOException { + + // Atomic decision point for joined vs non-joined services. 
+ prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices( + quorum); + + prepareRequest = new PrepareRequest(// + consensusReleaseTime,// + gatherJoinedAndNonJoinedServices,// + prepareJoinedAndNonJoinedServices,// + newRootBlock,// + quorumService.getPrepareTimeout(), // timeout + TimeUnit.MILLISECONDS// + ); + + // issue prepare request. + prepareResponse = quorumService.prepare2Phase(prepareRequest); + + if (haLog.isInfoEnabled()) + haLog.info(prepareResponse.toString()); + + return prepareResponse.willCommit(); + + } + // Fields set by the method above. + private IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices; + private PrepareRequest prepareRequest; + private PrepareResponse prepareResponse; + + } // class CommitState. + + /** + * An atomic commit is performed by directing each registered + * {@link ICommitter} to flush its state onto the store using + * {@link ICommitter#handleCommit(long)}. The address returned by that + * method is the address from which the {@link ICommitter} may be reloaded + * (and its previous address if its state has not changed). That address is + * saved in the {@link ICommitRecord} under the index for which that + * committer was {@link #registerCommitter(int, ICommitter) registered}. We + * then force the data to stable store, update the root block, and force the + * root block and the file metadata to stable store. + * <p> + * Note: Each invocation of this method MUST use a distinct + * <i>commitTime</i> and the commitTimes MUST be monotonically increasing. + * These guarantees support both the database version history mechanisms and + * the High Availability mechanisms. + * + * @param commitTime + * The commit time either of a transaction or of an unisolated + * commit. Note that when mixing isolated and unisolated commits + * you MUST use the same {@link ITimestampService} for both + * purposes. + * + * @return The timestamp assigned to the commit record -or- 0L if there were + * no data to commit. + */ + // Note: Overridden by StoreManager (DataService). + protected long commitNow(final long commitTime) { + + final WriteLock lock = _fieldReadWriteLock.writeLock(); + + lock.lock(); + + try { + + assertOpen(); + + if (log.isInfoEnabled()) + log.info("commitTime=" + commitTime); + + final CommitState cs = new CommitState(this, commitTime); + + /* + * Flush application data, decide whether or not the store is dirty, + * and return immediately if it is not dirty. + */ + if (!cs.notifyCommitters()) { + + if (log.isInfoEnabled()) + log.info("Nothing to commit"); + + return 0L; + + } + + // Do GATHER (iff HA). + cs.gatherPhase(); + + /* + * Flush deferred frees (iff RWS), write the commit record onto the + * store, and write the commit record index onto the store. + */ + cs.writeCommitRecord(); + if (quorum != null) { /* * Verify that the last negotiated quorum is still valid. */ - quorum.assertLeader(commitToken); + quorum.assertLeader(cs.commitToken); } /* * Conditionally obtain a lock that will protect the * commit()/postCommit() protocol. */ - final long nextOffset; +// final long nextOffset; final Lock commitLock; if (_bufferStrategy instanceof IRWStrategy) { commitLock = ((IRWStrategy) _bufferStrategy).getCommitLock(); @@ -3266,271 +3751,48 @@ commitLock.lock(); } try { - /* - * Call commit on buffer strategy prior to retrieving root block, - * required for RWStore since the metaBits allocations are not made - * until commit, leading to invalid addresses for recent store - * allocations. 
- * - * Note: This will flush the write cache. For HA, that ensures that - * the write set has been replicated to the followers. - * - * Note: After this, we do not write anything on the backing store - * other than the root block. The rest of this code is dedicated to - * creating a properly formed root block. For a non-HA deployment, - * we just lay down the root block. For an HA deployment, we do a - * 2-phase commit. - * - * Note: In HA, the followers lay down the replicated writes - * synchronously. Thus, they are guaranteed to be on local storage - * by the time the leader finishes WriteCacheService.flush(). This - * does not create much latency because the WriteCacheService drains - * the dirtyList in a seperate thread. - */ - _bufferStrategy.commit(); + + // Flush writes to the backing store / followers. + cs.flushWriteSet(); - /* - * The next offset at which user data would be written. - * Calculated, after commit! - */ - nextOffset = _bufferStrategy.getNextOffset(); - - final long blockSequence; - - if (_bufferStrategy instanceof IHABufferStrategy) { - - // always available for HA. - blockSequence = ((IHABufferStrategy) _bufferStrategy) - .getBlockSequence(); - - } else { - - blockSequence = old.getBlockSequence(); - - } - - /* - * Prepare the new root block. - */ - final IRootBlockView newRootBlock; - { - - /* - * Update the firstCommitTime the first time a transaction - * commits and the lastCommitTime each time a transaction - * commits (these are commit timestamps of isolated or - * unisolated transactions). - */ - - final long firstCommitTime = (old.getFirstCommitTime() == 0L ? commitTime - : old.getFirstCommitTime()); - - final long priorCommitTime = old.getLastCommitTime(); - - if (priorCommitTime != 0L) { - - /* - * This is a local sanity check to make sure that the commit - * timestamps are strictly increasing. An error will be - * reported if the commit time for the current (un)isolated - * transaction is not strictly greater than the last commit - * time on the store as read back from the current root - * block. - */ - - assertPriorCommitTimeAdvances(commitTime, priorCommitTime); - - } - - final long lastCommitTime = commitTime; - final long metaStartAddr = _bufferStrategy.getMetaStartAddr(); - final long metaBitsAddr = _bufferStrategy.getMetaBitsAddr(); - - // Create the new root block. - newRootBlock = new RootBlockView(!old.isRootBlock0(), old - .getOffsetBits(), nextOffset, firstCommitTime, - lastCommitTime, newCommitCounter, commitRecordAddr, - commitRecordIndexAddr, old.getUUID(), // - blockSequence, commitToken,// - metaStartAddr, metaBitsAddr, old.getStoreType(), - old.getCreateTime(), old.getCloseTime(), - old.getVersion(), checker); - - } - + // Prepare the new root block. + cs.newRootBlock(); + if (quorum == null) { - /* - * Non-HA mode. - */ + // Non-HA mode. + cs.commitSimple(); - /* - * Force application data to stable storage _before_ - * we update the root blocks. This option guarantees - * that the application data is stable on the disk - * before the atomic commit. Some operating systems - * and/or file systems may otherwise choose an - * ordered write with the consequence that the root - * blocks are laid down on the disk before the - * application data and a hard failure could result - * in the loss of application data addressed by the - * new root blocks (data loss on restart). - * - * Note: We do not force the file metadata to disk. - * If that is done, it will be done by a force() - * after we write the root block on the disk. 
- */ - if (doubleSync) { - - _bufferStrategy.force(false/* metadata */); - - } - - // write the root block on to the backing store. - _bufferStrategy.writeRootBlock(newRootBlock, forceOnCommit); - - if (_bufferStrategy instanceof IRWStrategy) { - - /* - * Now the root blocks are down we can commit any transient - * state. - */ - - ((IRWStrategy) _bufferStrategy).postCommit(); - - } - - // set the new root block. - _rootBlock = newRootBlock; - - // reload the commit record from the new root block. - _commitRecord = _getCommitRecord(); - - if (txLog.isInfoEnabled()) - txLog.info("COMMIT: commitTime=" + commitTime); - } else { - - /* - * HA mode. - * - * Note: We need to make an atomic decision here regarding - * whether a service is joined with the met quorum or not. This - * information will be propagated through the HA 2-phase prepare - * message so services will know how they must intepret the - * 2-phase prepare(), commit(), and abort() requests. The atomic - * decision is necessary in order to enforce a consistent role - * on a services that is resynchronizing and which might vote to - * join the quorum and enter the quorum asynchronously with - * respect to this decision point. - * - * TODO If necessary, we could also explicitly provide the zk - * version metadata for the znode that is the parent of the - * joined services. However, we would need an expanded interface - * to get that metadata from zookeeper out of the Quorum.. - */ - - boolean didVoteYes = false; - try { - - // Atomic decision point for joined vs non-joined services. - final IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices( - quorum); - - final PrepareRequest req = new PrepareRequest(// - consensusReleaseTime,// - gatherJoinedAndNonJoinedServices,// - prepareJoinedAndNonJoinedServices,// - newRootBlock,// - quorumService.getPrepareTimeout(), // timeout - TimeUnit.MILLISECONDS// - ); - - // issue prepare request. - final PrepareResponse resp = quorumService - .prepare2Phase(req); - - if (haLog.isInfoEnabled()) - haLog.info(resp.toString()); - - if (resp.willCommit()) { - - didVoteYes = true; - - quorumService - .commit2Phase(new CommitRequest(req, resp)); - - } else { - - /* - * TODO We only need to issue the 2-phase abort - * against those services that (a) were joined with - * the met quorum; and (b) voted YES in response to - * the PREPARE message. - */ - try { - quorumService.abort2Phase(commitToken); - } finally { - throw new RuntimeException( - "PREPARE rejected: nyes=" - + resp.getYesCount() - + ", replicationFactor=" - + resp.replicationFactor()); - } - - } - - } catch (Throwable e) { - if (didVoteYes) { - /* - * The quorum voted to commit, but something went wrong. - * - * FIXME RESYNC : At this point the quorum is probably - * inconsistent in terms of their root blocks. Rather - * than attempting to send an abort() message to the - * quorum, we probably should force the leader to yield - * its role at which point the quorum will attempt to - * elect a new master and resynchronize. - */ - if (quorumService != null) { - try { - quorumService.abort2Phase(commitToken); - } catch (Throwable t) { - log.warn(t, t); - } - } - } else { - /* - * This exception was thrown during the abort handling - * logic. Note that we already attempting an 2-phase - * abort since the quorum did not vote "yes". - * - * TODO We should probably force a quorum break since - * there is clearly something wrong with the lines of - * communication among the nodes. 
- */ - } - throw new RuntimeException(e); - } - + + // HA mode commit (2-phase commit). + cs.commitHA(); + } // else HA mode } finally { - if(commitLock != null) { + + if (commitLock != null) { /* * Release the [commitLock] iff one was taken above. */ commitLock.unlock(); } + } - final long elapsedNanos = System.nanoTime() - beginNanos; + final long elapsedNanos = System.nanoTime() - cs.beginNanos; if (BigdataStatics.debug || log.isInfoEnabled()) { - final String msg = "commit: commitTime=" + commitTime + ", latency=" - + TimeUnit.NANOSECONDS.toMillis(elapsedNanos) + ", nextOffset=" + nextOffset + ", byteCount=" - + (nextOffset - byteCountBefore); - if (BigdataStatics.debug) + final String msg = "commit: commitTime=" + + cs.commitTime + + ", latency=" + + TimeUnit.NANOSECONDS.toMillis(elapsedNanos) + + ", nextOffset=" + + cs.newRootBlock.getNextOffset() + + ", byteCo... [truncated message content] |
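The control flow this revision refactors into CommitState.commitHA() is the classic two-phase commit pattern: PREPARE first, COMMIT only on enough YES votes, and a best-effort 2-phase abort plus an error-state transition on failure. The sketch below is a minimal illustration of that flow only; the Participant interface and coordinator class are invented for the example and are not the bigdata API.

{{{
import java.util.List;

// Minimal 2-phase commit sketch (hypothetical types, not the bigdata API).
interface Participant {
    boolean prepare(long commitToken) throws Exception; // vote YES or NO
    void commit(long commitToken) throws Exception;
    void abort(long commitToken) throws Exception;
}

class TwoPhaseCoordinator {

    void commit(final List<Participant> joined, final long commitToken)
            throws Exception {
        try {
            // Phase 1: PREPARE. Only services joined with the met quorum as
            // of the atomic decision point receive the message.
            int nyes = 0;
            for (Participant p : joined) {
                if (p.prepare(commitToken))
                    nyes++;
            }
            if (nyes < joined.size() / 2 + 1)
                throw new IllegalStateException("PREPARE rejected: nyes=" + nyes);
            // Phase 2: COMMIT.
            for (Participant p : joined)
                p.commit(commitToken);
        } catch (Throwable t) {
            // Any service in the pipeline may hold a dirty write set, so tell
            // everyone to discard it. Services that voted NO already did a
            // local abort, hence this is best effort (see the TODO above).
            for (Participant p : joined) {
                try {
                    p.abort(commitToken);
                } catch (Throwable t2) {
                    // Log and continue; the leader then enters its error state.
                }
            }
            throw new RuntimeException(t);
        }
    }
}
}}}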
From: <tho...@us...> - 2013-10-30 19:11:11
Revision: 7500
http://bigdata.svn.sourceforge.net/bigdata/?rev=7500&view=rev
Author: thompsonbry
Date: 2013-10-30 19:11:05 +0000 (Wed, 30 Oct 2013)

Log Message:
-----------
Changed the defaults for zookeeper and river to use highly available installations. Refactored where the WRITE_CACHE_BUFFERS value gets overridden so this is not BSBM-specific.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournal.config

Modified: branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournal.config
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournal.config 2013-10-30 19:10:02 UTC (rev 7499)
+++ branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournal.config 2013-10-30 19:11:05 UTC (rev 7500)
@@ -124,12 +124,10 @@
  */
 static private locators = new LookupLocator[] {
 
-	// runs jini on the localhost using unicast locators.
-	new LookupLocator("jini://bigdata17/")
 
-	// runs jini on one or more hosts using unicast locators.
-	//new LookupLocator("jini://"+jini1),
-	//new LookupLocator("jini://"+jini2),
+	new LookupLocator("jini://bigdata15/"),
+	new LookupLocator("jini://bigdata16/"),
+	new LookupLocator("jini://bigdata17/"),
 
 };
@@ -219,14 +217,6 @@
 // Override the number of statements for batch inserts (default 10k)
 new NV(com.bigdata.rdf.sail.BigdataSail.Options.BUFFER_CAPACITY,"1000000"),
 
-// Override the #of write cache buffers.
-//
-// Note: On BSBM 100M, using 200 x 1M buffers with a 20% threshold offers
-// good compaction and reduces the IO Wait. Values larger than this do not
-// add much of a performance boost.
-//
-new NV(com.bigdata.journal.Options.WRITE_CACHE_BUFFER_COUNT,"2000"),
-new NV(com.bigdata.journal.Options.WRITE_CACHE_COMPACTION_THRESHOLD,"20"),
 
 */
 };
@@ -244,13 +234,8 @@
 /* A comma separated list of host:port pairs, where the port is
  * the CLIENT port for the zookeeper server instance.
  */
-// standalone.
-servers = "bigdata17:2081";
 // ensemble
-// servers = bigdata.zoo1+":2181"
-// + ","+bigdata.zoo2+":2181"
-// + ","+bigdata.zoo3+":2181"
-// ;
+servers = "bigdata15:2081,bigdata16:2081,bigdata17:2081";
 
 /* Session timeout (optional). */
 sessionTimeout = bigdata.sessionTimeout;
@@ -347,7 +332,7 @@
 
 new NV(Options.BUFFER_MODE,""+BufferMode.DiskRW),
 
-new NV(Options.WRITE_CACHE_BUFFER_COUNT,"500"),
+new NV(Options.WRITE_CACHE_BUFFER_COUNT,"2000"),
 
 new NV(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY,"4000"),
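This revision swaps a standalone zookeeper server for a three-member ensemble given as a comma-separated host:port list. For reference, the stock ZooKeeper client consumes such a list directly, as in the sketch below; the host names mirror the config above, while the 2181 client port, the timeout value, and the class name are illustrative assumptions.

{{{
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Connects to a ZooKeeper ensemble using a comma-separated host:port list
// like the "servers" entry above. Host names and port are illustrative.
public class EnsembleConnectDemo {

    public static void main(final String[] args) throws Exception {

        final String servers = "bigdata15:2181,bigdata16:2181,bigdata17:2181";

        final CountDownLatch connected = new CountDownLatch(1);

        final ZooKeeper zk = new ZooKeeper(servers, 20000/* sessionTimeout(ms) */,
                new Watcher() {
                    @Override
                    public void process(final WatchedEvent event) {
                        // The client fails over among the listed servers; we
                        // just wait until the session is established.
                        if (event.getState() == Event.KeeperState.SyncConnected)
                            connected.countDown();
                    }
                });

        connected.await();

        System.out.println("connected: sessionId=" + zk.getSessionId());

        zk.close();

    }
}
}}}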
From: <tho...@us...> - 2013-10-30 19:10:09
Revision: 7499
http://bigdata.svn.sourceforge.net/bigdata/?rev=7499&view=rev
Author: thompsonbry
Date: 2013-10-30 19:10:02 +0000 (Wed, 30 Oct 2013)

Log Message:
-----------
Changed the log levels that would apply if people uncomment the HA-related loggers.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/log4j.properties

Modified: branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/log4j.properties
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/log4j.properties 2013-10-30 12:37:37 UTC (rev 7498)
+++ branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/log4j.properties 2013-10-30 19:10:02 UTC (rev 7499)
@@ -19,18 +19,18 @@
 log4j.logger.com.bigdata.rdf.sail.webapp.NanoSparqlServer=ALL
 
 # HA related loggers (debugging only)
-#log4j.logger.com.bigdata.ha=ALL
-#log4j.logger.com.bigdata.txLog=ALL
-#log4j.logger.com.bigdata.haLog=ALL
-#log4j.logger.com.bigdata.rwstore=ALL
-#log4j.logger.com.bigdata.journal=ALL
-#log4j.logger.com.bigdata.journal.AbstractBufferStrategy=ALL
-#log4j.logger.com.bigdata.journal.jini.ha=ALL
-#log4j.logger.com.bigdata.service.jini.lookup=ALL
-#log4j.logger.com.bigdata.quorum=ALL
-#log4j.logger.com.bigdata.quorum.zk=ALL
-#log4j.logger.com.bigdata.quorum.quorumState=ALL,destPlain
-#log4j.logger.com.bigdata.io.writecache=ALL
+#log4j.logger.com.bigdata.ha=INFO
+#log4j.logger.com.bigdata.txLog=INFO
+#log4j.logger.com.bigdata.haLog=INFO
+##log4j.logger.com.bigdata.rwstore=ALL
+#log4j.logger.com.bigdata.journal=INFO
+##log4j.logger.com.bigdata.journal.AbstractBufferStrategy=ALL
+#log4j.logger.com.bigdata.journal.jini.ha=INFO
+##log4j.logger.com.bigdata.service.jini.lookup=ALL
+#log4j.logger.com.bigdata.quorum=INFO
+#log4j.logger.com.bigdata.quorum.zk=INFO
+##log4j.logger.com.bigdata.quorum.quorumState=ALL,destPlain
+##log4j.logger.com.bigdata.io.writecache=ALL
 
 # dest2 includes the thread name and elapsed milliseconds.
 # Note: %r is elapsed milliseconds.
From: <tho...@us...> - 2013-10-30 12:37:47
Revision: 7498
http://bigdata.svn.sourceforge.net/bigdata/?rev=7498&view=rev
Author: thompsonbry
Date: 2013-10-30 12:37:37 +0000 (Wed, 30 Oct 2013)

Log Message:
-----------
Added optional properties to enable remote debugging and/or a profiler for the HAJournalServer.

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournalServer.sh

Modified: branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournalServer.sh
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournalServer.sh 2013-10-29 15:20:41 UTC (rev 7497)
+++ branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournalServer.sh 2013-10-30 12:37:37 UTC (rev 7498)
@@ -3,6 +3,15 @@
 # Setup the environment.
 source src/resources/HAJournal/HAJournal.env
 
+# Uncomment to enable profiler.
+#profilerAgent=-agentpath:/nas/install/yjp-12.0.6/bin/linux-x86-64/libyjpagent.so
+
+# Uncomment to have all profiling initially disabled.
+#profilerAgentOptions=-agentlib:yjpagent=disableexceptiontelemetry,disablestacktelemetry
+
+# Uncomment to enable remote debugging at the specified port.
+#debug=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1046
+
 # Start an HAJournalServer.
 java\
 ${JAVAOPTS}\
@@ -10,5 +19,7 @@
 -Djava.security.policy=${POLICY_FILE}\
 -Dlog4j.configuration=${LOG4J_CONFIG}\
 -Djava.util.logging.config.file=${LOGGING_CONFIG}\
+ ${debug}\
+ ${profilerAgent} ${profilerAgentOptions}\
 com.bigdata.journal.jini.ha.HAJournalServer\
 ${HAJOURNAL_CONFIG}
\ No newline at end of file
From: <tho...@us...> - 2013-10-29 15:20:49
Revision: 7497
http://bigdata.svn.sourceforge.net/bigdata/?rev=7497&view=rev
Author: thompsonbry
Date: 2013-10-29 15:20:41 +0000 (Tue, 29 Oct 2013)

Log Message:
-----------
Added script for running DumpJournal.

Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/DumpJournal.sh

Added: branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/DumpJournal.sh
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/DumpJournal.sh (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/DumpJournal.sh 2013-10-29 15:20:41 UTC (rev 7497)
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+# Dumps the data in the specified Journal file. The journal MUST NOT
+# be in use by another process.
+#
+# usage: (option*) filename+
+#
+# where option is any of:
+#
+# -namespace : Dump only those indices having the specified namespace prefix
+# -history : Dump metadata for indices in all commit records (default only dumps the metadata for the indices as of the most current committed state).
+# -indices : Dump the indices (does not show the tuples by default).
+# -pages : Dump the pages of the indices and reports some information on the page size.
+# -tuples : Dump the records in the indices.
+
+# Setup the source environment.
+source src/resources/HAJournal/HAJournal.env
+
+java ${JAVA_OPTS} \
+	-cp ${CLASSPATH} \
+	-Djava.security.policy=${POLICY_FILE}\
+	com.bigdata.journal.DumpJournal \
+	$*
From: <tho...@us...> - 2013-10-29 14:01:00
Revision: 7496
http://bigdata.svn.sourceforge.net/bigdata/?rev=7496&view=rev
Author: thompsonbry
Date: 2013-10-29 14:00:53 +0000 (Tue, 29 Oct 2013)

Log Message:
-----------
Reduced WARN => INFO for RWStore allocator recycling in postHACommit().

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2013-10-29 13:58:38 UTC (rev 7495)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2013-10-29 14:00:53 UTC (rev 7496)
@@ -6301,8 +6301,8 @@
 
 		}
 
-//		if (log.isInfoEnabled())
-		log.warn("Released: " + totalFreed + " addresses from " + modCount + " modified Allocators");
+		if (log.isInfoEnabled())
+			log.info("Released: " + totalFreed + " addresses from " + modCount + " modified Allocators");
 
 		if (log.isTraceEnabled()) {
 			log.trace("OLD BITS: " + BytesUtil.toHexString(oldmetabits));
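Beyond demoting the message from WARN to INFO, the restored isInfoEnabled() guard also avoids building the message string at all when INFO is disabled. A minimal sketch of the idiom with the log4j 1.2 API this code base uses (the class and method names are illustrative):

{{{
import org.apache.log4j.Logger;

public class GuardedLoggingDemo {

    private static final Logger log = Logger.getLogger(GuardedLoggingDemo.class);

    void reportRecycle(final int totalFreed, final int modCount) {

        // The guard skips the string concatenation entirely unless the
        // logger is configured at INFO or below, so this routine message
        // costs almost nothing under a production WARN configuration.
        if (log.isInfoEnabled())
            log.info("Released: " + totalFreed + " addresses from " + modCount
                    + " modified Allocators");

    }

}
}}}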
From: <tho...@us...> - 2013-10-29 13:58:44
Revision: 7495 http://bigdata.svn.sourceforge.net/bigdata/?rev=7495&view=rev Author: thompsonbry Date: 2013-10-29 13:58:38 +0000 (Tue, 29 Oct 2013) Log Message: ----------- formatting of code for NPE fix. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-29 12:57:08 UTC (rev 7494) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-29 13:58:38 UTC (rev 7495) @@ -1897,9 +1897,13 @@ if (log.isInfoEnabled()) log.info("Terminating service management threads."); - if (haClient != null && haClient.isConnected()) { // Note: null reference is possible if ctor fails. + // Note: null reference is possible if ctor fails. + if (haClient != null && haClient.isConnected()) { + haClient.disconnect(false/* immediateShutdown */); + } + if (lookupDiscoveryManager != null) { try { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-10-29 12:57:14
Revision: 7494 http://bigdata.svn.sourceforge.net/bigdata/?rev=7494&view=rev Author: thompsonbry Date: 2013-10-29 12:57:08 +0000 (Tue, 29 Oct 2013) Log Message: ----------- bug fix to script: pass the script's arguments through to HALogReader with $* rather than the bare glob (*), which the shell expanded against the working directory. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh Modified: branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh 2013-10-29 12:53:25 UTC (rev 7493) +++ branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh 2013-10-29 12:57:08 UTC (rev 7494) @@ -11,4 +11,4 @@ -cp ${CLASSPATH} \ -Djava.security.policy=${POLICY_FILE}\ com.bigdata.ha.halog.HALogReader \ - * + $* This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
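The JVM only ever sees the argument vector after the shell has expanded it, which is why the bare glob was wrong: * expands to the contents of the working directory, while $* forwards the wrapper script's own arguments. A trivial, hypothetical program makes the difference visible:
{{{
// Hypothetical diagnostic: print the argument vector exactly as the JVM
// receives it. Invoked with a bare * it echoes a directory listing;
// invoked with $* it echoes the arguments given to the wrapper script.
public class ArgsEcho {
    public static void main(final String[] args) {
        for (final String a : args) {
            System.out.println("arg: " + a);
        }
    }
}
}}}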
From: <tho...@us...> - 2013-10-29 12:53:32
Revision: 7493 http://bigdata.svn.sourceforge.net/bigdata/?rev=7493&view=rev Author: thompsonbry Date: 2013-10-29 12:53:25 +0000 (Tue, 29 Oct 2013) Log Message: ----------- Adding script for HALogReader utility. Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh Added: branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HALogReader.sh 2013-10-29 12:53:25 UTC (rev 7493) @@ -0,0 +1,14 @@ +#!/bin/bash + +# Reads the identified HALog file(s) +# +# usage: HALogReader fileOrDir(s) + +# Setup the source environment. +source src/resources/HAJournal/HAJournal.env + +java ${JAVA_OPTS} \ + -cp ${CLASSPATH} \ + -Djava.security.policy=${POLICY_FILE}\ + com.bigdata.ha.halog.HALogReader \ + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-10-29 12:19:45
Revision: 7492 http://bigdata.svn.sourceforge.net/bigdata/?rev=7492&view=rev Author: thompsonbry Date: 2013-10-29 12:19:39 +0000 (Tue, 29 Oct 2013) Log Message: ----------- Added sort for the top-level arguments as well to force lexical visitation order for HALog files. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-29 12:18:26 UTC (rev 7491) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-29 12:19:39 UTC (rev 7492) @@ -406,6 +406,17 @@ */ public static void main(final String[] args) throws InterruptedException { + /* + * Sort into lexical order to force visitation in lexical order. + * + * Note: This should work under any OS. Files will be either directory + * names (3 digits) or filenames (21 digits plus the file extension). + * Thus the comparison centers numerically on the digits that encode + * either part of a commit counter (subdirectory) or an entire commit + * counter (HALog file). + */ + Arrays.sort(args); + final IBufferAccess buf = DirectBufferPool.INSTANCE.acquire(); try { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
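The sort is sufficient because the names are zero-padded, fixed-width decimal strings, so lexical order coincides with numeric commit-counter order. A small demonstration with invented (but convention-following) file names:
{{{
import java.util.Arrays;

// Demonstrates why Arrays.sort(args) yields commit-counter order: for
// fixed-width, zero-padded names, lexical order equals numeric order.
// The file names are invented but follow the 21-digit convention above.
public class LexicalOrderDemo {
    public static void main(final String[] args) {
        final String[] names = {
                "000000000000000000012.ha-log",
                "000000000000000000002.ha-log",
                "000000000000000000100.ha-log" };
        Arrays.sort(names);
        // Prints 2, 12, 100 -- i.e. numeric order.
        System.out.println(Arrays.toString(names));
    }
}
}}}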
From: <tho...@us...> - 2013-10-29 12:18:32
Revision: 7491 http://bigdata.svn.sourceforge.net/bigdata/?rev=7491&view=rev Author: thompsonbry Date: 2013-10-29 12:18:26 +0000 (Tue, 29 Oct 2013) Log Message: ----------- Modified the main() routine for HALogReader to not halt if it encounters an HALog file that it can not read. It now reports the error and continues. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-29 12:09:49 UTC (rev 7490) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-29 12:18:26 UTC (rev 7491) @@ -48,6 +48,7 @@ import com.bigdata.journal.StoreTypeEnum; import com.bigdata.util.ChecksumError; import com.bigdata.util.ChecksumUtility; +import com.bigdata.util.InnerCause; /** * Given an HALog file can be used to replay the file and can provide a readable @@ -403,8 +404,7 @@ * @throws IOException * @throws InterruptedException */ - public static void main(final String[] args) throws IOException, - InterruptedException { + public static void main(final String[] args) throws InterruptedException { final IBufferAccess buf = DirectBufferPool.INSTANCE.acquire(); @@ -442,8 +442,7 @@ } - private static void doDirectory(final File dir, final IBufferAccess buf) - throws IOException { + private static void doDirectory(final File dir, final IBufferAccess buf) { final File[] files = dir.listFiles(new FilenameFilter() { @@ -489,48 +488,76 @@ } - private static void doFile(final File file, final IBufferAccess buf) - throws IOException { + private static void doFile(final File file, final IBufferAccess buf) { - final HALogReader r = new HALogReader(file); + try { + + doFile2(file,buf); + + } catch (Throwable e) { + + if(InnerCause.isInnerCause(e, InterruptedException.class)) { + + // Propagate interrupt. 
+ Thread.currentThread().interrupt(); + + } - try { + final String msg = "ERROR: Could not read file: file=" + file + + ", cause=" + e; + + System.err.println(msg); + + log.error(msg, e); + + } + + } - final IRootBlockView openingRootBlock = r.getOpeningRootBlock(); + private static void doFile2(final File file, final IBufferAccess buf) + throws IOException { + + final HALogReader r = new HALogReader(file); - final IRootBlockView closingRootBlock = r.getClosingRootBlock(); + try { - final boolean isWORM = openingRootBlock.getStoreType() == StoreTypeEnum.WORM; + final IRootBlockView openingRootBlock = r.getOpeningRootBlock(); - if (openingRootBlock.getCommitCounter() == closingRootBlock - .getCommitCounter()) { + final IRootBlockView closingRootBlock = r.getClosingRootBlock(); - System.err.println("EMPTY LOG: " + file); + final boolean isWORM = openingRootBlock.getStoreType() == StoreTypeEnum.WORM; - } + System.out.println("----------begin----------"); + System.out.println("file=" + file); + System.out.println("openingRootBlock=" + openingRootBlock); + System.out.println("closingRootBlock=" + closingRootBlock); - System.out.println("----------begin----------"); - System.out.println("file=" + file); - System.out.println("openingRootBlock=" + openingRootBlock); - System.out.println("closingRootBlock=" + closingRootBlock); + if (openingRootBlock.getCommitCounter() == closingRootBlock + .getCommitCounter()) { - while (r.hasMoreBuffers()) { + System.err + .println("WARN : LOGICALLY EMPTY LOG (closing root block == opening root block): file=" + + file); - // don't pass buffer in if WORM, just validate the messages - final IHAWriteMessage msg = r.processNextBuffer(isWORM ? null - : buf.buffer()); + } + + while (r.hasMoreBuffers()) { - System.out.println(msg.toString()); + // don't pass buffer in if WORM, just validate the messages + final IHAWriteMessage msg = r.processNextBuffer(isWORM ? null + : buf.buffer()); - } - System.out.println("-----------end-----------"); + System.out.println(msg.toString()); - } finally { + } + System.out.println("-----------end-----------"); - r.close(); + } finally { - } + r.close(); + } + } @Override This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
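The doFile()/doFile2() split is a reusable pattern: catch everything so a single corrupt HALog cannot abort the scan, but recognize a wrapped InterruptedException and restore the interrupt status before carrying on. A self-contained sketch with the bigdata helper (InnerCause) replaced by a plain cause-chain walk:
{{{
import java.io.File;

// Sketch of the "report and continue" pattern from the commit. The names
// processOneFile() and isInnerCause() are stand-ins for doFile2() and
// InnerCause.isInnerCause(); only the control flow is taken from the diff.
public class ResilientScanSketch {

    static void doFile(final File file) {
        try {
            processOneFile(file);
        } catch (Throwable e) {
            if (isInnerCause(e, InterruptedException.class)) {
                // Propagate the interrupt rather than swallowing it.
                Thread.currentThread().interrupt();
            }
            // Report the failure and fall through so the caller can
            // continue with the next file.
            System.err.println("ERROR: Could not read file: file=" + file
                    + ", cause=" + e);
        }
    }

    static boolean isInnerCause(Throwable t, final Class<? extends Throwable> cls) {
        for (; t != null; t = t.getCause()) {
            if (cls.isInstance(t))
                return true;
        }
        return false;
    }

    static void processOneFile(final File file) throws Exception {
        // Stand-in for the real per-file work (doFile2 in the commit).
    }
}
}}}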
From: <tho...@us...> - 2013-10-29 12:09:56
Revision: 7490 http://bigdata.svn.sourceforge.net/bigdata/?rev=7490&view=rev Author: thompsonbry Date: 2013-10-29 12:09:49 +0000 (Tue, 29 Oct 2013) Log Message: ----------- added the file name into the error messages. javadoc update on class. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-29 12:07:08 UTC (rev 7489) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-29 12:09:49 UTC (rev 7490) @@ -37,6 +37,7 @@ import org.apache.log4j.Logger; +import com.bigdata.ha.msg.IHAMessage; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.FileChannelUtility; @@ -50,17 +51,13 @@ /** * Given an HALog file can be used to replay the file and can provide a readable - * dump of the content. + * dump of the content. When replaying, the current position is compared to the + * EOF to determine whether more data can be read. The called should call + * {@link IHALogReader#hasMoreBuffers()} and if so read the next associated + * buffer and process with the returned {@link IHAMessage}. If + * {@link IHALogReader#hasMoreBuffers()} is false, then the committing + * {@link IRootBlockView} should be used to commit the replayed transaction. * - * When replaying, the current position is compared to the EOF to determine - * whether more data can be read. - * - * The called should call hasMoreBuffers() and if so read the next associated - * buffer and process with the returned IHAMessage. - * - * If hasMoreBuffers() is false, then the committing rootBlock should be used to - * commit the replayed transaction. - * * @author Martyn Cutcher */ public class HALogReader implements IHALogReader { @@ -121,17 +118,19 @@ */ magic = m_raf.readInt(); } catch (IOException ex) { - throw new RuntimeException( - "Can not read magic. Is file locked by another process?", - ex); + throw new RuntimeException( + "Can not read magic. Is file locked by another process? file=" + + file, ex); } - if (magic != HALogWriter.MAGIC) - throw new RuntimeException("Bad HALog magic: expected=" - + HALogWriter.MAGIC + ", actual=" + magic); + if (magic != HALogWriter.MAGIC) + throw new RuntimeException("Bad HALog magic: file=" + file + + ", expected=" + HALogWriter.MAGIC + ", actual=" + + magic); version = m_raf.readInt(); if (version != HALogWriter.VERSION1) - throw new RuntimeException("Bad HALog version: expected=" - + HALogWriter.VERSION1 + ", actual=" + version); + throw new RuntimeException("Bad HALog version: file=" + file + + ", expected=" + HALogWriter.VERSION1 + ", actual=" + + version); final RootBlockUtility tmp = new RootBlockUtility(reopener, file, true/* validateChecksum */, false/* alternateRootBlock */, @@ -151,8 +150,9 @@ * Counters are inconsistent with either an empty log file or a * single transaction scope. */ - throw new IllegalStateException("Incompatible rootblocks: cc0=" - + cc0 + ", cc1=" + cc1); + throw new IllegalStateException( + "Incompatible rootblocks: file=" + file + ", cc0=" + + cc0 + ", cc1=" + cc1); } m_channel.position(HALogWriter.headerSize0); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
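The recurring edit is to thread the file identity through every failure path so an operator can tell which HALog is unreadable or corrupt. A generic sketch of header validation with contextual messages; the class name and magic constant value are invented:
{{{
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Sketch of magic-number validation with contextual error messages.
// MAGIC stands in for HALogWriter.MAGIC; its value here is invented.
public class HeaderCheckSketch {

    private static final int MAGIC = 0x48414C47; // invented placeholder value

    static void checkMagic(final File file) throws IOException {
        final DataInputStream in = new DataInputStream(new FileInputStream(file));
        try {
            final int magic;
            try {
                magic = in.readInt();
            } catch (IOException ex) {
                throw new RuntimeException(
                        "Can not read magic. Is file locked by another process? file="
                                + file, ex);
            }
            if (magic != MAGIC)
                throw new RuntimeException("Bad HALog magic: file=" + file
                        + ", expected=" + MAGIC + ", actual=" + magic);
        } finally {
            in.close();
        }
    }
}
}}}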
From: <tho...@us...> - 2013-10-29 12:07:14
Revision: 7489 http://bigdata.svn.sourceforge.net/bigdata/?rev=7489&view=rev Author: thompsonbry Date: 2013-10-29 12:07:08 +0000 (Tue, 29 Oct 2013) Log Message: ----------- changed "journal" to "HALog" in error messages for MAGIC and VERSION. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-28 19:16:50 UTC (rev 7488) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-10-29 12:07:08 UTC (rev 7489) @@ -126,11 +126,11 @@ ex); } if (magic != HALogWriter.MAGIC) - throw new RuntimeException("Bad journal magic: expected=" + throw new RuntimeException("Bad HALog magic: expected=" + HALogWriter.MAGIC + ", actual=" + magic); version = m_raf.readInt(); if (version != HALogWriter.VERSION1) - throw new RuntimeException("Bad journal version: expected=" + throw new RuntimeException("Bad HALog version: expected=" + HALogWriter.VERSION1 + ", actual=" + version); final RootBlockUtility tmp = new RootBlockUtility(reopener, file, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-10-28 19:16:57
Revision: 7488 http://bigdata.svn.sourceforge.net/bigdata/?rev=7488&view=rev Author: thompsonbry Date: 2013-10-28 19:16:50 +0000 (Mon, 28 Oct 2013) Log Message: ----------- Handled possible NPE (queryId2 is not defined in the constructor and so it can be null in the finally clause). Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2013-10-28 15:23:15 UTC (rev 7487) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2013-10-28 19:16:50 UTC (rev 7488) @@ -1093,7 +1093,7 @@ } finally { endNanos = System.nanoTime(); m_queries.remove(queryId); - m_queries2.remove(queryId2); + if (queryId2 != null) m_queries2.remove(queryId2); // if (os != null) { // try { // os.close(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
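The hazard is generic: a variable that is only assigned part-way through a try block can still be null when the finally clause runs, so cleanup must tolerate that. A minimal sketch with invented names standing in for the real maps and work:
{{{
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the NPE fixed above: queryId2 is assigned only after
// some work has already been done, so the finally clause must null-check it.
public class CleanupSketch {

    private final Map<Long, Object> queries = new ConcurrentHashMap<Long, Object>();
    private final Map<UUID, Object> queries2 = new ConcurrentHashMap<UUID, Object>();

    void run(final long queryId) throws Exception {
        UUID queryId2 = null;
        try {
            // ... work that may throw before queryId2 is ever assigned ...
            queryId2 = UUID.randomUUID();
            // ... more work ...
        } finally {
            queries.remove(queryId);
            if (queryId2 != null) // may never have been assigned
                queries2.remove(queryId2);
        }
    }
}
}}}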
From: <tho...@us...> - 2013-10-28 15:23:36
Revision: 7487 http://bigdata.svn.sourceforge.net/bigdata/?rev=7487&view=rev Author: thompsonbry Date: 2013-10-28 15:23:15 +0000 (Mon, 28 Oct 2013) Log Message: ----------- Checking HAClient reference for possible null value in terminate(). The field is final, but it can be null if there is a problem during the constructor call that winds up calling through to terminate(). Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-28 13:22:57 UTC (rev 7486) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-28 15:23:15 UTC (rev 7487) @@ -1897,9 +1897,9 @@ if (log.isInfoEnabled()) log.info("Terminating service management threads."); - if (haClient.isConnected()) + if (haClient != null && haClient.isConnected()) { // Note: null reference is possible if ctor fails. haClient.disconnect(false/* immediateShutdown */); - + } if (lookupDiscoveryManager != null) { try { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
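A final field needing a null check can look paradoxical, but if the constructor fails after super() and before the assignment, any cleanup it triggers observes the field in its default (null) state. A compact sketch of that sequence, all names invented:
{{{
// Sketch of how a final field can be observed as null: the constructor
// calls through to terminate() on a failure path that runs before the
// field is assigned. ServerSketch stands in for AbstractServer.
public class ServerSketch {

    private final Object haClient; // still null on the failure path below

    ServerSketch(final boolean failEarly) {
        if (failEarly) {
            terminate(); // observes haClient == null
            throw new IllegalStateException("ctor failed before field init");
        }
        this.haClient = new Object();
    }

    final void terminate() {
        // Note: null reference is possible if ctor fails.
        if (haClient != null) {
            // disconnect, release resources, etc.
        }
    }
}
}}}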
From: <mrp...@us...> - 2013-10-28 13:23:06
Revision: 7486 http://bigdata.svn.sourceforge.net/bigdata/?rev=7486&view=rev Author: mrpersonick Date: 2013-10-28 13:22:57 +0000 (Mon, 28 Oct 2013) Log Message: ----------- fixing some comments Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2013-10-25 21:18:46 UTC (rev 7485) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2013-10-28 13:22:57 UTC (rev 7486) @@ -326,6 +326,17 @@ final boolean noInput = chunkIn == null || chunkIn.length == 0 || (chunkIn.length == 1 && chunkIn[0].isEmpty()); + /* + * We need to keep a collection of parent solutions to join + * against the output from the fixed point operation. + */ + final Map<IConstant<?>, List<IBindingSet>> parentSolutionsToJoin = + noInput ? null : new LinkedHashMap<IConstant<?>, List<IBindingSet>>(); + + /* + * The join var is what we use to join the parent solutions to the + * output from the fixed point operation. + */ final IVariable<?> joinVar = gearing.inVar != null ? gearing.inVar : (gearing.outVar != null ? gearing.outVar : null); @@ -334,29 +345,22 @@ log.debug("join var: " + joinVar); } - /* - * Fix cardinality problem here - */ - final Map<IConstant<?>, List<IBindingSet>> chunkInBySolutionKey = - noInput ? null : - new LinkedHashMap<IConstant<?>, List<IBindingSet>>(); - if (!noInput) { for (IBindingSet parentSolutionIn : chunkIn) { - final IConstant<?> key = joinVar != null ? parentSolutionIn.get(joinVar) : null; //newSolutionKey(gearing, parentSolutionIn); + final IConstant<?> key = joinVar != null ? parentSolutionIn.get(joinVar) : null; if (log.isDebugEnabled()) { log.debug("adding parent solution for joining: " + parentSolutionIn); log.debug("join key: " + key); } - if (!chunkInBySolutionKey.containsKey(key)) { - chunkInBySolutionKey.put(key, new ArrayList<IBindingSet>()); + if (!parentSolutionsToJoin.containsKey(key)) { + parentSolutionsToJoin.put(key, new ArrayList<IBindingSet>()); } - chunkInBySolutionKey.get(key).add(parentSolutionIn); + parentSolutionsToJoin.get(key).add(parentSolutionIn); } @@ -628,13 +632,21 @@ } + /* + * Add the necessary zero-length path solutions for the case where + * there are variables on both side of the operator. + */ if (lowerBound == 0 && (gearing.inVar != null && gearing.outVar != null)) { - final Map<SolutionKey, IBindingSet> zlps = new LinkedHashMap<SolutionKey, IBindingSet>(); + final Map<SolutionKey, IBindingSet> zlps = + new LinkedHashMap<SolutionKey, IBindingSet>(); for (IBindingSet bs : solutionsOut.values()) { - // is this right?? + /* + * Do not handle the case where the out var is bound by + * the incoming solutions. + */ if (bs.isBound(gearing.outVar)) { continue; @@ -735,9 +747,10 @@ final IConstant<?> key = joinVar != null ? bs.get(joinVar) : null; - if (key != null && chunkInBySolutionKey.containsKey(key)) { + if (key != null && parentSolutionsToJoin.containsKey(key)) { - final List<IBindingSet> parentSolutionsIn = chunkInBySolutionKey.get(key); + final List<IBindingSet> parentSolutionsIn = + parentSolutionsToJoin.get(key); if (log.isDebugEnabled()) { log.debug("join key: " + key); @@ -818,13 +831,13 @@ /* * Always do the null solutions if there are any. 
*/ - if (chunkInBySolutionKey.containsKey(null)) { + if (parentSolutionsToJoin.containsKey(null)) { /* * Join the null solutions. These solutions represent * a cross product (no shared variables with the ALP node). */ - final List<IBindingSet> nullSolutions = chunkInBySolutionKey.get(null); + final List<IBindingSet> nullSolutions = parentSolutionsToJoin.get(null); if (log.isDebugEnabled()) { log.debug("null solutions to join: " + nullSolutions); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
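Stripped of the query-engine types, parentSolutionsToJoin is a multimap from join-key to the parent solutions that bind it, with the null key collecting solutions that share no variable with the operator (joined later as a cross product). A generic sketch, with invented type parameters in place of IConstant and IBindingSet:
{{{
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Generic sketch of building parentSolutionsToJoin: group each incoming
// solution under the value it binds for the join variable. K and S stand
// in for IConstant<?> and IBindingSet; a null key is permitted and holds
// the cross-product solutions.
public class GroupByJoinKeySketch {

    interface KeyExtractor<S, K> {
        K joinKeyOf(S solution); // may return null
    }

    static <S, K> Map<K, List<S>> group(final Iterable<S> chunkIn,
            final KeyExtractor<S, K> extractor) {
        final Map<K, List<S>> parentSolutionsToJoin = new LinkedHashMap<K, List<S>>();
        for (final S solution : chunkIn) {
            final K key = extractor.joinKeyOf(solution);
            List<S> bucket = parentSolutionsToJoin.get(key);
            if (bucket == null) {
                bucket = new ArrayList<S>();
                parentSolutionsToJoin.put(key, bucket);
            }
            bucket.add(solution);
        }
        return parentSolutionsToJoin;
    }
}
}}}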
From: <tho...@us...> - 2013-10-25 21:18:59
Revision: 7485 http://bigdata.svn.sourceforge.net/bigdata/?rev=7485&view=rev Author: thompsonbry Date: 2013-10-25 21:18:46 +0000 (Fri, 25 Oct 2013) Log Message: ----------- Resolution for #718 (ZK disconnect) Merged back to the main development branch (revisions from r7464 to r7484 were merged back). {{{ merge -r7464:HEAD https://bigdata.svn.sourceforge.net/svnroot/bigdata/branches/ZK_DISCONNECT_HANDLING /Users/bryan/Documents/workspace/BIGDATA_RELEASE_1_3_0_CLEAN ... C /Users/bryan/Documents/workspace/BIGDATA_RELEASE_1_3_0_CLEAN/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java C /Users/bryan/Documents/workspace/BIGDATA_RELEASE_1_3_0_CLEAN/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java C /Users/bryan/Documents/workspace/BIGDATA_RELEASE_1_3_0_CLEAN/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java C /Users/bryan/Documents/workspace/BIGDATA_RELEASE_1_3_0_CLEAN/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java C /Users/bryan/Documents/workspace/BIGDATA_RELEASE_1_3_0_CLEAN/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java ... ===== File Statistics: ===== Added: 4 Updated: 41 ==== Conflict Statistics: ===== File conflicts: 5 }}} The conflicts are all the files that I had locally modified when I created the ZK_DISCONNECT_HANDLING branch. In each case, I accepted the changes from the ZK_DISCONNECT_HANDLING branch. The HA CI test suite runs green locally (except for the 3 known failures related to #760). Revision Links: -------------- http://bigdata.svn.sourceforge.net/bigdata/?rev=7464&view=rev http://bigdata.svn.sourceforge.net/bigdata/?rev=7484&view=rev http://bigdata.svn.sourceforge.net/bigdata/?rev=7464&view=rev Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/.classpath branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/resources/logging/log4j-dev.properties branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestAll.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-A.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/service/jini/lookup/ServiceCache.java 
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-A.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-B.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-C.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3DumpLogs.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/dumpFile.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient.config branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/MockQuorumMember.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestAll.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkQuorum.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkSingletonQuorumSemantics.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/zookeeper/TestZLockImpl.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/CreateKBTask.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java branches/BIGDATA_RELEASE_1_3_0/src/resources/HAJournal/HAJournal.config Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/lib/apache/zookeeper-3.3.3.jar branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumClient.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestSplitZPath.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java Property Changed: ---------------- branches/BIGDATA_RELEASE_1_3_0/ branches/BIGDATA_RELEASE_1_3_0/bigdata/lib/jetty/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/aggregate/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/util/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/htree/raba/ 
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/jsr166/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/bop/joinGraph/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/bop/util/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/jsr166/ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/util/httpd/ branches/BIGDATA_RELEASE_1_3_0/bigdata-compatibility/ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/attr/ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/disco/ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/util/config/ branches/BIGDATA_RELEASE_1_3_0/bigdata-perf/ branches/BIGDATA_RELEASE_1_3_0/bigdata-perf/btc/ branches/BIGDATA_RELEASE_1_3_0/bigdata-perf/btc/src/resources/ branches/BIGDATA_RELEASE_1_3_0/bigdata-perf/lubm/ branches/BIGDATA_RELEASE_1_3_0/bigdata-perf/uniprot/ branches/BIGDATA_RELEASE_1_3_0/bigdata-perf/uniprot/src/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/changesets/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/error/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/relation/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/util/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/samples/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/internal/ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/relation/ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/LEGAL/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/lib/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/src/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/src/java/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/src/java/it/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/src/java/it/unimi/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/src/test/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/src/test/it/unimi/ branches/BIGDATA_RELEASE_1_3_0/dsi-utils/src/test/it/unimi/dsi/ branches/BIGDATA_RELEASE_1_3_0/lgpl-utils/src/java/it/unimi/dsi/fastutil/bytes/custom/ branches/BIGDATA_RELEASE_1_3_0/lgpl-utils/src/test/it/unimi/dsi/fastutil/bytes/custom/ branches/BIGDATA_RELEASE_1_3_0/osgi/ branches/BIGDATA_RELEASE_1_3_0/src/resources/bin/config/ Property changes on: branches/BIGDATA_RELEASE_1_3_0 ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE:6769-6785 /branches/BIGDATA_RELEASE_1_2_0:6766-7380 /branches/BTREE_BUFFER_BRANCH:2004-2045 /branches/DEV_BRANCH_27_OCT_2009:2270-2546,2548-2782 /branches/INT64_BRANCH:4486-4522 /branches/JOURNAL_HA_BRANCH:2596-4066 /branches/LARGE_LITERALS_REFACTOR:4175-4387 /branches/LEXICON_REFACTOR_BRANCH:2633-3304 /branches/QUADS_QUERY_BRANCH:4525-4531,4550-4584,4586-4609,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 
/branches/READ_CACHE:7215-7271 /branches/RWSTORE_1_1_0_DEBUG:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH:4814-4836 /branches/bugfix-btm:2594-3237 /branches/dev-btm:2574-2730 /branches/fko:3150-3194 /trunk:3392-3437,3656-4061 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE:6769-6785 /branches/BIGDATA_RELEASE_1_2_0:6766-7380 /branches/BTREE_BUFFER_BRANCH:2004-2045 /branches/DEV_BRANCH_27_OCT_2009:2270-2546,2548-2782 /branches/INT64_BRANCH:4486-4522 /branches/JOURNAL_HA_BRANCH:2596-4066 /branches/LARGE_LITERALS_REFACTOR:4175-4387 /branches/LEXICON_REFACTOR_BRANCH:2633-3304 /branches/QUADS_QUERY_BRANCH:4525-4531,4550-4584,4586-4609,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE:7215-7271 /branches/RWSTORE_1_1_0_DEBUG:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH:4814-4836 /branches/ZK_DISCONNECT_HANDLING:7465-7484 /branches/bugfix-btm:2594-3237 /branches/dev-btm:2574-2730 /branches/fko:3150-3194 /trunk:3392-3437,3656-4061 Modified: branches/BIGDATA_RELEASE_1_3_0/.classpath =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/.classpath 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/.classpath 2013-10-25 21:18:46 UTC (rev 7485) @@ -33,7 +33,7 @@ <classpathentry kind="src" path="bigdata-gas/src/test"/> <classpathentry exported="true" kind="lib" path="bigdata/lib/dsi-utils-1.0.6-020610.jar"/> <classpathentry exported="true" kind="lib" path="bigdata/lib/lgpl-utils-1.0.6-020610.jar"/> - <classpathentry kind="lib" path="bigdata-jini/lib/apache/zookeeper-3.4.5.jar"/> + <classpathentry kind="lib" path="bigdata-jini/lib/apache/zookeeper-3.3.3.jar"/> <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-continuation-7.2.2.v20101205.jar"/> <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-http-7.2.2.v20101205.jar"/> <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-io-7.2.2.v20101205.jar"/> Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/lib/jetty ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/lib/jetty:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/lib/jetty:6766-7380 /branches/INT64_BRANCH/bigdata/lib/jetty:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/lib/jetty:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/lib/jetty:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/lib/jetty:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/lib/jetty:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/lib/jetty:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/lib/jetty:6766-7380 /branches/INT64_BRANCH/bigdata/lib/jetty:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/lib/jetty:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/lib/jetty:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/lib/jetty:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/lib/jetty:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/lib/jetty:7465-7484 Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/aggregate ___________________________________________________________________ Modified: svn:mergeinfo - 
/branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/bop/aggregate:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/aggregate:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/bop/aggregate:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/bop/aggregate:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/bop/aggregate:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/aggregate:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/bop/aggregate:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/bop/aggregate:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/bop/aggregate:7465-7484 Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/bop/joinGraph:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/joinGraph:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/bop/joinGraph:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/bop/joinGraph:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/bop/joinGraph:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/joinGraph:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/bop/joinGraph:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/bop/joinGraph:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/bop/joinGraph:7465-7484 Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/util ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/bop/util:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/util:6766-7380 
/branches/INT64_BRANCH/bigdata/src/java/com/bigdata/bop/util:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/util:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/bop/util:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/bop/util:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/bop/util:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/bop/util:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/util:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/bop/util:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/util:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/bop/util:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/bop/util:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/bop/util:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/bop/util:7465-7484 Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java 2013-10-25 21:18:46 UTC (rev 7485) @@ -77,10 +77,13 @@ } /** + * {@inheritDoc} + * <p> * Creates the {@link ActiveProcess} and the - * {@link ActiveProcess#start(com.bigdata.counters.AbstractStatisticsCollector.AbstractProcessReader)}s - * it passing in the value returned by the {@link #getProcessReader()} + * {@link ActiveProcess#start(AbstractProcessReader)}s it passing in the + * value returned by the {@link #getProcessReader()} */ + @Override public void start() { log.info(""); @@ -91,6 +94,7 @@ } + @Override public void stop() { log.info(""); Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/htree/raba ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/htree/raba:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/htree/raba:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/htree/raba:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/htree/raba:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/htree/raba:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/htree/raba:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/htree/raba:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/htree/raba:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/htree/raba:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/htree/raba:4486-4522 
/branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/htree/raba:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/htree/raba:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/htree/raba:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/htree/raba:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/htree/raba:7465-7484 Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-25 21:18:46 UTC (rev 7485) @@ -3293,7 +3293,7 @@ * The next offset at which user data would be written. * Calculated, after commit! */ - nextOffset = _bufferStrategy.getNextOffset(); + nextOffset = _bufferStrategy.getNextOffset(); final long blockSequence; @@ -3468,9 +3468,16 @@ * the met quorum; and (b) voted YES in response to * the PREPARE message. */ + try { + quorumService.abort2Phase(commitToken); + } finally { + throw new RuntimeException( + "PREPARE rejected: nyes=" + + resp.getYesCount() + + ", replicationFactor=" + + resp.replicationFactor()); + } - quorumService.abort2Phase(commitToken); - } } catch (Throwable e) { @@ -5422,8 +5429,17 @@ if (quorum == null) return; - // This quorum member. - final QuorumService<HAGlue> localService = quorum.getClient(); + // The HAQuorumService (if running). + final QuorumService<HAGlue> localService; + { + QuorumService<HAGlue> t; + try { + t = quorum.getClient(); + } catch (IllegalStateException ex) { + t = null; + } + localService = t; + } // Figure out the state transitions involved. 
final QuorumTokenTransitions transitionState = new QuorumTokenTransitions( Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/jsr166 ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/jsr166:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/jsr166:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/jsr166:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/jsr166:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/jsr166:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/jsr166:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/jsr166:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/java/com/bigdata/jsr166:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/jsr166:6766-7380 /branches/INT64_BRANCH/bigdata/src/java/com/bigdata/jsr166:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/jsr166:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/java/com/bigdata/jsr166:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/java/com/bigdata/jsr166:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/java/com/bigdata/jsr166:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/jsr166:7465-7484 Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-25 21:18:46 UTC (rev 7485) @@ -31,6 +31,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -48,7 +49,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -301,7 +301,9 @@ * Note: This is volatile to allow visibility without holding the * {@link #lock}. The field is only modified in {@link #start(QuorumClient)} * and {@link #terminate()}, and those methods use the {@link #lock} to - * impose an appropriate ordering over events. + * impose an appropriate ordering over events. The quorum is running iff + * there is a client for which it is delivering events. When <code>null</code>, + * the quorum is not running. * * @see #start(QuorumClient) */ @@ -582,71 +584,72 @@ */ interruptAll(); if (client == null) { - // No client is attached. + // No client? Not running. return; } if (log.isDebugEnabled()) log.debug("client=" + client); - if (client instanceof QuorumMember<?>) { - /* - * Update the distributed quorum state by removing our client - * from the set of member services. This will also cause a - * service leave, pipeline leave, and any vote to be withdrawn. 
- * - * We have observed Condition spins during terminate() that - * result in HAJournalServer hangs. This runs another Thread - * that will interrupt this Thread if the quorum member is - * unable to complete the memberRemove() within a timeout. - * - * Note: Since we are holding the lock in the current thread, we - * MUST execute memberRemove() in this thread (it requires the - * lock). Therefore, I have used a 2nd thread that will - * interrupt this thread if it does not succeed in a polite - * removal from the quorum within a timeout. - */ - { - final long MEMBER_REMOVE_TIMEOUT = 5000;// ms. - final AtomicBoolean didRemove = new AtomicBoolean(false); - final Thread self = Thread.currentThread(); - final Thread t = new Thread() { - public void run() { - try { - Thread.sleep(MEMBER_REMOVE_TIMEOUT); - } catch (InterruptedException e) { - // Expected. Ignored. - return; - } - if (!didRemove.get()) { - log.error("Timeout awaiting quorum member remove."); - self.interrupt(); - } - } - }; - t.setDaemon(true); - t.start(); - try { - // Attempt memberRemove() (interruptably). - actor.memberRemoveInterruptable(); - didRemove.set(true); // Success. - } catch (InterruptedException e) { - // Propagate the interrupt. - Thread.currentThread().interrupt(); - } finally { - t.interrupt(); // Stop execution of [t]. +// if (client instanceof QuorumMember<?>) { +// /* +// * Update the distributed quorum state by removing our client +// * from the set of member services. This will also cause a +// * service leave, pipeline leave, and any vote to be withdrawn. +// * +// * We have observed Condition spins during terminate() that +// * result in HAJournalServer hangs. This runs another Thread +// * that will interrupt this Thread if the quorum member is +// * unable to complete the memberRemove() within a timeout. +// * +// * Note: Since we are holding the lock in the current thread, we +// * MUST execute memberRemove() in this thread (it requires the +// * lock). Therefore, I have used a 2nd thread that will +// * interrupt this thread if it does not succeed in a polite +// * removal from the quorum within a timeout. +// */ +// { +// final long MEMBER_REMOVE_TIMEOUT = 5000;// ms. +// final AtomicBoolean didRemove = new AtomicBoolean(false); +// final Thread self = Thread.currentThread(); +// final Thread t = new Thread() { +// public void run() { +// try { +// Thread.sleep(MEMBER_REMOVE_TIMEOUT); +// } catch (InterruptedException e) { +// // Expected. Ignored. +// return; +// } +// if (!didRemove.get()) { +// log.error("Timeout awaiting quorum member remove."); +// self.interrupt(); +// } +// } +// }; +// t.setDaemon(true); +// t.start(); +// try { +// // Attempt memberRemove() (interruptably). +// actor.memberRemoveInterruptable(); +// didRemove.set(true); // Success. +// } catch (InterruptedException e) { +// // Propagate the interrupt. +// Thread.currentThread().interrupt(); +// } finally { +// t.interrupt(); // Stop execution of [t]. +// } +// } +// } + + if (watcher != null) { + try { + watcher.terminate(); + } catch (Throwable t) { + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + interrupted = true; + } else { + launderThrowable(t); } } } - - /* - * Let the service know that it is no longer running w/ the quorum. 
- */ - try { - client.terminate(); - } catch (Throwable t) { - launderThrowable(t); - } - if (watcher != null) - watcher.terminate(); if (watcherActionService != null) { watcherActionService.shutdown(); try { @@ -664,10 +667,15 @@ interrupted = true; } finally { /* - * Cancel any tasks which did terminate in a timely manner. + * Cancel any tasks which did not terminate in a timely manner. */ - watcherActionService.shutdownNow(); + final List<Runnable> notrun = watcherActionService.shutdownNow(); watcherActionService = null; + for (Runnable r : notrun) { + if (r instanceof Future) { + ((Future<?>) r).cancel(true/* mayInterruptIfRunning */); + } + } } } if (actorActionService != null) { @@ -684,10 +692,15 @@ interrupted = true; } finally { /* - * Cancel any tasks which did terminate in a timely manner. + * Cancel any tasks which did not terminate in a timely manner. */ - actorActionService.shutdownNow(); + final List<Runnable> notrun = actorActionService.shutdownNow(); actorActionService = null; + for (Runnable r : notrun) { + if (r instanceof Future) { + ((Future<?>) r).cancel(true/* mayInterruptIfRunning */); + } + } } } if (!sendSynchronous) { @@ -700,12 +713,41 @@ // Will be propagated below. interrupted = true; } finally { + /* + * Cancel any tasks which did terminate in a timely manner. + */ + final List<Runnable> notrun = eventService.shutdownNow(); eventService = null; + for (Runnable r : notrun) { + if (r instanceof Future) { + ((Future<?>) r).cancel(true/* mayInterruptIfRunning */); + } + } } } /* - * Signal all conditions so anyone blocked will wake up. + * Let the service know that it is no longer running w/ the quorum. */ + try { + client.terminate(); + } catch (Throwable t) { + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + interrupted = true; + } else { + launderThrowable(t); + } + } + /* + * Clear all internal state variables that mirror the distributed + * quorum state and then signal all conditions so anyone blocked + * will wake up. + */ + listeners.clear(); // discard listeners. + token = lastValidToken = NO_QUORUM; + members.clear(); + votes.clear(); + joined.clear(); + pipeline.clear(); quorumChange.signalAll(); membersChange.signalAll(); pipelineChange.signalAll(); @@ -713,8 +755,6 @@ joinedChange.signalAll(); // discard reference to the client. this.client = null; - // discard listeners. - listeners.clear(); } finally { lock.unlock(); } @@ -821,6 +861,19 @@ } } +// @Override + protected C getClientNoLock() { +// lock.lock(); +// try { + final C client = this.client; + if (client == null) + throw new IllegalStateException(); + return client; +// } finally { +// lock.unlock(); +// } + } + @Override public QuorumMember<S> getMember() { lock.lock(); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-10-25 21:18:46 UTC (rev 7485) @@ -82,11 +82,13 @@ void terminate(); /** - * The client has become disconnected from the quorum. This callback - * provides a hook to take any local actions that are required when the - * client can not longer rely on its role in the quorum state (if the client - * is disconnected from the quorum, then it is no longer part of the quorum, - * can not be a joined service, quorum member, etc). 
+ * The client has become disconnected from the quorum (for zookeeper this is + * only generated if the session has expired rather than if there is a + * transient disconnect that can be cured). This callback provides a hook to + * take any local actions that are required when the client can not longer + * rely on its role in the quorum state (if the client is disconnected from + * the quorum, then it is no longer part of the quorum, can not be a joined + * service, quorum member, etc). */ void disconnected(); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/resources/logging/log4j-dev.properties =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/resources/logging/log4j-dev.properties 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/resources/logging/log4j-dev.properties 2013-10-25 21:18:46 UTC (rev 7485) @@ -277,8 +277,8 @@ #log4j.logger.com.bigdata.journal.AbstractBufferStrategy=ALL log4j.logger.com.bigdata.journal.jini.ha=ALL #log4j.logger.com.bigdata.service.jini.lookup=ALL -#log4j.logger.com.bigdata.quorum=ALL -#log4j.logger.com.bigdata.quorum.zk=ALL +log4j.logger.com.bigdata.quorum=ALL +log4j.logger.com.bigdata.quorum.zk=ALL #log4j.logger.com.bigdata.quorum.quorumState=ALL,destPlain #log4j.logger.com.bigdata.io.writecache=ALL Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/bop/joinGraph ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/bop/joinGraph:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/bop/joinGraph:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/bop/joinGraph:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/joinGraph:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/bop/joinGraph:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/bop/joinGraph:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/bop/joinGraph:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/bop/joinGraph:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/bop/joinGraph:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/bop/joinGraph:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/joinGraph:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/bop/joinGraph:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/bop/joinGraph:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/bop/joinGraph:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/bop/joinGraph:7465-7484 Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/bop/util ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/bop/util:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/bop/util:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/bop/util:4486-4522 
/branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/util:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/bop/util:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/bop/util:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/bop/util:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/bop/util:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/bop/util:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/bop/util:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/util:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/bop/util:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/bop/util:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/bop/util:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/bop/util:7465-7484 Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/jsr166 ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/jsr166:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/jsr166:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/jsr166:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/jsr166:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/jsr166:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/jsr166:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/jsr166:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/jsr166:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/jsr166:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/jsr166:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/jsr166:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/jsr166:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/jsr166:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/jsr166:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/jsr166:7465-7484 Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-25 21:18:46 UTC (rev 7485) @@ -742,15 +742,22 @@ // Save UUID -> QuorumMember mapping on the fixture. 
fixture.known.put(client.getServiceId(), client); } - + + @Override public void terminate() { - final MockQuorumWatcher watcher = (MockQuorumWatcher) getWatcher(); + MockQuorumWatcher watcher = null; + try { + watcher = (MockQuorumWatcher) getWatcher(); + } catch(IllegalStateException ex) { + // Already terminated. + } super.terminate(); // Stop the service accepting events for the watcher. watcherService.shutdownNow(); - // remove our watcher as a listener for the fixture's inner quorum. - fixture.removeWatcher(watcher); + // remove our watcher as a listener for the fixture's inner quorum. + if (watcher != null) + fixture.removeWatcher(watcher); } /** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestAll.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestAll.java 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestAll.java 2013-10-25 21:18:46 UTC (rev 7485) @@ -48,14 +48,6 @@ public class TestAll extends TestCase { private final static Logger log = Logger.getLogger(TestAll.class); - - /** - * FIXME This is used to avoid a CI deadlock that needs to be tracked down. - * - * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/530"> - * Journal HA </a> - */ - final private static boolean s_includeQuorum = true; /** * @@ -74,42 +66,38 @@ * Returns a test that will run each of the implementation specific test * suites in turn. */ - public static Test suite() - { + public static Test suite() { final TestSuite suite = new TestSuite("quorum"); - if (s_includeQuorum) { - /* - * Test the fixture used to test the quorums (the fixture builds on the - * same base class). - */ - suite.addTestSuite(TestMockQuorumFixture.class); - - /* - * Test the quorum semantics for a singleton quorum. This unit test - * allows us to verify that each quorum state change is translated into - * the appropriate methods against the public API of the quorum client - * or quorum member. - */ - suite.addTestSuite(TestSingletonQuorumSemantics.class); - - /* - * Test the quorum semantics for a highly available quorum of 3 - * nodes. The main points to test here are the particulars of events not - * observable with a singleton quorum, including a service join which - * does not trigger a quorum meet, a service leave which does not - * trigger a quorum break, a leader leave, etc. - */ - suite.addTestSuite(TestHA3QuorumSemantics.class); - - /* - * Run the test HA3 suite a bunch of times. - */ - suite.addTest(StressTestHA3.suite()); + /* + * Test the fixture used to test the quorums (the fixture builds on the + * same base class). + */ + suite.addTestSuite(TestMockQuorumFixture.class); - } - + /* + * Test the quorum semantics for a singleton quorum. This unit test + * allows us to verify that each quorum state change is translated into + * the appropriate methods against the public API of the quorum client + * or quorum member. + */ + suite.addTestSuite(TestSingletonQuorumSemantics.class); + + /* + * Test the quorum semantics for a highly available quorum of 3 nodes. + * The main points to test here are the particulars of events not + * observable with a singleton quorum, including a service join which + * does not trigger a quorum meet, a service leave which does not + * trigger a quorum break, a leader leave, etc. + */ + suite.addTestSuite(TestHA3QuorumSemantics.class); + + /* + * Run the test HA3 suite a bunch of times. 
+ */ + suite.addTest(StressTestHA3.suite()); + return suite; } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-10-25 20:18:24 UTC (rev 7484) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-10-25 21:18:46 UTC (rev 7485) @@ -27,6 +27,7 @@ package com.bigdata.quorum; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -100,6 +101,66 @@ } + /** + * Unit test for quorum member add followed by the termination of the quorum + * client. This checks for proper termination of the client, including the + * clear down of the quorum's internal state. + * + * @throws InterruptedException + */ + public void test_memberAdd_terminateClient() throws InterruptedException { + + final Quorum<?, ?> quorum = quorums[0]; + final QuorumMember<?> client = clients[0]; + final QuorumActor<?,?> actor = actors[0]; + final UUID serviceId = client.getServiceId(); + + // client is not a member. + assertFalse(client.isMember()); + assertEquals(new UUID[] {}, quorum.getMembers()); + + // instruct actor to add client as a member. + actor.memberAdd(); + fixture.awaitDeque(); + + // client is a member. + assertTrue(client.isMember()); + assertEquals(new UUID[] {serviceId}, quorum.getMembers()); + + /* + * Verify termination of the quorum for that client. + */ + + assertEquals(client, quorum.getClient()); + + quorum.terminate(); + + try { + quorum.getClient(); + } catch (IllegalStateException ex) { + log.info("Ignoring expected exception: " + ex); + } + + // State was cleared. + assertEquals(Quorum.NO_QUORUM, quorum.token()); + assertEquals(Quorum.NO_QUORUM, quorum.lastValidToken()); + assertEquals(new UUID[] {}, quorum.getMembers()); + assertEquals(new UUID[] {}, quorum.getJoined()); + assertEquals(new UUID[] {}, quorum.getPipeline()); + assertEquals(Collections.emptyMap(), quorum.getVotes()); + + try { + // Note: Quorum reference was cleared. Client throws exception. + assertFalse(client.isMember()); + } catch (IllegalStateException ex) { + log.info("Ignoring expected exception: " + ex); + } + + // Double-termination is safe. + quorum.terminate(); + + } + /** * Unit test for write pipeline add/remove. * @throws InterruptedException @@ -345,6 +406,7 @@ * Unit test for the protocol up to a service join, which triggers a leader * election. Since the singleton quorum has only one member our client will * be elected the leader. + * * @throws InterruptedException */ public void test_serviceJoin() throws InterruptedException { @@ -479,6 +541,97 @@ } /** + * Unit test verifying that we clear down the quorum's reflection of the + * distributed quorum state where we first have a quorum meet and then + * terminate the quorum client. + * + * @throws InterruptedException + */ + public void test_serviceJoin_terminateClient() throws InterruptedException { + + final AbstractQuorum<?, ?> quorum = quorums[0]; + final MockQuorumMember<?> client = clients[0]; + final QuorumActor<?,?> actor = actors[0]; + final UUID serviceId = client.getServiceId(); + + final long lastCommitTime = 0L; + final long lastCommitTime2 = 2L; + + // declare the service as a quorum member. 
+ actor.memberAdd(); + fixture.awaitDeque(); + + assertTrue(client.isMember()); + assertEquals(new UUID[]{serviceId},quorum.getMembers()); + + // add to the pipeline. + actor.pipelineAdd(); + fixture.awaitDeque(); + + assertTrue(client.isPipelineMember()); + assertEquals(new UUID[]{serviceId},quorum.getPipeline()); + + // cast a vote for a lastCommitTime. + actor.castVote(lastCommitTime); + fixture.awaitDeque(); + + assertEquals(1,quorum.getVotes().size()); + assertEquals(new UUID[] { serviceId }, quorum.getVotes().get( + lastCommitTime)); + + // verify the consensus was updated. + assertEquals(lastCommitTime, client.lastConsensusValue); + + // wait for quorum meet. + final long token1 = quorum.awaitQuorum(); + + // verify service was joined. + assertTrue(client.isJoinedMember(quorum.token())); + assertEquals(new UUID[] { serviceId }, quorum.getJoined()); + + // validate the token was assigned. + fixture.awaitDeque(); + assertEquals(Quorum.NO_QUORUM + 1, quorum.lastValidToken()); + assertEquals(Quorum.NO_QUORUM + 1, quorum.token()); + assertTrue(quorum.isQuorumMet()); + + /* + * Terminate the quorum. The state should be cleared down. + */ + + // Verify termination of the quorum for that client. + assertEquals(client, quorum.getClient()); + + // terminate the quorum. + quorum.terminate(); + + try { + quorum.getClient(); + } catch (IllegalStateException ex) { + log.info("Ignoring expected exception: " + ex); + } + + // State was cleared. + assertEquals(Quorum.NO_QUORUM, quorum.token()); + assertEquals(Quorum.NO_QUORUM, quorum.lastValidToken()); + assertEquals(new UUID[] {}, quorum.getMembers()); + assertEquals(new UUID[] {}, quorum.getJoined()); + assertEquals(new UUID[] {}, quorum.getPipeline()); + assertEquals(Collections.emptyMap(), quorum.getVotes()); + + try { + // Note: Quorum reference was cleared. Client throws exception. + assertFalse(client.isMember()); + } catch (IllegalStateException ex) { + log.info("Ignoring expected exception: " + ex); + } + + // re-terminate() is safe. + quorum.terminate(); + + } + + /** * Unit test of timeout in {@link Quorum#awaitQuorum(long, TimeUnit)}. and * {@link Quorum#awaitBreak(long, TimeUnit)}. 
* Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/util/httpd ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/util/httpd:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/util/httpd:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/util/httpd:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/util/httpd:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/util/httpd:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/util/httpd:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata/src/test/com/bigdata/util/httpd:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/util/httpd:6766-7380 /branches/INT64_BRANCH/bigdata/src/test/com/bigdata/util/httpd:4486-4522 /branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata/src/test/com/bigdata/util/httpd:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata/src/test/com/bigdata/util/httpd:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata/src/test/com/bigdata/util/httpd:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/util/httpd:7465-7484 Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-compatibility ___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata-compatibility:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata-compatibility:6766-7380 /branches/INT64_BRANCH/bigdata-compatibility:4486-4522 /branches/LARGE_LITERALS_REFACTOR/bigdata-compatibility:4175-4387 /branches/QUADS_QUERY_BRANCH/bigdata-compatibility:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata-compatibility:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata-compatibility:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata-compatibility:4814-4836 + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata-compatibility:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata-compatibility:6766-7380 /branches/INT64_BRANCH/bigdata-compatibility:4486-4522 /branches/LARGE_LITERALS_REFACTOR/bigdata-compatibility:4175-4387 /branches/QUADS_QUERY_BRANCH/bigdata-compatibility:4525-4531,4533-4548,4550-4584,4586-4609,4611-4632,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE/bigdata-compatibility:7215-7271 /branches/RWSTORE_1_1_0_DEBUG/bigdata-compatibility:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH/bigdata-compatibility:4814-4836 /branches/ZK_DISCONNECT_HANDLING/bigdata-compatibility:7465-7484 Copied: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/lib/apache/zookeeper-3.3.3.jar (from rev 7484, branches/ZK_DISCONNECT_HANDLING/bigdata-jini/lib/apache/zookeeper-3.3.3.jar) =================================================================== (Binary files differ) Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/attr 
___________________________________________________________________ Modified: svn:mergeinfo - /branches/BIGDATA_OPENRDF_2_6_9_UPDATE/bigdata-jini/src/java/com/bigdata/attr:6769-6785 /branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/attr:6766-7380 /branches/BTREE_BUFFER_BRANCH/bigdata-jini/src/java/com/bigdata/attr:2004-2045 /branches/DEV_BRANCH_27_OCT_2009/bigdata-jini/src/java/com/bigdata/attr:2270-2546,2548-2782 /branches/INT64_BRANCH/bigdata-jini/src/java/com/bigdata/attr:4486-4522 /branches/JOURNAL_HA_BRANCH/bigdata-jini/src/java/com/bigdata/attr:2596-4066 /branch... [truncated message content] |
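A note on the executor shutdown idiom that recurs in this change set (the watcher, actor, and event services all use it): shutdown() stops new submissions, a bounded awaitTermination() gives in-flight tasks a chance to finish, and shutdownNow() plus an explicit Future.cancel(true) handles everything else. Cancelling the never-run tasks matters because tasks submitted via submit() are wrapped as FutureTasks; cancelling them completes their Futures, so any thread blocked on Future.get() wakes up instead of hanging. The sketch below is a minimal, self-contained restatement of the idiom assuming only the JDK executor framework; the class and method names are hypothetical and this is not the bigdata code itself.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    /**
     * Stop an executor: reject new tasks, wait briefly for running
     * tasks, then cancel any tasks which did not terminate in a
     * timely manner.
     */
    static void terminate(final ExecutorService service) {
        service.shutdown(); // no new tasks accepted.
        try {
            if (service.awaitTermination(2, TimeUnit.SECONDS)) {
                return; // terminated normally.
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // propagate the interrupt.
        }
        final List<Runnable> notrun = service.shutdownNow();
        for (Runnable r : notrun) {
            if (r instanceof Future) {
                // Tasks submitted via submit() are FutureTasks, so the
                // cast succeeds; cancel() completes the Future and wakes
                // up anyone blocked on Future.get().
                ((Future<?>) r).cancel(true/* mayInterruptIfRunning */);
            }
        }
    }
}

The quorum's terminate() additionally clears the mirrored distributed state (token, members, votes, joined, pipeline) and signals all conditions afterwards, so threads blocked in awaitQuorum() and related methods observe the cleared state rather than waiting on a dead quorum; this is exactly what the new test_memberAdd_terminateClient() and test_serviceJoin_terminateClient() tests verify.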
From: <tho...@us...> - 2013-10-25 20:18:33
Revision: 7484 http://bigdata.svn.sourceforge.net/bigdata/?rev=7484&view=rev Author: thompsonbry Date: 2013-10-25 20:18:24 +0000 (Fri, 25 Oct 2013) Log Message: ----------- Closing out #718. Will merge change set to main development branch. - HAJournalServer: modified to drop replicated write if getHAReady() == Quorum.NO_QUORUM. This prevents replicated live writes from being applied if the local journal is not HAReady. - ZkQuorumImpl: a zookeeper disconnect no longer triggers the QUORUM_DISCONNECT event. That event is only trigger for an expired session. - BigdataServlet: simplified the logic to decide if a service can be written or read in HA mode. This closes some gaps when the HAQuorumService is not running. - HA CI test suite: added several new tests to provide additional coverage when zookeeper is stopped. We have identified two new problems through these tests: (1) the code was failing to throw out an exception if a majority of the services did not vote to PREPARE. The write set was discarded, but the caller was not informed that the commit failed. In general, we need more tests in which the services are in bad states in order to vet the commit2Phase logic paths for various exception conditions. This problem was identified in testAB_stopZookeeper_failB_startZookeeper_quorumBreaksThenMeets(); (2) testAB_stopZookeeper_failB_startZookeeper_quorumBreaksThenMeets() does not accept new transactions after we restart zookeeper. (3) As as-yet not identified problem occurs when we cause the leader to fail. This is demonstrated by testABC_stopZookeeper_failA_startZookeeper_quorumMeetsAgainOnNewToken(). The root cause might be related to the root cause for (2). (1), (2), and (3) are related to #760 (HA 2-phase commit semantics). Modified Paths: -------------- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/QuorumClient.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -3293,7 +3293,7 @@ * The next offset at which user data would be written. * Calculated, after commit! */ - nextOffset = _bufferStrategy.getNextOffset(); + nextOffset = _bufferStrategy.getNextOffset(); final long blockSequence; @@ -3468,9 +3468,16 @@ * the met quorum; and (b) voted YES in response to * the PREPARE message. 
*/ + try { + quorumService.abort2Phase(commitToken); + } finally { + throw new RuntimeException( + "PREPARE rejected: nyes=" + + resp.getYesCount() + + ", replicationFactor=" + + resp.replicationFactor()); + } - quorumService.abort2Phase(commitToken); - } } catch (Throwable e) { Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/QuorumClient.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -82,11 +82,13 @@ void terminate(); /** - * The client has become disconnected from the quorum. This callback - * provides a hook to take any local actions that are required when the - * client can not longer rely on its role in the quorum state (if the client - * is disconnected from the quorum, then it is no longer part of the quorum, - * can not be a joined service, quorum member, etc). + * The client has become disconnected from the quorum (for zookeeper this is + * only generated if the session has expired rather than if there is a + * transient disconnect that can be cured). This callback provides a hook to + * take any local actions that are required when the client can not longer + * rely on its role in the quorum state (if the client is disconnected from + * the quorum, then it is no longer part of the quorum, can not be a joined + * service, quorum member, etc). */ void disconnected(); Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -42,7 +42,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -2084,14 +2083,6 @@ * * If we are (or become) connected with zookeeper before the * timeout, then do serviceLeave() for this service. - * - * TODO We may *choose* to watch the ZooKeeper client - * connection in RunMet. If it remains disconnected, then we - * could tear down the service. However, is not required. - * Bigdata should be fine without zookeeper as long as all - * services remain met. ZooKeeper is only used to coordinate - * changes in the quorum state. E.g., leader election, - * pipeline order change, etc. */ // session timeout as configured. final int sessionTimeout1 = server.getHAClient().zooConfig.sessionTimeout; @@ -2124,6 +2115,12 @@ final long elapsed = System.nanoTime() - begin; if (elapsed > TimeUnit.MILLISECONDS .toNanos(sessionTimeout)) { + /* + * TODO This forces the connection from CONNECTING + * to CLOSED if we are in the Error state and unable + * to connect to zookeeper. This might not be + * strictly necessary. + */ log.error("Tearing down service: ZK Session remains disconnected for " + TimeUnit.NANOSECONDS.toMillis(elapsed) + "ms, effectiveTimeout=" + sessionTimeout); @@ -3695,7 +3692,8 @@ assert req == null; // Note: MUST be a live message! 
- if (!isJoinedMember(msg.getQuorumToken())) { + if (journal.getHAReady() == Quorum.NO_QUORUM + || !isJoinedMember(msg.getQuorumToken())) { /* * If we are not joined, we can not do anything with a Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -1463,13 +1463,45 @@ } /** - * This service has become disconnected from the zookeeper ensemble. + * This service has become disconnected (at the TCP layer) from the + * zookeeper ensemble - this DOES NOT imply that the client session is + * expired. + * <p> + * Note: The client side of session CAN NOT be expired unless the client + * is connected. Session expiration is decided by the zookeeper server + * process, not the client. + * <p> + * Note: A {@link ZooKeeper} client connection that has become + * disconnected will be automatically cured once the client re-connects. + * When this happens, the watchers will be retriggered (automatically) + * and the {@link ZKQuorumImpl} will have an opportunity to + * resynchronize with any state changes in the quorum. THEREFORE, we do + * not force a session expire or a transition to an error state if the + * {@link ZooKeeper} client is transiently disconnected (or if the + * session is moved to another zookeeper server process). + * <p> + * There are serveral possible causes of a disconnect: + * <ul> + * + * <li>zookeeper server process is down.</li> + * + * <li>client can not reach the zookeeper ensemble.</li> + * + * <li>client fails over from one zookeeper server to another (either + * because the zookeeper server is down or because it has become + * unreachable).</li> + * + * </ul> + * * Invoke the {@link QuorumClient#disconnected()} method to allow the * client to handle this in an application specific manner. */ private void handleDisconnected() { - log.error("ZOOKEEPER CLIENT DISCONNECTED: token=" + token()); - doNotifyClientDisconnected(); + log.warn("ZOOKEEPER CLIENT DISCONNECTED: token=" + token()); + /* + * Per above, DO NOTHING. + */ +// doNotifyClientDisconnected(); } private void doNotifyClientDisconnected() { Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -363,6 +363,20 @@ discoveryClient = new HAGlueServicesClient(serviceDiscoveryManager, null/* serviceDiscoveryListener */, cacheMissTimeout); + if (!isZookeeperRunning()) { + + /* + * Ensure that zookeeper is running. + * + * Note: Some unit tests will tear down the zookeeper server + * process. This ensures that the zookeeper server process is + * restarted before the next test runs. + */ + + startZookeeper(); + + } + // Setup quorum client. 
quorum = newQuorum(); @@ -1773,13 +1787,44 @@ processHelper.kill(true/* immediateShutdown */); - throw new RuntimeException("Could not start/locate service: name=" - + name + ", configFile=" + configFile, t); + throw new ServiceStartException( + "Could not start/locate service: name=" + name + + ", configFile=" + configFile, t); } } + + /** + * Exception thrown when the test harness could not start a service. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + protected static class ServiceStartException extends RuntimeException { + /** + * + */ + private static final long serialVersionUID = 1L; + + public ServiceStartException() { + super(); + } + + public ServiceStartException(String message, Throwable cause) { + super(message, cause); + } + + public ServiceStartException(String message) { + super(message); + } + + public ServiceStartException(Throwable cause) { + super(cause); + } + + } + /** * Copy a file * @@ -2524,6 +2569,22 @@ // Wait until that service is ready to act as the leader. assertEquals(HAStatusEnum.Leader, awaitNSSAndHAReady(leader)); + simpleTransaction_noQuorumCheck(leader); + + } + + /** + * Immediately issues a simple transaction against the service. + * + * @param leader + * The service (must be the leader to succeed). + * + * @throws IOException + * @throws Exception + */ + protected void simpleTransaction_noQuorumCheck(final HAGlue leader) + throws IOException, Exception { + final StringBuilder sb = new StringBuilder(); sb.append("DROP ALL;\n"); sb.append("PREFIX dc: <http://purl.org/dc/elements/1.1/>\n"); @@ -2534,9 +2595,6 @@ final String updateStr = sb.toString(); - // Verify quorum is still valid. - quorum.assertQuorum(token); - getRemoteRepository(leader).prepareUpdate(updateStr).evaluate(); } @@ -2941,6 +2999,10 @@ * directory and start/stop the zookeeper process. When running these tests, * you must specify this property in order to execute tests that stop and * start the zookeeper process under test control. + * <p> + * Note: The <code>zkServer.sh </code> script DEPENDS on the pid file + * written by that script. If this gets out of whack, you will not be able + * to stop zookeeper using this method. */ protected void zkCommand(final String cmd) throws InterruptedException, IOException { @@ -2964,7 +3026,12 @@ final ProcessBuilder pb = new ProcessBuilder(shell, executable, cmd); pb.directory(binDir); final int exitCode = pb.start().waitFor(); - // Make sure that the command executed normally! + /* + * Make sure that the command executed normally! + * + * Note: exitCode=1 could mean that the pid file is no longer correct + * hence "stop" can not be processed. + */ assertEquals("exitCode=" + exitCode, 0, exitCode); // Wait for zk to start or stop. 
Thread.sleep(1000/* ms */); @@ -3000,7 +3067,8 @@ ZooHelper.ruok(localIpAddr, clientPort); running = true; } catch (Throwable t) { - log.warn("localIpAddr=" + localIpAddr + ":: t", t); + log.warn("localIpAddr=" + localIpAddr + ", clientPort=" + + clientPort + " :: " + t, t); } return running; } Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -344,7 +344,7 @@ // // if (leaderId1.equals(leaderId2)) { // /* -// * FIXME This fail message is not useful. +// * This fail message is not useful. // * leaderId1.equals(leaderId2). it should report what the leader // * *should* have been, but reports two identical values instead. // */ Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -33,6 +33,8 @@ import net.jini.config.Configuration; +import org.apache.zookeeper.ZooKeeper; + import com.bigdata.ha.HACommitGlue; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; @@ -42,7 +44,6 @@ import com.bigdata.journal.AbstractTask; import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest; import com.bigdata.journal.jini.ha.HAJournalTest.SpuriousTestException; -import com.bigdata.quorum.AsynchronousQuorumCloseException; import com.bigdata.quorum.zk.ZKQuorumImpl; import com.bigdata.rdf.sail.webapp.client.RemoteRepository; import com.bigdata.util.ClocksNotSynchronizedException; @@ -927,179 +928,543 @@ // // } - /* - * FIXME Consider re-enabling these tests. Do we want to attempt to - * programmatically stop and start the zookeeper ensemble under test - * control? For true HA deployments, what would be much more interesting - * is to run an ensemble with 3 zk servers and then do failovers of the - * zk servers and see how that effects the HAJournalServer instances. + /** + * Verify ability to stop and restart the zookeeper process under test + * control. 
+ * + * @throws InterruptedException + * @throws IOException */ -// public void testStartAB_StopStartZookeeper() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } -// -// public void testStartAB_StopStartZookeeperA() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } -// -// public void testStartAB_StopStartZookeeperB() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } -// -// public void testStartAB_StopStartZookeeperC() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } -// -// public void testStartAB_StopStartZookeeperD() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } -// -// public void testStartAB_StopStartZookeeperE() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } -// -// public void testStartAB_StopStartZookeeperF() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } -// -// public void testStartAB_StopStartZookeeperG() throws Exception { -// -// doStartAB_StopStartZookeeper(); -// } + public void testStopStartZookeeper() throws InterruptedException, + IOException { + + assertZookeeperRunning(); + stopZookeeper(); + try { + Thread.sleep(3000); + assertZookeeperNotRunning(); + } finally { + startZookeeper(); + assertZookeeperRunning(); + } + } + + /** + * Verify that the {@link HAJournalServer} dies if we attempt to start it + * and the zookeeper server process is not running. + * + * @throws Exception + */ + public void test_stopZookeeper_startA_fails() + throws Exception { + + stopZookeeper(); + + try { + startA(); + } catch (Throwable t) { + if (!InnerCause.isInnerCause(t, ServiceStartException.class)) { + fail("Expecting " + ServiceStartException.class, t); + } + } + + } - private void doStartAB_StopStartZookeeper() throws Exception { + /** + * Zookeeper is not required except when there are quorum state changes. + * Futher, the services will not notice that zookeeper is dead and will + * continue to process transactions successfully while zookeeper is dead. + * <p> + * Note: because this test stops and starts the zookeeper server process, + * the {@link ZooKeeper} client objects REMAIN VALID. They will not observe + * any state changes while the zookeeper server process is shutdown, but + * they will re-connect when the zookeeper server process comes back up and + * will be usable again once they reconnect. + * + * TODO We should be doing NSS status requests in these zk server stop / + * start tests to verify that we do not have any deadlocks or errors on the + * /status page. + */ + public void testAB_stopZookeeper_commit_startZookeeper_commit() + throws Exception { final HAGlue serverA = startA(); final HAGlue serverB = startB(); final long token1 = quorum.awaitQuorum(awaitQuorumTimeout, TimeUnit.MILLISECONDS); + assertEquals(0L, token1); + awaitNSSAndHAReady(serverA); awaitNSSAndHAReady(serverB); awaitCommitCounter(1L, serverA, serverB); // wait for the initial KB create. + // verify that we know which service is the leader. + awaitHAStatus(serverA, HAStatusEnum.Leader); // must be the leader. + awaitHAStatus(serverB, HAStatusEnum.Follower); // must be a follower. + /* - * Shutdown zookeeper. + * Note: Once we stop the zookeeper service, the test harness is not + * able to observe zookeeper change events until after zookeeper has + * been restarted. However, the ZooKeeper client connection should + * remain valid after zookeeper restarts. 
*/ - final int negotiatedSessionTimeout = ((ZKQuorumImpl) quorum) - .getZookeeper().getSessionTimeout(); - ((HAGlueTest)serverA).log("WILL SHUTDOWN ZOOKEEPER"); + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); stopZookeeper(); - assertZookeeperNotRunning(); + // transaction should succeed. + simpleTransaction_noQuorumCheck(serverA); + + // restart zookeeper. + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + startZookeeper(); + simpleTransaction_noQuorumCheck(serverA); + /* - * Force transition into the error state. The service will not otherwise - * notice that zookeeper is unreachable (it only actively uses zookeeper - * when acting on the quorum or processing events from zookeeper). + * The quorum has remained met. No events are propagated while zookeeper + * is shutdown. Further, zookeepers only sense of "time" is its own + * ticks. If the ensemble is entirely shutdown, then "time" does not + * advance for zookeeper. + * + * Since the zookeeper server has been restarted, the ZkQuorumImpl for + * the test harness (and the HAJournalServers) will now notice quorum + * state changes again. */ - ((HAGlueTest)serverA).enterErrorState(); - ((HAGlueTest)serverB).enterErrorState(); - -// /* -// * Make sure that the clients know that they are disconnected (otherwise -// * this can take up to the zk session timeout). -// */ -// ((HAGlueTest)serverA).dropZookeeperConnection(); -// ((HAGlueTest)serverB).dropZookeeperConnection(); + final long token2 = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + assertEquals(token1, token2); + // transaction should succeed. + simpleTransaction(); + /* - * Sleep until the session timeout would have expired. This should cause - * the zk clients to notice that they are disconnected and trigger the - * error state transition. + * Now verify that C starts, resyncs, and that the quorum fully meets. + * These behaviors depend on zookeeper. */ - Thread.sleep(negotiatedSessionTimeout + 2000/* ms */); + final HAGlue serverC = startC(); + + final long token3 = awaitFullyMetQuorum(); + assertEquals(token1, token3); + + // A is still the leader. B and C are followers. + awaitHAStatus(serverA, HAStatusEnum.Leader); + awaitHAStatus(serverB, HAStatusEnum.Follower); + awaitHAStatus(serverC, HAStatusEnum.Follower); + + // transaction should succeed. + simpleTransaction(); + + } + + /** + * Test variant where we force a service into an error state when zookeeper + * is shutdown. This will cause all commits to fail since the service can + * not update the quorum state and the other services can not notice that + * the service is not part of the quorum. Eventually the leader will be in + * an error state, and maybe the other services as well. Once we restart + * zookeeper, the quorum should meet again and then fully meet. + */ + public void testABC_stopZookeeper_failC_startZookeeper_quorumMeetsAgain() + throws Exception { + + final HAGlue serverA = startA(); + final HAGlue serverB = startB(); + final HAGlue serverC = startC(); + + final long token1 = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + assertEquals(0L, token1); + + // wait for the initial KB create. + awaitNSSAndHAReady(serverA); + awaitNSSAndHAReady(serverB); + awaitNSSAndHAReady(serverC); + awaitCommitCounter(1L, serverA, serverB, serverC); + + // verify that we know which service is the leader. + awaitHAStatus(serverA, HAStatusEnum.Leader); // must be the leader. 
+ awaitHAStatus(serverB, HAStatusEnum.Follower); // must be a follower. + awaitHAStatus(serverC, HAStatusEnum.Follower); // must be a follower. + /* - * FIXME Restore this block of code once AbstractQuorum.getClient() no - * longer contents for a lock to provide access to the client reference - * (right now these operations can contend that lock and block during - * doServiceLeave() in the ErrorTask). + * Note: Once we stop the zookeeper service, the test harness is not + * able to observe zookeeper change events until after zookeeper has + * been restarted. However, the ZooKeeper client connection should + * remain valid after zookeeper restarts. */ + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); + stopZookeeper(); + + // transaction should succeed. + simpleTransaction_noQuorumCheck(serverA); + awaitCommitCounter(2L, serverA, serverB, serverC); + /* - * The services should be NotReady (their haReadyToken and haStatus - * should all have been cleared). + * Force C into an Error state. + * + * Note: C will remain in the met quorum because it is unable to act on + * the quorum. */ - awaitHAStatus(serverA, HAStatusEnum.NotReady); - awaitHAStatus(serverB, HAStatusEnum.NotReady); - if (log.isInfoEnabled()) { - log.info("A:: " + serverA.getExtendedRunState()); - log.info("B:: " + serverB.getExtendedRunState()); - } + ((HAGlueTest) serverC).enterErrorState(); + awaitHAStatus(serverA, HAStatusEnum.Leader); + awaitHAStatus(serverB, HAStatusEnum.Follower); + awaitHAStatus(serverC, HAStatusEnum.NotReady); /* - * Actually, we now stop the HAQuorumService, so the services are not in - * any RunStateEnum. + * Transaction will NOT fail. C will just drop the replicated writes. C + * will vote NO on the prepare (actually, it fails on the GATHER). C + * will not participate in the commit. But A and B will both go through + * the commit. */ -// /* -// * The services should be in the Error state. They can not leave that -// * state until we restart zookeeper, at which point they can finish -// * their actor.doServiceLeave(). -// */ -// awaitRunStateEnum(RunStateEnum.Error, serverA); -// awaitRunStateEnum(RunStateEnum.Error, serverB); + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); + simpleTransaction_noQuorumCheck(serverA); + awaitCommitCounter(3L, serverA, serverB); + awaitCommitCounter(2L, serverC); // C did NOT commit. - log.warn("Start Zookeeper, met: " + quorum.isQuorumMet()); - ((HAGlueTest)serverA).log("WILL START ZOOKEEPER"); - ((HAGlueTest)serverB).log("WILL START ZOOKEEPER"); + // restart zookeeper. + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); startZookeeper(); - assertZookeeperRunning(); -// final Future<Boolean> res2 = serverA.submit( -// new InvariantTask.ServiceDoesntJoin(getServiceAId()), true); -// -// // The task should note service join and return -// assertTrue(res2.get(30, TimeUnit.SECONDS)); + /* + * C was in the Error state while zookeeper was dead. The quorum state + * does not update until zookeeper is restarted. Once we restart + * zookeeper, it should eventually go into Resync and then join and be + * at the same commit point. + */ - final long token2 = awaitNextQuorumMeet(token1); + // wait until C becomes a follower again. + awaitHAStatus(20, TimeUnit.SECONDS, serverC, HAStatusEnum.Follower); + + // quorum should become fully met on the original token. 
+ final long token2 = awaitFullyMetQuorum(); + assertEquals(token1, token2); + awaitHAStatus(serverA, HAStatusEnum.Leader); + awaitHAStatus(serverB, HAStatusEnum.Follower); + awaitHAStatus(serverC, HAStatusEnum.Follower); + awaitCommitCounter(3L, serverA, serverB, serverC); + + } + + /** + * Variant test where B is forced into the error state after we have + * shutdown zookeeper. C should continue to replicate the data to C and A+C + * should be able to go through commits. + */ + public void testABC_stopZookeeper_failB_startZookeeper_quorumMeetsAgain() + throws Exception { - log.warn("Met: " + quorum.isQuorumMet() + ", token: " + token2); + final HAGlue serverA = startA(); + final HAGlue serverB = startB(); + final HAGlue serverC = startC(); -// assertTrue(token2 != token1); - + final long token1 = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + assertEquals(0L, token1); + + // wait for the initial KB create. + awaitNSSAndHAReady(serverA); + awaitNSSAndHAReady(serverB); + awaitNSSAndHAReady(serverC); + awaitCommitCounter(1L, serverA, serverB, serverC); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC }); + + // verify that we know which service is the leader. + awaitHAStatus(serverA, HAStatusEnum.Leader); // must be the leader. + awaitHAStatus(serverB, HAStatusEnum.Follower); // must be a follower. + awaitHAStatus(serverC, HAStatusEnum.Follower); // must be a follower. + + /* + * Note: Once we stop the zookeeper service, the test harness is not + * able to observe zookeeper change events until after zookeeper has + * been restarted. However, the ZooKeeper client connection should + * remain valid after zookeeper restarts. + */ + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); + stopZookeeper(); + + // transaction should succeed. + simpleTransaction_noQuorumCheck(serverA); + awaitCommitCounter(2L, serverA, serverB, serverC); + + /* + * Force B into an Error state. + * + * Note: B will remain in the met quorum because it is unable to act on + * the quorum. + */ + ((HAGlueTest) serverB).enterErrorState(); + awaitHAStatus(serverA, HAStatusEnum.Leader); + awaitHAStatus(serverB, HAStatusEnum.NotReady); + awaitHAStatus(serverC, HAStatusEnum.Follower); + + /* + * Transaction will NOT fail. B will just drop the replicated writes. B + * will vote NO on the prepare (actually, it fails on the GATHER). B + * will not participate in the commit. But A and C will both go through + * the commit. + */ + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); + simpleTransaction_noQuorumCheck(serverA); + awaitCommitCounter(3L, serverA, serverC); + awaitCommitCounter(2L, serverB); // B did NOT commit. + + // restart zookeeper. + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); + startZookeeper(); + + /* + * B was in the Error state while zookeeper was dead. The quorum state + * does not update until zookeeper is restarted. Once we restart + * zookeeper, it should eventually go into Resync and then join and be + * at the same commit point. + * + * Note: B will be moved to the end of the pipeline when this happens. + */ + + // wait until B becomes a follower again. + awaitHAStatus(20, TimeUnit.SECONDS, serverB, HAStatusEnum.Follower); + + // pipeline is changed. + awaitPipeline(new HAGlue[] { serverA, serverC, serverB }); + + // quorum should become fully met on the original token. 
+ final long token2 = awaitFullyMetQuorum(); + assertEquals(token1, token2); + awaitHAStatus(serverA, HAStatusEnum.Leader); + awaitHAStatus(serverB, HAStatusEnum.Follower); + awaitHAStatus(serverC, HAStatusEnum.Follower); + awaitCommitCounter(3L, serverA, serverB, serverC); + } - public void _testStressStartAB_StopStartZookeeper() throws AsynchronousQuorumCloseException, InterruptedException, TimeoutException { - for (int test = 0; test < 20; test++) { - log.warn("Starting run: " + test); - try { - doStartAB_StopStartZookeeper(); - } catch (Throwable t) { - fail("Run " + test, t); - } finally { - Thread.sleep(3000); // wait for local state to settle? - destroyAll(); - } - } - - Thread.sleep(3000); // wait for local state to settle? - } - /** - * Verify ability to stop and restart the zookeeper process under test - * control. + * Variant where we start A+B+C, stop zookeeper, then force A into an error + * state. The quorum will not accept writes (leader is in an error state), + * but B and C will still accept queries. When we restart zookeeper, a new + * quorum will form around (A,B,C). The new leader (which could be any + * service) will accept new transactions. There could be a race here between + * a fully met quorum and new transactions, but eventually all services will + * be joined with the met quorum and at the same commit point. * - * @throws InterruptedException - * @throws IOException + * FIXME This test fails. Figure out why. */ - public void testStopStartZookeeper() throws InterruptedException, - IOException { + public void testABC_stopZookeeper_failA_startZookeeper_quorumMeetsAgainOnNewToken() + throws Exception { + + final HAGlue serverA = startA(); + final HAGlue serverB = startB(); + final HAGlue serverC = startC(); - assertZookeeperRunning(); + final long token1 = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + assertEquals(0L, token1); + + // wait for the initial KB create. + awaitNSSAndHAReady(serverA); + awaitNSSAndHAReady(serverB); + awaitNSSAndHAReady(serverC); + awaitCommitCounter(1L, serverA, serverB, serverC); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC }); + + // verify that we know which service is the leader. + awaitHAStatus(serverA, HAStatusEnum.Leader); // must be the leader. + awaitHAStatus(serverB, HAStatusEnum.Follower); // must be a follower. + awaitHAStatus(serverC, HAStatusEnum.Follower); // must be a follower. + + /* + * Note: Once we stop the zookeeper service, the test harness is not + * able to observe zookeeper change events until after zookeeper has + * been restarted. However, the ZooKeeper client connection should + * remain valid after zookeeper restarts. + */ + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); stopZookeeper(); + + // transaction should succeed. + simpleTransaction_noQuorumCheck(serverA); + awaitCommitCounter(2L, serverA, serverB, serverC); + + /* + * Force A into an Error state. + * + * Note: A will remain in the met quorum because it is unable to act on + * the quorum. However, it will no longer be a "Leader". The other + * services will remain "Followers" since zookeeper is not running and + * watcher events are not being triggered. + */ + ((HAGlueTest) serverA).enterErrorState(); + awaitHAStatus(serverA, HAStatusEnum.NotReady); + awaitHAStatus(serverB, HAStatusEnum.Follower); + awaitHAStatus(serverC, HAStatusEnum.Follower); + + /* + * Transaction will fail. 
+ */ + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); try { - Thread.sleep(3000); - assertZookeeperNotRunning(); - } finally { - startZookeeper(); - assertZookeeperRunning(); + simpleTransaction_noQuorumCheck(serverA); + fail("Expecting transaction to fail"); + } catch (Exception ex) { + log.warn("Ignoring expected exception: " + ex, ex); } + + /* + * Can query B/C. + */ + for (HAGlue service : new HAGlue[] { serverB, serverC }) { + + final RemoteRepository repo = getRemoteRepository(service); + + assertEquals( + 1L, + countResults(repo.prepareTupleQuery( + "SELECT (count(*) as ?count) {?a ?b ?c}") + .evaluate())); + + } + + // restart zookeeper. + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + ((HAGlueTest)serverC).log("MARK"); + startZookeeper(); + + /* + * A was in the Error state while zookeeper was dead. The quorum state + * does not update until zookeeper is restarted. Once we restart + * zookeeper, it should eventually go into Resync and then join and be + * at the same commit point. + * + * Note: Since A was the leader, the quorum will break and then reform. + * The services could be in any order in the new quorum. + */ + + // wait for the next quorum meet. + awaitNextQuorumMeet(token1); + + // wait until the quorum is fully met. + final long token2 = awaitFullyMetQuorum(6); + assertEquals(token1 + 1, token2); + + // Still 2 commit points. + awaitCommitCounter(2L, serverA, serverB, serverC); + + // New transactions now succeed. + simpleTransaction(); + + awaitCommitCounter(3L, serverA, serverB, serverC); + } + + /** + * Variant test where we start A+B, stop zookeeper, then force B into an + * error state. B will refuse to commit, which will cause A to fail the + * transaction. This should push A into an error state. When we restart + * zookeeper the quorum should break and then reform (in some arbitrary) + * order. Services should be at the last recorded commit point. New + * transactions should be accepted. FIXME Test fails. Figure out why. + */ + public void testAB_stopZookeeper_failB_startZookeeper_quorumBreaksThenMeets() + throws Exception { + + final HAGlue serverA = startA(); + final HAGlue serverB = startB(); + final long token1 = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + assertEquals(0L, token1); + + // wait for the initial KB create. + awaitNSSAndHAReady(serverA); + awaitNSSAndHAReady(serverB); + awaitCommitCounter(1L, serverA, serverB); + awaitPipeline(new HAGlue[] { serverA, serverB }); + + // verify that we know which service is the leader. + awaitHAStatus(serverA, HAStatusEnum.Leader); // must be the leader. + awaitHAStatus(serverB, HAStatusEnum.Follower); // must be a follower. + + /* + * Note: Once we stop the zookeeper service, the test harness is not + * able to observe zookeeper change events until after zookeeper has + * been restarted. However, the ZooKeeper client connection should + * remain valid after zookeeper restarts. + */ + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + stopZookeeper(); + + // transaction should succeed. + simpleTransaction_noQuorumCheck(serverA); + awaitCommitCounter(2L, serverA, serverB); + + /* + * Force B into an Error state. + * + * Note: B will remain in the met quorum because it is unable to act on + * the quorum. 
+ */ + ((HAGlueTest) serverB).enterErrorState(); + awaitHAStatus(serverA, HAStatusEnum.Leader); + awaitHAStatus(serverB, HAStatusEnum.NotReady); + + /* + * Transaction will fail + */ + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + try { + simpleTransaction_noQuorumCheck(serverA); + fail("expecting transaction to fail"); + } catch (Exception ex) { + log.warn("Ignoring expected exception: " + ex, ex); + } + awaitCommitCounter(2L, serverA, serverB); // commit counter unchanged + awaitHAStatus(serverA, HAStatusEnum.Leader); + awaitHAStatus(serverB, HAStatusEnum.NotReady); + + // restart zookeeper. + ((HAGlueTest)serverA).log("MARK"); + ((HAGlueTest)serverB).log("MARK"); + startZookeeper(); + + /* + * Quorum should break and then meet again on a new token. We do not + * know which server will wind up as the leader. It could be either one. + */ + + awaitNextQuorumMeet(token1); + + // still at the same commit point. + awaitCommitCounter(2L, serverA, serverB); +// awaitNSSAndHAReady(serverA); +// awaitNSSAndHAReady(serverB); + + // transactions are now accepted. + simpleTransaction(); + + // commit counter advances. + awaitCommitCounter(3L, serverA, serverB); + + } + /** * Attempt to start a service. Once it is running, request a thread dump and * then issue a sure kill - both of these operations are done using a SIGNAL Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java 2013-10-25 16:26:56 UTC (rev 7483) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java 2013-10-25 20:18:24 UTC (rev 7484) @@ -37,12 +37,10 @@ import org.apache.log4j.Logger; -import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; -import com.bigdata.ha.QuorumService; +import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.Journal; -import com.bigdata.quorum.Quorum; +import com.bigdata.quorum.AbstractQuorum; import com.bigdata.rdf.sail.webapp.client.IMimeTypes; /** @@ -116,24 +114,44 @@ } +// /** +// * Return the {@link Quorum} -or- <code>null</code> if the +// * {@link IIndexManager} is not participating in an HA {@link Quorum}. +// */ +// protected Quorum<HAGlue, QuorumService<HAGlue>> getQuorum() { +// +// final IIndexManager indexManager = getIndexManager(); +// +// if (indexManager instanceof Journal) { +// +// return ((Journal) indexManager).getQuorum(); +// +// } +// +// return null; +// +// } + /** - * Return the {@link Quorum} -or- <code>null</code> if the - * {@link IIndexManager} is not participating in an HA {@link Quorum}. + * Return the {@link HAStatusEnum} -or- <code>null</code> if the + * {@link IIndexManager} is not an {@link AbstractQuorum} or is not HA + * enabled. */ - protected Quorum<HAGlue, QuorumService<HAGlue>> getQuorum() { - + protected HAStatusEnum getHAStatus() { + final IIndexManager indexManager = getIndexManager(); - if (indexManager instanceof Journal) { + if (indexManager instanceof AbstractJournal) { - return ((Journal) indexManager).getQuorum(); + // Note: Invocation against local object (NOT RMI). + return ((AbstractJournal) indexManager).getHAStatus(); } return null; - - } - + + } + /** * If the node is not writable, then commit a response and return * <code>false</code>. Otherwise return <code>true</code>. 
@@ -148,40 +166,11 @@ protected boolean isWritable(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = getQuorum(); - - if (quorum == null) { - + final HAStatusEnum haStatus = getHAStatus(); + if (haStatus == null) { // No quorum. return true; - } - - /* - * Note: This is a summary of what we need to look at to proxy a - * request. - * - * Note: We need to proxy PUT, POST, DELETE requests to the quorum - * leader. - * - * Note: If an NSS service is NOT joined with a met quorum, but there is - * a met quorum, then we should just proxy the request to the met - * quorum. This includes both reads and writes. - */ - -// req.getMethod(); -// final Enumeration<String> names = req.getHeaderNames(); -// while(names.hasMoreElements()) { -// req.getHeaders(names.nextElement()); -// } -// req.getInputStream(); - - // Note: Response also has status code plus everything above. - - // Note: Invocation against local HAGlue object (NOT RMI). - final HAStatusEnum haStatus = getQuorum().getClient().getService() - .getHAStatus(); - switch (haStatus) { case Leader: return true; @@ -209,19 +198,11 @@ protected boolean isReadable(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = getQuorum(); - - if (quorum == null) { - + final HAStatusEnum haStatus = getHAStatus(); + if (haStatus == null) { // No quorum. return true; - } - - // Note: Invocation against local HAGlue object (NOT RMI). - final HAStatusEnum haStatus = getQuorum().getClient().getService() - .getHAStatus(); - switch (haStatus) { case Leader: case Follower: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
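The Disconnected-versus-Expired distinction that this change set turns on is easy to get wrong, so it is worth restating in code. A transient disconnect is a TCP-level event: the client library reconnects on its own, the session stays valid until the server expires it, and watches fire again after the reconnect. Session expiry is the only state in which the service's ephemeral znodes are gone and the local quorum state must be torn down. The watcher below is a generic sketch against the stock ZooKeeper client API (the 3.3.x line bundled above), not the ZKQuorumImpl code; the onSessionExpired() hook is hypothetical.

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class SessionStateWatcher implements Watcher {

    @Override
    public void process(final WatchedEvent event) {
        switch (event.getState()) {
        case Disconnected:
            // Transient: the client will reconnect on its own. Do NOT
            // tear down local state here (compare handleDisconnected()
            // above, which now only logs a warning).
            break;
        case SyncConnected:
            // (Re-)connected: watch events are delivered again.
            break;
        case Expired:
            // Fatal: the server expired the session. Ephemerals created
            // by this session are deleted, so the service can no longer
            // rely on its role in the quorum.
            onSessionExpired();
            break;
        default:
            break;
        }
    }

    protected void onSessionExpired() {
        // Application-specific teardown (hypothetical hook), e.g. the
        // QuorumClient.disconnected() callback documented above.
    }
}

Note that expiry can only be observed after a reconnect: the client cannot learn that its session expired while it is disconnected. That is why the HAJournalServer additionally bounds the disconnected interval against the negotiated session timeout and tears the service down itself if the connection is not cured in time.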
From: <mrp...@us...> - 2013-10-25 16:27:04
Revision: 7483 http://bigdata.svn.sourceforge.net/bigdata/?rev=7483&view=rev Author: mrpersonick Date: 2013-10-25 16:26:56 +0000 (Fri, 25 Oct 2013) Log Message: ----------- fixed ticket 761: Cardinality problem with ArbitraryLengthPathOp Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestPropertyPaths.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.srx branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.srx branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.ttl branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.srx branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-4.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-5.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.srx branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.srx branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.ttl branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths.ttl Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2013-10-25 13:20:15 UTC (rev 7482) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2013-10-25 16:26:56 UTC (rev 7483) @@ -27,10 +27,12 @@ package com.bigdata.bop.paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -265,9 +267,10 @@ while (sitr.hasNext()) { final IBindingSet[] chunk = sitr.next(); - - for (IBindingSet bs : chunk) - processChunk(new IBindingSet[] { bs }); + processChunk(chunk); + +// for (IBindingSet bs : chunk) +// processChunk(new IBindingSet[] { bs }); } @@ -320,12 +323,51 @@ log.debug("gearing: " + gearing); } + final boolean noInput = chunkIn == null || chunkIn.length == 0 || + (chunkIn.length == 1 && chunkIn[0].isEmpty()); + + final IVariable<?> joinVar = gearing.inVar != null ? + gearing.inVar : + (gearing.outVar != null ? 
gearing.outVar : null); + + if (log.isDebugEnabled()) { + log.debug("join var: " + joinVar); + } + + /* + * Fix cardinality problem here + */ + final Map<IConstant<?>, List<IBindingSet>> chunkInBySolutionKey = + noInput ? null : + new LinkedHashMap<IConstant<?>, List<IBindingSet>>(); + + if (!noInput) { + + for (IBindingSet parentSolutionIn : chunkIn) { + + final IConstant<?> key = joinVar != null ? parentSolutionIn.get(joinVar) : null; //newSolutionKey(gearing, parentSolutionIn); + + if (log.isDebugEnabled()) { + log.debug("adding parent solution for joining: " + parentSolutionIn); + log.debug("join key: " + key); + } + + if (!chunkInBySolutionKey.containsKey(key)) { + chunkInBySolutionKey.put(key, new ArrayList<IBindingSet>()); + } + + chunkInBySolutionKey.get(key).add(parentSolutionIn); + + } + + } + for (IBindingSet parentSolutionIn : chunkIn) { if (log.isDebugEnabled()) log.debug("parent solution in: " + parentSolutionIn); - IBindingSet childSolutionIn = parentSolutionIn.clone(); + final IBindingSet childSolutionIn = parentSolutionIn.clone(); /* * The seed is either a constant on the input side of @@ -405,6 +447,9 @@ try { + /* + * TODO replace with code that does the PipelineJoins manually + */ runningSubquery = queryEngine.eval(subquery, nextRoundInput.toArray(new IBindingSet[nextRoundInput.size()])); @@ -550,103 +595,322 @@ } } // fixed point for loop - + /* - * Do some final filtering and then send the solutions - * down the pipeline. + * Handle the case where there is a constant on the output side of + * the subquery. Make sure the solution's transitive output + * variable matches. Filter out solutions where tVarOut != outConst. */ - final Iterator<Map.Entry<SolutionKey, IBindingSet>> it = - solutionsOut.entrySet().iterator(); - - while (it.hasNext()) { - - final Map.Entry<SolutionKey, IBindingSet> entry = it.next(); - - final IBindingSet bs = entry.getValue(); - - if (log.isDebugEnabled()) { - log.debug("considering possible solution: " + bs); - } - - if (gearing.outConst != null) { - + if (gearing.outConst != null) { + + final Iterator<Map.Entry<SolutionKey, IBindingSet>> it = + solutionsOut.entrySet().iterator(); + + while (it.hasNext()) { + + final IBindingSet bs = it.next().getValue(); + /* - * Handle the case where there is a constant on the - * output side of the subquery. Make sure the - * solution's transitive output variable matches. */ if (!bs.get(gearing.tVarOut).equals(gearing.outConst)) { if (log.isDebugEnabled()) { log.debug("transitive output does not match output const, dropping"); + log.debug(bs.get(gearing.tVarOut)); + log.debug(gearing.outConst); } it.remove(); + } + + } + + } + + if (lowerBound == 0 && (gearing.inVar != null && gearing.outVar != null)) { + + final Map<SolutionKey, IBindingSet> zlps = new LinkedHashMap<SolutionKey, IBindingSet>(); + + for (IBindingSet bs : solutionsOut.values()) { + + // is this right?? 
+ if (bs.isBound(gearing.outVar)) { + continue; } + + { // left to right + + final IBindingSet zlp = bs.clone(); + + zlp.set(gearing.tVarOut, zlp.get(gearing.inVar)); + + final SolutionKey key = newSolutionKey(gearing, zlp); + + if (!solutionsOut.containsKey(key)) { + + zlps.put(key, zlp); + + } + + } + + { // right to left + + final IBindingSet zlp = bs.clone(); + + zlp.set(gearing.inVar, zlp.get(gearing.tVarOut)); + + final SolutionKey key = newSolutionKey(gearing, zlp); + + if (!solutionsOut.containsKey(key)) { + + zlps.put(key, zlp); + + } + + } + + } + + solutionsOut.putAll(zlps); + + } + + /* + * We can special case when there was no input (and + * thus no join is required). + */ + if (noInput) { - } else { // outVar != null - - /* - * Handle the case where the gearing.outVar was bound - * coming in. Again, make sure it matches the - * transitive output variable. - */ - if (bs.isBound(gearing.outVar)) { + for (IBindingSet bs : solutionsOut.values()) { + + /* + * Set the binding for the outVar if necessary. + */ + if (gearing.outVar != null) { - if (!bs.get(gearing.tVarOut).equals(bs.get(gearing.outVar))) { - - if (log.isDebugEnabled()) { - log.debug("transitive output does not match incoming binding for output var, dropping"); - } - - it.remove(); - - continue; - - } - - } else { - - /* - * Handle the normal case - when we simply - * need to copy the transitive output over to - * the real output. - */ bs.set(gearing.outVar, bs.get(gearing.tVarOut)); } + /* + * Clear the intermediate variables before sending the output + * down the pipeline. + */ + bs.clear(gearing.tVarIn); + bs.clear(gearing.tVarOut); + } - if (log.isDebugEnabled()) { - log.debug("solution accepted"); - } + final IBindingSet[] chunkOut = + solutionsOut.values().toArray( + new IBindingSet[solutionsOut.size()]); + + if (log.isDebugEnabled()) { + log.debug("final output to sink:\n" + Arrays.toString(chunkOut).replace("}, ", "},\n")); + } + + // copy accepted binding sets to the default sink. + context.getSink().add(chunkOut); - /* - * Should we drop the intermediate variables now? - */ - bs.clear(gearing.tVarIn); - bs.clear(gearing.tVarOut); - - } + } else { - final IBindingSet[] chunkOut = - solutionsOut.values().toArray( - new IBindingSet[solutionsOut.size()]); - - if (log.isDebugEnabled()) { - log.debug("final output to sink:\n" + Arrays.toString(chunkOut)); + final ArrayList<IBindingSet> finalOutput = new ArrayList<IBindingSet>(); + + final Iterator<Map.Entry<SolutionKey, IBindingSet>> it = + solutionsOut.entrySet().iterator(); + + while (it.hasNext()) { + + final Map.Entry<SolutionKey, IBindingSet> entry = it.next(); + + final IBindingSet bs = entry.getValue(); + + if (log.isDebugEnabled()) { + log.debug("considering possible solution: " + bs); + } + + final IConstant<?> key = joinVar != null ? bs.get(joinVar) : null; + + if (key != null && chunkInBySolutionKey.containsKey(key)) { + + final List<IBindingSet> parentSolutionsIn = chunkInBySolutionKey.get(key); + + if (log.isDebugEnabled()) { + log.debug("join key: " + key); + log.debug("parent solutions to join: " + parentSolutionsIn); + } + + for (IBindingSet parentSolutionIn : parentSolutionsIn) { + + if (gearing.outConst != null) { + + /* + * No need to clone, since we are not modifying the + * incoming parent solution in this case. The ALP + * is simply acting as a filter. + */ + finalOutput.add(parentSolutionIn); + + } else { // outVar != null + + /* + * Handle the case where the gearing.outVar was bound + * coming in. 
Again, make sure it matches the + * transitive output variable. + */ + if (parentSolutionIn.isBound(gearing.outVar)) { + + // do this later now + + if (!bs.get(gearing.tVarOut).equals(parentSolutionIn.get(gearing.outVar))) { + + if (log.isDebugEnabled()) { + log.debug("transitive output does not match incoming binding for output var, dropping"); + } + + continue; + + } else { + + /* + * No need to clone, since we are not modifying the + * incoming parent solution in this case. The ALP + * is simply acting as a filter. + */ + finalOutput.add(parentSolutionIn); + + } + + } else { + + /* + * Handle the normal case - when we simply + * need to copy the transitive output over to + * the real output. + */ + // bs.set(gearing.outVar, bs.get(gearing.tVarOut)); + + /* + * Clone, modify, and accept. + */ + final IBindingSet join = parentSolutionIn.clone(); + + join.set(gearing.outVar, bs.get(gearing.tVarOut)); + + finalOutput.add(join); + + } + + } + + if (log.isDebugEnabled()) { + log.debug("solution accepted"); + } + + } + + } + + /* + * Always do the null solutions if there are any. + */ + if (chunkInBySolutionKey.containsKey(null)) { + + /* + * Join the null solutions. These solutions represent + * a cross product (no shared variables with the ALP node). + */ + final List<IBindingSet> nullSolutions = chunkInBySolutionKey.get(null); + + if (log.isDebugEnabled()) { + log.debug("null solutions to join: " + nullSolutions); + } + + for (IBindingSet nullSolution : nullSolutions) { + + final IBindingSet solution; + + // if ((gearing.inVar != null && !nullSolution.isBound(gearing.inVar)) || + // (gearing.outVar != null && !nullSolution.isBound(gearing.outVar))) { + if (gearing.inVar != null || gearing.outVar != null) { + + solution = nullSolution.clone(); + + } else { + + solution = nullSolution; + + } + + if (gearing.inVar != null) { + + if (solution.isBound(gearing.inVar)) { + + /* + * This should never happen. + */ + throw new RuntimeException(); + + } else { + + solution.set(gearing.inVar, bs.get(gearing.inVar)); + + } + + } + + if (gearing.outVar != null) { + + if (solution.isBound(gearing.outVar)) { + + /* + * This should never happen. + */ + throw new RuntimeException(); + // if (!bs.get(gearing.tVarOut).equals(solution.get(gearing.outVar))) { + // + // // discard this solution; + // continue; + // + // } + + } else { + + solution.set(gearing.outVar, bs.get(gearing.tVarOut)); + + } + + } + + finalOutput.add(solution); + + if (log.isDebugEnabled()) { + log.debug("solution accepted"); + } + + } + + } + + } + + final IBindingSet[] chunkOut = finalOutput.toArray(new IBindingSet[finalOutput.size()]); + // solutionsOut.values().toArray( + // new IBindingSet[solutionsOut.size()]); + + if (log.isDebugEnabled()) { + log.debug("final output to sink:\n" + Arrays.toString(chunkOut).replace("}, ", "},\n")); + } + + // copy accepted binding sets to the default sink. + context.getSink().add(chunkOut); + } - - // copy accepted binding sets to the default sink. - context.getSink().add(chunkOut); - // done. 
-// return runningSubquery; - } // processChunk method /** Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestPropertyPaths.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestPropertyPaths.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestPropertyPaths.java 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,152 @@ +/** + +Copyright (C) SYSTAP, LLC 2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.rdf.sparql.ast.eval; + + + +public class TestPropertyPaths extends AbstractDataDrivenSPARQLTestCase { + + /** + * + */ + public TestPropertyPaths() { + } + + /** + * @param name + */ + public TestPropertyPaths(String name) { + super(name); + } + +// private void property_path_test(String name) throws Exception { +// +// new TestHelper( +// "property-path-734-" + name, // testURI, +// "property-path-734-" + name + ".rq", // queryFileURL +// "property-path-734.ttl", // dataFileURL +// "property-path-734.srx" // resultFileURL, +// ).runTest(); +// } +// +// private void property_path_using_workaround_test(String name) throws Exception { +// +// new TestHelper( +// "property-path-734-B-" + name, // testURI, +// "property-path-734-B-" + name + ".rq", // queryFileURL +// "property-path-734-B.ttl", // dataFileURL +// "property-path-734-B.srx" // resultFileURL, +// ).runTest(); +// } + + public void test_inVar_outConst_notBound() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-1.rq", // queryFileURL + "property-paths-2.ttl", // dataFileURL + "property-paths-1.srx" // resultFileURL, + ).runTest(); + + } + + public void test_inVar_outConst_inBound() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-1.rq", // queryFileURL + "property-paths.ttl", // dataFileURL + "property-paths-1.srx" // resultFileURL, + ).runTest(); + + } + + public void test_inVar_outVar_inBound() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-2.rq", // queryFileURL + "property-paths.ttl", // dataFileURL + "property-paths-2.srx" // resultFileURL, + ).runTest(); + + } + + public void test_inVar_outVar_outBound() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-3.rq", // queryFileURL + "property-paths-2.ttl", // dataFileURL + "property-paths-3.srx" // resultFileURL, + ).runTest(); + + } + + public void test_inVar_outVar_bothBound() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-3.rq", // queryFileURL + "property-paths.ttl", // dataFileURL + "property-paths-3.srx" // resultFileURL, + ).runTest(); + + } + + public void 
test_inConst_outConst() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-4.rq", // queryFileURL + "property-paths.ttl", // dataFileURL + "property-paths-3.srx" // resultFileURL, + ).runTest(); + + } + + public void test_inVar_outVar_noSharedVars() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-6.rq", // queryFileURL + "property-paths.ttl", // dataFileURL + "property-paths-6.srx" // resultFileURL, + ).runTest(); + + } + + public void test_inVar_outVar_someSharedVars() throws Exception { + + new TestHelper( + "property-paths", // testURI, + "property-paths-7.rq", // queryFileURL + "property-paths-7.ttl", // dataFileURL + "property-paths-7.srx" // resultFileURL, + ).runTest(); + + } + +} Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestPropertyPaths.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.rq 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,11 @@ + +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT ?A ?val +WHERE { + ?A <os:prop> <os:P> . + ?A rdf:value ?val . + ?A rdf:type / rdfs:subClassOf * + <os:ClassA> ; +} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.srx =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-1.srx 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,35 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="A"/> + </head> + <results> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.rq 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,10 @@ + +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT ?A ?val ?type +WHERE { + ?A <os:prop> <os:P> . + ?A rdf:value ?val . + ?A rdf:type / rdfs:subClassOf * ?type . 
+} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.srx =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.srx 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,55 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="A"/> + </head> + <results> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type"> + <uri>os:ClassB</uri> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.ttl =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.ttl (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-2.ttl 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,26 @@ +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . + +<os:0> rdf:type <os:ClassA> . +<os:1> rdf:type <os:ClassB> . +<os:2> rdf:type <os:ClassC> . +<os:3> rdf:type <os:ClassD> . +<os:4> rdf:type <os:ClassE> . + +<os:ClassB> rdfs:subClassOf <os:ClassA> . +<os:ClassC> rdfs:subClassOf <os:ClassA> . +<os:ClassD> rdfs:subClassOf <os:ClassB> . +<os:ClassE> rdfs:subClassOf <os:ClassX> . + +<os:0> <os:prop> <os:P>. +<os:1> <os:prop> <os:P>. +<os:2> <os:prop> <os:Q>. +<os:3> <os:prop> <os:Q>. +<os:4> <os:prop> <os:P>. + +<os:0> rdf:value "x" . +<os:0> rdf:value "y" . +<os:1> rdf:value "z" . +<os:2> rdf:value "foo" . +<os:3> rdf:value "foo" . +<os:4> rdf:value "foo" . Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.rq 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,13 @@ + +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT ?A ?val ?type +WHERE { + ?A <os:prop> <os:P> . + ?A rdf:value ?val . + ?A rdf:type / rdfs:subClassOf * ?type . 
+} +BINDINGS ?type { + (<os:ClassA>) +} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.srx =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-3.srx 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,44 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="A"/> + </head> + <results> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type"> + <uri>os:ClassA</uri> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-4.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-4.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-4.rq 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,14 @@ + +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT ?A ?val ?type +WHERE { + <os:ClassB> rdfs:subClassOf * <os:ClassA> . + ?A <os:prop> <os:P> . + ?A rdf:value ?val . + ?A rdf:type / rdfs:subClassOf * ?type . +} +BINDINGS ?type { + (<os:ClassA>) +} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-5.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-5.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-5.rq 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,12 @@ + +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT ?A ?val ?type +WHERE { + hint:Group hint:optimizer "None" . + <os:ClassB> rdfs:subClassOf * <os:ClassA> . + ?A <os:prop> <os:P> . + ?A rdf:value ?val . + ?A rdf:type / rdfs:subClassOf * ?type . 
+} \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.rq 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,10 @@ + +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT ?A ?val ?type1 ?type2 +WHERE { + ?A <os:prop> <os:P> . + ?A rdf:value ?val . + ?type1 rdfs:subClassOf * ?type2 . +} \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.srx =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-6.srx 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,477 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="A"/> + </head> + <results> + + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassA</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + 
<uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassD</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassE</uri> + </binding> + </result> + + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassA</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassD</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassE</uri> + </binding> + </result> + 
+ <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassA</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassD</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassE</uri> + </binding> + </result> + + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.rq 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,16 @@ + +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT ?A ?val ?type1 ?type2 +WITH { + select ?A ?val ?type1 where { + ?A <os:prop> <os:P> . + ?A rdf:value ?val . 
+ optional { ?A rdf:type ?type1 } . + } +} as %ns1 +WHERE { + include %ns1 . + ?type1 rdfs:subClassOf * ?type2 . +} \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.srx =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.srx 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,351 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="A"/> + </head> + <results> + + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:1</uri> + </binding> + <binding name="val"> + <literal>z</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassA</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding 
name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassD</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>x</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassE</uri> + </binding> + </result> + + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassA</uri> + </binding> + <binding name="type2"> + <uri>os:ClassA</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassB</uri> + </binding> + <binding name="type2"> + <uri>os:ClassB</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassC</uri> + </binding> + <binding name="type2"> + <uri>os:ClassC</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassD</uri> + </binding> + <binding name="type2"> + <uri>os:ClassD</uri> + </binding> + </result> + <result> + <binding name="A"> + <uri>os:0</uri> + </binding> + <binding name="val"> + <literal>y</literal> + </binding> + <binding name="type1"> + <uri>os:ClassE</uri> + </binding> + <binding name="type2"> + <uri>os:ClassE</uri> + </binding> + </result> + + </results> +</sparql> Added: 
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.ttl =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.ttl (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths-7.ttl 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,20 @@ +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . + +#<os:0> rdf:type <os:ClassA> . +<os:1> rdf:type <os:ClassB> . +<os:2> rdf:type <os:ClassC> . +<os:3> rdf:type <os:ClassD> . +<os:4> rdf:type <os:ClassE> . + +<os:ClassB> rdfs:subClassOf <os:ClassA> . +<os:ClassC> rdfs:subClassOf <os:ClassA> . +<os:ClassD> rdfs:subClassOf <os:ClassB> . +<os:ClassE> rdfs:subClassOf <os:ClassC> . + +<os:0> <os:prop> <os:P>. +<os:1> <os:prop> <os:P>. + +<os:0> rdf:value "x" . +<os:0> rdf:value "y" . +<os:1> rdf:value "z" . Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths.ttl =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths.ttl (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/property-paths.ttl 2013-10-25 16:26:56 UTC (rev 7483) @@ -0,0 +1,20 @@ +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . + +<os:0> rdf:type <os:ClassA> . +<os:1> rdf:type <os:ClassB> . +<os:2> rdf:type <os:ClassC> . +<os:3> rdf:type <os:ClassD> . +<os:4> rdf:type <os:ClassE> . + +<os:ClassB> rdfs:subClassOf <os:ClassA> . +<os:ClassC> rdfs:subClassOf <os:ClassA> . +<os:ClassD> rdfs:subClassOf <os:ClassB> . +<os:ClassE> rdfs:subClassOf <os:ClassC> . + +<os:0> <os:prop> <os:P>. +<os:1> <os:prop> <os:P>. + +<os:0> rdf:value "x" . +<os:0> rdf:value "y" . +<os:1> rdf:value "z" . This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
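The heart of the cardinality fix is visible in processChunk() above: rather than evaluating the fixed point once per input solution, the operator now consumes the chunk whole, indexes the parent solutions by the value bound to the shared join variable, and joins each transitive result back against every parent filed under that key, with a null key collecting the no-shared-variable (cross product) case. A simplified sketch of the indexing step, using plain maps in place of IBindingSet/IConstant (indexByJoinKey and the map-based solution type are illustrative, not names from the commit):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class AlpCardinalitySketch {

        /**
         * Index a chunk of parent solutions by the value bound to the
         * shared join variable. A null key collects solutions that do not
         * share a variable with the ALP node; those join with every ALP
         * output (a cross product). Joining outputs back against all
         * parents under the matching key is what preserves the input
         * cardinality.
         */
        static Map<Object, List<Map<String, Object>>> indexByJoinKey(
                final List<Map<String, Object>> chunkIn,
                final String joinVar) {

            final Map<Object, List<Map<String, Object>>> byKey =
                    new LinkedHashMap<Object, List<Map<String, Object>>>();

            for (final Map<String, Object> parentSolution : chunkIn) {

                // null when there is no shared variable (cross product).
                final Object key = joinVar == null ? null
                        : parentSolution.get(joinVar);

                List<Map<String, Object>> bucket = byKey.get(key);
                if (bucket == null) {
                    byKey.put(key,
                            bucket = new ArrayList<Map<String, Object>>());
                }
                bucket.add(parentSolution);
            }

            return byKey;
        }

        public static void main(final String[] args) {
            // Two parent solutions binding ?A to the same value must yield
            // two joined outputs, not one - the bug this revision fixes.
            final Map<String, Object> s1 = new LinkedHashMap<String, Object>();
            s1.put("A", "os:0"); s1.put("val", "x");
            final Map<String, Object> s2 = new LinkedHashMap<String, Object>();
            s2.put("A", "os:0"); s2.put("val", "y");
            System.out.println(indexByJoinKey(Arrays.asList(s1, s2), "A"));
        }
    }

The new property-paths test files above (property-paths-1.rq through property-paths-7.rq and their .srx expected results) exercise exactly these combinations: constant vs. variable on each side of the path, bound vs. unbound inputs, and the no-shared-variable cross-product case.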
From: <tho...@us...> - 2013-10-25 13:20:22
Revision: 7482 http://bigdata.svn.sourceforge.net/bigdata/?rev=7482&view=rev Author: thompsonbry Date: 2013-10-25 13:20:15 +0000 (Fri, 25 Oct 2013) Log Message: ----------- Cleaning up test suite. - assertLogCount() => awaitLogCount() since there is non-determinism. Modified Paths: -------------- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2013-10-24 22:40:05 UTC (rev 7481) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2013-10-25 13:20:15 UTC (rev 7482) @@ -1176,29 +1176,52 @@ } private Iterator<File> getLogs(final File f, final FileFilter fileFilter) { - ArrayList<File> files = new ArrayList<File>(); - - recursiveAdd(files, f, fileFilter); - - return files.iterator(); + + final ArrayList<File> files = new ArrayList<File>(); + + recursiveAdd(files, f, fileFilter); + + return files.iterator(); + } - protected void assertLogCount(final File logDir, final long count) { + protected void awaitLogCount(final File logDir, final long count) { + assertCondition(new Runnable() { + public void run() { + assertLogCount(logDir, count); + } + }, 5000, TimeUnit.MILLISECONDS); + + } + + /** + * Note: There is typically some non-determinism around when an HALog file + * is closed and a new one is opened in a 2-phase commit. Therefore you + * should generally use {@link #awaitLogCount(File, long)} rather than this + * method. 
+ * + * @param logDir + * @param count + */ + private void assertLogCount(final File logDir, final long count) { + final long actual = recursiveCount(logDir, IHALogReader.HALOG_FILTER); - + if (actual != count) { - - final Iterator<File> logs = getLogs(logDir, IHALogReader.HALOG_FILTER); - StringBuilder fnmes = new StringBuilder(); - while (logs.hasNext()) { - fnmes.append("\n" + logs.next().getName()); - } - - fail("Actual log files: " + actual + ", expected: " + count + ", files: " + fnmes); - + + final Iterator<File> logs = getLogs(logDir, + IHALogReader.HALOG_FILTER); + StringBuilder fnmes = new StringBuilder(); + while (logs.hasNext()) { + fnmes.append("\n" + logs.next().getName()); + } + + fail("Actual log files: " + actual + ", expected: " + count + + ", files: " + fnmes); + } } - + } Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-24 22:40:05 UTC (rev 7481) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-25 13:20:15 UTC (rev 7482) @@ -1688,42 +1688,66 @@ */ public void testStartABC_halog() throws Exception { - // Start 3 services - startA(); - startB(); - startC(); - - awaitFullyMetQuorum(); - // setup log directories final File logsA = getHALogDirA(); final File logsB = getHALogDirB(); final File logsC = getHALogDirC(); + /* + * Start 2 services and wait for first commit point. The HALogs will not + * be purged because the quorum is not fully met. + */ + HAGlue serverA = startA(); + HAGlue serverB = startB(); + final long token = awaitMetQuorum(); + // initial token value. + assertEquals(0L, token); + awaitCommitCounter(1L, serverA, serverB); + awaitLogCount(logsA, 2); + awaitLogCount(logsB, 2); + + /* + * Start next service. The service will resync and join with the met + * quorum. The HALog files will not be purged - they do not get purged + * until the fully met quorum goes through a commit point. 
+ */ + HAGlue serverC = startC(); + final long token2 = awaitFullyMetQuorum(); + assertEquals(token2,token); + awaitCommitCounter(1L, serverA, serverB, + serverC); + // committed log files not purged prior to fully met commit - assertLogCount(logsA, 2); - assertLogCount(logsB, 2); - assertLogCount(logsC, 2); + awaitLogCount(logsA, 2); + awaitLogCount(logsB, 2); + awaitLogCount(logsC, 2); + + // + // Above here the code is shared with halogRestart() + // // Run through transaction simpleTransaction(); // again check that only open log files remaining - assertLogCount(logsA, 1); - assertLogCount(logsB, 1); - assertLogCount(logsC, 1); + awaitLogCount(logsA, 1); + awaitLogCount(logsB, 1); + awaitLogCount(logsC, 1); assertEquals(1L, recursiveCount(getHALogDirA(),IHALogReader.HALOG_FILTER)); assertEquals(1L, recursiveCount(getHALogDirB(),IHALogReader.HALOG_FILTER)); assertEquals(1L, recursiveCount(getHALogDirC(),IHALogReader.HALOG_FILTER)); // Now run several transactions - for (int i = 0; i < 5; i++) - simpleTransaction(); + for (int i = 0; i < 5; i++) { + + simpleTransaction(); + + } // again check that only open log files remaining - assertLogCount(logsA, 1); - assertLogCount(logsB, 1); - assertLogCount(logsC, 1); + awaitLogCount(logsA, 1); + awaitLogCount(logsB, 1); + awaitLogCount(logsC, 1); } /** @@ -1733,37 +1757,52 @@ * @throws Exception */ public void testStartABC_halogRestart() throws Exception { - // Start 3 services in strict order. -// startA(); -// Thread.sleep(1000); // ensure A will be leader -// startB(); -// startC(); - new ABC(true/*sequential*/); - awaitFullyMetQuorum(); - // setup log directories final File logsA = getHALogDirA(); final File logsB = getHALogDirB(); final File logsC = getHALogDirC(); - final long token = awaitFullyMetQuorum(); - + /* + * Start 2 services and wait for first commit point. The HALogs will not + * be purged because the quorum is not fully met. + */ + HAGlue serverA = startA(); + HAGlue serverB = startB(); + final long token = awaitMetQuorum(); // initial token value. assertEquals(0L, token); + awaitCommitCounter(1L, serverA, serverB); + awaitLogCount(logsA, 2); + awaitLogCount(logsB, 2); + /* + * Start next service. The service will resync and join with the met + * quorum. The HALog files will not be purged - they do not get purged + * until the fully met quorum goes through a commit point. 
+ */ + HAGlue serverC = startC(); + final long token2 = awaitFullyMetQuorum(); + assertEquals(token2,token); + awaitCommitCounter(1L, serverA, serverB, + serverC); + // committed log files not purged prior to fully met commit - assertLogCount(logsA, 2); - assertLogCount(logsB, 2); - assertLogCount(logsC, 2); + awaitLogCount(logsA, 2); + awaitLogCount(logsB, 2); + awaitLogCount(logsC, 2); + + // + // Above here the code is shared with halogRestart2() + // // Run through transaction simpleTransaction(); // again check that only open log files remaining - assertLogCount(logsA, 1); - assertLogCount(logsB, 1); - assertLogCount(logsC, 1); + awaitLogCount(logsA, 1); + awaitLogCount(logsB, 1); + awaitLogCount(logsC, 1); // Now shutdown all servers shutdownB(); @@ -1771,13 +1810,13 @@ shutdownA(); // and check that there are no logs - assertLogCount(logsA, 0); - assertLogCount(logsB, 0); - assertLogCount(logsC, 0); + awaitLogCount(logsA, 0); + awaitLogCount(logsB, 0); + awaitLogCount(logsC, 0); // startup AB - final HAGlue serverA = startA(); - final HAGlue serverB = startB(); + serverA = startA(); + serverB = startB(); final long token1 = awaitMetQuorum(); awaitHAStatus(serverA, HAStatusEnum.Leader); @@ -1785,63 +1824,74 @@ // Verify new quorum token. assertEquals(token + 1, token1); -((HAGlueTest)serverA).log("MARK"); -((HAGlueTest)serverB).log("MARK"); -Thread.sleep(1000/*ms*/); // FIXME Why this delay? // and check that there are open logs - assertLogCount(logsA, 1); - assertLogCount(logsB, 1); + awaitLogCount(logsA, 1); + awaitLogCount(logsB, 1); // add C - final HAGlue serverC = startC(); + serverC = startC(); assertEquals(token1, awaitFullyMetQuorum()); awaitHAStatus(serverA, HAStatusEnum.Leader); awaitHAStatus(serverB, HAStatusEnum.Follower); awaitHAStatus(serverC, HAStatusEnum.Follower); -((HAGlueTest)serverA).log("MARK"); -((HAGlueTest)serverB).log("MARK"); -((HAGlueTest)serverC).log("MARK"); -Thread.sleep(1000/*ms*/); // FIXME Why this delay? // and check again for ABC - assertLogCount(logsA, 1); - assertLogCount(logsB, 1); - assertLogCount(logsC, 1); + awaitLogCount(logsA, 1); + awaitLogCount(logsB, 1); + awaitLogCount(logsC, 1); } /** * Variant where A is shutdown first. */ public void testStartABC_halogRestart2() throws Exception { - // Start 3 services in strict order. -// startA(); -// Thread.sleep(1000); // ensure A will be leader -// startB(); -// startC(); - new ABC(true/* sequential */); - - final long token = awaitFullyMetQuorum(); - - assertEquals(0L, token); - + // setup log directories final File logsA = getHALogDirA(); final File logsB = getHALogDirB(); final File logsC = getHALogDirC(); + /* + * Start 2 services and wait for first commit point. The HALogs will not + * be purged because the quorum is not fully met. + */ + HAGlue serverA = startA(); + HAGlue serverB = startB(); + final long token = awaitMetQuorum(); + // initial token value. + assertEquals(0L, token); + awaitCommitCounter(1L, serverA, serverB); + awaitLogCount(logsA, 2); + awaitLogCount(logsB, 2); + + /* + * Start next service. The service will resync and join with the met + * quorum. The HALog files will not be purged - they do not get purged + * until the fully met quorum goes through a commit point. 
+ */ + HAGlue serverC = startC(); + final long token2 = awaitFullyMetQuorum(); + assertEquals(token2,token); + awaitCommitCounter(1L, serverA, serverB, + serverC); + // committed log files not purged prior to fully met commit - assertLogCount(logsA, 2); - assertLogCount(logsB, 2); - assertLogCount(logsC, 2); + awaitLogCount(logsA, 2); + awaitLogCount(logsB, 2); + awaitLogCount(logsC, 2); + + // + // Above here the code is shared with halogRestart() + // // Run through transaction simpleTransaction(); // again check that only open log files remaining - assertLogCount(logsA, 1); - assertLogCount(logsB, 1); - assertLogCount(logsC, 1); + awaitLogCount(logsA, 1); + awaitLogCount(logsB, 1); + awaitLogCount(logsC, 1); // Verify token unchanged. assertEquals(token, awaitFullyMetQuorum()); @@ -1852,18 +1902,15 @@ shutdownC(); // and check that there are no logs - assertLogCount(logsA, 0); - assertLogCount(logsB, 0); - assertLogCount(logsC, 0); + awaitLogCount(logsA, 0); + awaitLogCount(logsB, 0); + awaitLogCount(logsC, 0); // startup AB - final HAGlue serverA = startA(); - final HAGlue serverB = startB(); + serverA = startA(); + serverB = startB(); final long token1 = awaitMetQuorum(); -((HAGlueTest)serverA).log("MARK"); -((HAGlueTest)serverB).log("MARK"); -Thread.sleep(1000/*ms*/); // FIXME Why this delay? /* * Verify new quorum token (could be a quorum meet when the leader @@ -1872,23 +1919,19 @@ assertTrue(token1 >= token + 1); // and check that there are open logs - assertLogCount(logsA, 1); - assertLogCount(logsB, 1); + awaitLogCount(logsA, 1); + awaitLogCount(logsB, 1); // add C - final HAGlue serverC = startC(); + serverC = startC(); // Verify quorum token is unchanged. assertEquals(token1, awaitFullyMetQuorum()); -((HAGlueTest)serverA).log("MARK"); -((HAGlueTest)serverB).log("MARK"); -((HAGlueTest)serverC).log("MARK"); -Thread.sleep(1000/*ms*/); // FIXME Why this delay? 
         // and check again for ABC
-        assertLogCount(logsA, 1);
-        assertLogCount(logsB, 1);
-        assertLogCount(logsC, 1);
+        awaitLogCount(logsA, 1);
+        awaitLogCount(logsB, 1);
+        awaitLogCount(logsC, 1);

     }

     public void testABCMultiTransactionFollowerReads() throws Exception {
@@ -2336,18 +2379,18 @@
         simpleTransaction();

         // 5 committed files + 1 open == 6
-        assertLogCount(logsA, 6);
-        assertLogCount(logsB, 6);
+        awaitLogCount(logsA, 6);
+        awaitLogCount(logsB, 6);

         // and restart C with empty journal, forces Rebuild
         startC();

         awaitFullyMetQuorum();

-        assertLogCount(logsA, 6);
-        assertLogCount(logsB, 6);
+        awaitLogCount(logsA, 6);
+        awaitLogCount(logsB, 6);

         // Log count on C is 1 after quiescent rebuild
-        assertLogCount(logsC, 1);
+        awaitLogCount(logsC, 1);

         log.warn("CHECK: Committed log files not copied on Rebuild");

@@ -2356,9 +2399,9 @@
         simpleTransaction();

         // and check that only open log files remaining
-        assertLogCount(logsA, 1);
-        assertLogCount(logsB, 1);
-        assertLogCount(logsC, 1);
+        awaitLogCount(logsA, 1);
+        awaitLogCount(logsB, 1);
+        awaitLogCount(logsC, 1);

     }

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java	2013-10-24 22:40:05 UTC (rev 7481)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServerWithHALogs.java	2013-10-25 13:20:15 UTC (rev 7482)
@@ -152,9 +152,9 @@
          * Note: This is (lastCommitCounter+1) since an empty HALog was created
          * for the next commit point.
          */
-        assertLogCount(getHALogDirA(), commitCounter1 + 1);
-        assertLogCount(getHALogDirB(), commitCounter1 + 1);
-        assertLogCount(getHALogDirC(), commitCounter1 + 1);
+        awaitLogCount(getHALogDirA(), commitCounter1 + 1);
+        awaitLogCount(getHALogDirB(), commitCounter1 + 1);
+        awaitLogCount(getHALogDirC(), commitCounter1 + 1);

         /*
          * Shutdown C.
@@ -209,9 +209,9 @@
         awaitCommitCounter(commitCounter2, new HAGlue[] { serverA, serverB });

         // Verify the expected #of HALogs on each service.
-        assertLogCount(getHALogDirA(), commitCounter2 + 1);
-        assertLogCount(getHALogDirB(), commitCounter2 + 1);
-        assertLogCount(getHALogDirC(), commitCounter2);
+        awaitLogCount(getHALogDirA(), commitCounter2 + 1);
+        awaitLogCount(getHALogDirB(), commitCounter2 + 1);
+        awaitLogCount(getHALogDirC(), commitCounter2);

         // Verify HALog file for next commit point on A is NOT empty.
         {
@@ -264,9 +264,9 @@
          * Note: Each service will have an empty HALog for the next commit
          * point.
          */
-        assertLogCount(getHALogDirA(), commitCounter2+1);
-        assertLogCount(getHALogDirB(), commitCounter2+1);
-        assertLogCount(getHALogDirC(), commitCounter2+1);
+        awaitLogCount(getHALogDirA(), commitCounter2+1);
+        awaitLogCount(getHALogDirB(), commitCounter2+1);
+        awaitLogCount(getHALogDirC(), commitCounter2+1);

     }

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-10-24 22:40:05 UTC (rev 7481)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-10-25 13:20:15 UTC (rev 7482)
@@ -492,10 +492,13 @@
      * might have a consensus around the new commit point.)
      *
      * TODO Consider leader failure scenarios in this test suite, not just
-     * scenarios where B fails. Probably we should also cover failures of C (the
-     * 2nd follower). We should also cover scenarios where the quorum is barely
-     * met and a single failure causes a rejected commit (local decision) or
-     * 2-phase abort (joined services in joint agreement).
+     * scenarios where B fails. We MUST also cover failures of C (the 2nd
+     * follower). We should also cover scenarios where the quorum is barely met
+     * and a single failure causes a rejected commit (local decision) or 2-phase
+     * abort (joined services in joint agreement).
+     *
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/760" >
+     *      Review commit2Phase semantics when a follower fails </a>
      */
     public void testStartABC_commit2Phase_B_fails() throws Exception {
@@ -517,61 +520,92 @@
                 new Class[] { IHA2PhaseCommitMessage.class }, 0/* nwait */, 1/* nfail */);

-        // Simple transaction.
-        simpleTransaction();
+        /**
+         * FIXME We need to resolve the correct behavior when B fails the commit
+         * after having prepared. Two code paths are outlined below. The
+         * implementation currently does an abort2Phase() when the
+         * commit2Phase() observes an error for B. That causes the commit point
+         * to NOT advance.
+         *
+         * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/760" >
+         *      Review commit2Phase semantics when a follower fails </a>
+         */

-//        try {
-//            // Simple transaction.
-//            simpleTransaction();
-//            fail("Expecting failed transaction");
-//        } catch(HttpException ex) {
-//            if (!ex.getMessage().contains(
-//                    SpuriousTestException.class.getName())) {
-//                /*
-//                 * Wrong inner cause.
-//                 *
-//                 * Note: The stack trace of the local exception does not include
-//                 * the remote stack trace. The cause is formatted into the HTTP
-//                 * response body.
-//                 */
-//                fail("Expecting " + ClocksNotSynchronizedException.class, t);
-//            }
-//        }
-
-        // Verify quorum is unchanged.
-        assertEquals(token, quorum.token());
-
-        // Should be two commit points on {A,C].
-        awaitCommitCounter(2L, startup.serverA, startup.serverC);
-
-        /*
-         * B should go into an ERROR state and then into SeekConsensus and from
-         * there to RESYNC and finally back to RunMet. We can not reliably
-         * observe the intervening states. So what we really need to do is watch
-         * for B to move to the end of the pipeline and catch up to the same
-         * commit point.
-         */
+        if(true) {

-        /*
-         * The pipeline should be reordered. B will do a service leave, then
-         * enter seek consensus, and then re-enter the pipeline.
-         */
-        awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC,
-                startup.serverB });
+            // Simple transaction.
+            simpleTransaction();

-        /*
-         * There should be two commit points on {A,C,B} (note that this assert
-         * does not pay attention to the pipeline order).
-         */
-        awaitCommitCounter(2L, startup.serverA, startup.serverC,
-                startup.serverB);
+            // Verify quorum is unchanged.
+            assertEquals(token, quorum.token());

-        // B should be a follower again.
-        awaitHAStatus(startup.serverB, HAStatusEnum.Follower);
+            // Should be two commit points on {A,C].
+            awaitCommitCounter(2L, startup.serverA, startup.serverC);

-        // quorum token is unchanged.
-        assertEquals(token, quorum.token());
+            /*
+             * B should go into an ERROR state and then into SeekConsensus and
+             * from there to RESYNC and finally back to RunMet. We can not
+             * reliably observe the intervening states.
+             * So what we really need to do is watch for B to move to the end
+             * of the pipeline and catch up to the same commit point.
+             */

+            /*
+             * The pipeline should be reordered. B will do a service leave,
+             * then enter seek consensus, and then re-enter the pipeline.
+             */
+            awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC,
+                    startup.serverB });
+
+            /*
+             * There should be two commit points on {A,C,B} (note that this
+             * assert does not pay attention to the pipeline order).
+             */
+            awaitCommitCounter(2L, startup.serverA, startup.serverC,
+                    startup.serverB);
+
+            // B should be a follower again.
+            awaitHAStatus(startup.serverB, HAStatusEnum.Follower);
+
+            // quorum token is unchanged.
+            assertEquals(token, quorum.token());
+
+        } else {
+
+            try {
+
+                // Simple transaction.
+                simpleTransaction();
+
+                fail("Expecting failed transaction");
+
+            } catch (Exception t) {
+
+                if (!t.getMessage().contains(
+                        SpuriousTestException.class.getName())) {
+                    /*
+                     * Wrong inner cause.
+                     *
+                     * Note: The stack trace of the local exception does not
+                     * include the remote stack trace. The cause is formatted
+                     * into the HTTP response body.
+                     */
+                    fail("Expecting " + SpuriousTestException.class, t);
+                }
+
+            }
+
+            // Verify quorum is unchanged.
+            assertEquals(token, quorum.token());
+
+            // Should be ONE commit point on {A,B, C].
+            awaitCommitCounter(1L, startup.serverA, startup.serverB,
+                    startup.serverC);
+
+            fail("finish test under these assumptions");
+
+        }
+
+    }
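A note on the assertLogCount -> awaitLogCount rename that runs through the rev 7482 diffs above: HALog files are purged asynchronously after a fully met quorum commits, so a one-shot assertion (or a fixed Thread.sleep(), as in the deleted "FIXME Why this delay?" lines) races against the purge. Polling the condition up to a timeout is the robust form. The actual awaitLogCount() implementation is not included in these diffs; the following is a minimal sketch of the pattern it presumably follows, written as a method on the test base class, assuming a JUnit-style fail() and counting every file in the directory (the real helper may filter on the HALog file extension):

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    // Hypothetical helper: poll until the directory holds the expected
    // number of files, or fail once the timeout budget is exhausted.
    private void awaitLogCount(final File logDir, final long expected)
            throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        while (true) {
            final String[] names = logDir.list();
            final int actual = (names == null) ? 0 : names.length;
            if (actual == expected)
                return; // condition satisfied.
            if (System.nanoTime() > deadline)
                fail("Expected " + expected + " HALog files in " + logDir
                        + ", but found " + actual);
            Thread.sleep(50/* ms */); // wait and then retry.
        }
    }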
From: <tho...@us...> - 2013-10-24 22:40:12
Revision: 7481
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7481&view=rev
Author:   thompsonbry
Date:     2013-10-24 22:40:05 +0000 (Thu, 24 Oct 2013)
Log Message:
-----------
javadoc

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java	2013-10-24 20:34:58 UTC (rev 7480)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java	2013-10-24 22:40:05 UTC (rev 7481)
@@ -1440,10 +1440,9 @@
             log.error("ZOOKEEPER SESSION EXPIRED: token=" + token());
             doNotifyClientDisconnected();
//            /*
-//             * FIXME We can not cure an expired ZK session. Instead, we tear down
+//             * Note: We can not cure an expired ZK session. Instead, we tear down
//             * the QuorumClient, obtain a new HClient connection, and then
-//             * restart the QuorumClient. Therefore this code should go since it
-//             * not make progress.
+//             * restart the QuorumClient. Therefore this code can not make progress.
//             */
//            while (true) {
//                try {
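The javadoc corrected in rev 7481 states the key ZooKeeper fact: an expired session can never be re-established on the same handle, so the only cure is to discard the handle and construct a new one (a new session). A minimal illustration of that tear-down-and-rebuild shape using the stock org.apache.zookeeper client API; this is not the project's QuorumClient code, just the general pattern the comment describes:

    import java.io.IOException;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooKeeper;

    public class ExpiryAwareClient implements Watcher {

        private final String servers;
        private final int sessionTimeout;
        private volatile ZooKeeper zk;

        public ExpiryAwareClient(final String servers, final int sessionTimeout)
                throws IOException {
            this.servers = servers;
            this.sessionTimeout = sessionTimeout;
            this.zk = new ZooKeeper(servers, sessionTimeout, this);
        }

        @Override
        public void process(final WatchedEvent event) {
            if (event.getState() == KeeperState.Expired) {
                try {
                    // The expired handle is dead; close it out.
                    zk.close();
                    // Only a brand new handle (new session) can make progress.
                    zk = new ZooKeeper(servers, sessionTimeout, this);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }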
From: <tho...@us...> - 2013-10-24 20:35:08
Revision: 7480
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7480&view=rev
Author:   thompsonbry
Date:     2013-10-24 20:34:58 +0000 (Thu, 24 Oct 2013)
Log Message:
-----------
I have disabled the unit tests that stop and restart the zookeeper service.
We can investigate re-enabling these tests based on our updated understanding
of the zookeeper semantics. However, another way to approach this (and one
which may make more sense) is to run a highly available zookeeper ensemble
and then investigate how the bigdata HA replication cluster handles the
transient zk client changes as we stop and start various zk servers, forcing
the zk clients to rollover to different zk servers in the ensemble.

See #718 (zk disconnect handling)
See #724 (wire pull and sudden kill testing)

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java	2013-10-24 20:25:13 UTC (rev 7479)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java	2013-10-24 20:34:58 UTC (rev 7480)
@@ -114,7 +114,7 @@
         suite.addTestSuite(TestHA3DumpLogs.class);

         // Verify ability to override the HAJournal implementation class.
-//        suite.addTestSuite(TestHAJournalServerOverride.class); FIXME RESTORE OVERRIDE TEST SUITE!
+        suite.addTestSuite(TestHAJournalServerOverride.class);

         // Test suite of longer running stress tests for an HA3 cluster.
         suite.addTestSuite(StressTestHA3JournalServer.class);

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-10-24 20:25:13 UTC (rev 7479)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-10-24 20:34:58 UTC (rev 7480)
@@ -893,47 +893,54 @@
//
//    }

-    public void testStartAB_StopStartZookeeper() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
-
-    public void testStartAB_StopStartZookeeperA() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
-
-    public void testStartAB_StopStartZookeeperB() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
-
-    public void testStartAB_StopStartZookeeperC() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
-
-    public void testStartAB_StopStartZookeeperD() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
-
-    public void testStartAB_StopStartZookeeperE() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
-
-    public void testStartAB_StopStartZookeeperF() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
-
-    public void testStartAB_StopStartZookeeperG() throws Exception {
-
-        doStartAB_StopStartZookeeper();
-    }
+    /*
+     * FIXME Consider re-enabling these tests. Do we want to attempt to
+     * programmatically stop and start the zookeeper ensemble under test
+     * control?
+     * For true HA deployments, what would be much more interesting
+     * is to run an ensemble with 3 zk servers and then do failovers of the
+     * zk servers and see how that affects the HAJournalServer instances.
+     */
+//    public void testStartAB_StopStartZookeeper() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }
+//
+//    public void testStartAB_StopStartZookeeperA() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }
+//
+//    public void testStartAB_StopStartZookeeperB() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }
+//
+//    public void testStartAB_StopStartZookeeperC() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }
+//
+//    public void testStartAB_StopStartZookeeperD() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }
+//
+//    public void testStartAB_StopStartZookeeperE() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }
+//
+//    public void testStartAB_StopStartZookeeperF() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }
+//
+//    public void testStartAB_StopStartZookeeperG() throws Exception {
+//
+//        doStartAB_StopStartZookeeper();
+//    }

-    public void doStartAB_StopStartZookeeper() throws Exception {
+    private void doStartAB_StopStartZookeeper() throws Exception {

         final HAGlue serverA = startA();
         final HAGlue serverB = startB();
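The approach proposed in this log message (a 3-server ensemble with individual server failovers) leans on behavior the stock client library already provides: when the connect string names several ensemble members, the ZooKeeper client itself rolls over to another live server after a disconnect, surfacing only Disconnected/SyncConnected transitions to the registered watcher. A minimal illustration; the host names are hypothetical:

    import org.apache.zookeeper.ZooKeeper;

    public class EnsembleConnectDemo {
        public static void main(final String[] args) throws Exception {
            // Hypothetical 3-member ensemble. Stopping one member forces
            // the client to roll over to a live server on its own; no
            // application code is involved in the failover.
            final ZooKeeper zk = new ZooKeeper(
                    "zk1:2181,zk2:2181,zk3:2181",
                    5000 /* sessionTimeout(ms) */,
                    event -> System.out.println("state=" + event.getState()));
            Thread.sleep(60000); // observe Disconnected/SyncConnected events.
            zk.close();
        }
    }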
From: <tho...@us...> - 2013-10-24 20:25:20
Revision: 7479
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7479&view=rev
Author:   thompsonbry
Date:     2013-10-24 20:25:13 +0000 (Thu, 24 Oct 2013)
Log Message:
-----------
fiddling with the startup/teardown logic for the service.

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java	2013-10-24 19:50:55 UTC (rev 7478)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java	2013-10-24 20:25:13 UTC (rev 7479)
@@ -44,6 +44,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
@@ -1132,19 +1134,14 @@
      * is specified and otherwise requires you to specify one or more unicast
      * locators (URIs of hosts running discovery services).
      *
-     * @return The {@link LookupDiscoveryManager}. The caller MUST invoke
-     *         {@link LookupDiscoveryManager#terminate()} when they are done
-     *         with this service.
-     *
      * @see JiniClientConfig
      * @see JiniClientConfig#Options
      *
      * @throws ConfigurationException
      * @throws IOException
      */
-    synchronized LookupDiscoveryManager startLookupDiscoveryManager(
-            final Configuration config) throws ConfigurationException,
-            IOException {
+    private void startLookupDiscoveryManager(final Configuration config)
+            throws ConfigurationException, IOException {

         if (lookupDiscoveryManager == null) {

@@ -1159,10 +1156,62 @@

         }

-        return lookupDiscoveryManager;
+    }

+    private volatile LookupDiscoveryManager lookupDiscoveryManager = null;

+    /**
+     * Await discovery of at least one {@link ServiceRegistrar}.
+     *
+     * @param timeout
+     *            The timeout.
+     * @param unit
+     *            The units for that timeout.
+     *
+     * @throws IllegalArgumentException
+     *             if minCount is non-positive.
+     */
+    protected ServiceRegistrar[] awaitServiceRegistrars(final long timeout,
+            final TimeUnit unit) throws TimeoutException,
+            InterruptedException {
+
+        if (lookupDiscoveryManager == null)
+            throw new IllegalStateException();
+
+        final long begin = System.nanoTime();
+        final long nanos = unit.toNanos(timeout);
+        long remaining = nanos;
+
+        ServiceRegistrar[] registrars = null;
+
+        while ((registrars == null || registrars.length == 0)
+                && remaining > 0) {
+
+            registrars = lookupDiscoveryManager.getRegistrars();
+
+            Thread.sleep(100/* ms */);
+
+            final long elapsed = System.nanoTime() - begin;
+
+            remaining = nanos - elapsed;
+
+        }
+
+        if (registrars == null || registrars.length == 0) {
+
+            throw new RuntimeException(
+                    "Could not discover ServiceRegistrar(s)");
+
+        }
+
+        if (log.isInfoEnabled()) {
+
+            log.info("Found " + registrars.length + " service registrars");
+
+        }
+
+        return registrars;
+
+    }

-    private LookupDiscoveryManager lookupDiscoveryManager;

     /**
      * Export a proxy object for this service instance.
@@ -1174,12 +1223,6 @@
             throws ConfigurationException, IOException {

         /*
-         * Note: We run our own LookupDiscoveryManager in order to avoid forcing
-         * an unexport of the service proxy when the HAClient is terminated.
-         */
-        final LookupDiscoveryManager lookupDiscoveryManager = startLookupDiscoveryManager(config);
-
-        /*
          * Export a proxy object for this service instance.
          *
          * Note: This must be done before we start the join manager since the
@@ -1332,25 +1375,6 @@

         }

-        if (lookupDiscoveryManager != null) {
-
-            try {
-
-                lookupDiscoveryManager.terminate();
-
-            } catch (Throwable ex) {
-
-                log.error("Could not terminate the lookup discovery manager: "
-                        + this, ex);
-
-            } finally {
-
-                lookupDiscoveryManager = null;
-
-            }
-
-        }
-
         return unexported;

     }
@@ -1876,6 +1900,25 @@
         if (haClient.isConnected())
             haClient.disconnect(false/* immediateShutdown */);

+        if (lookupDiscoveryManager != null) {
+
+            try {
+
+                lookupDiscoveryManager.terminate();
+
+            } catch (Throwable ex) {
+
+                log.error("Could not terminate the lookup discovery manager: "
+                        + this, ex);
+
+            } finally {
+
+                lookupDiscoveryManager = null;
+
+            }
+
+        }
+
//        if (serviceDiscoveryManager != null) {
//
//            serviceDiscoveryManager.terminate();
@@ -1943,6 +1986,13 @@

         try {

+            /*
+             * Note: We run our own LookupDiscoveryManager in order to avoid
+             * forcing an unexport of the service proxy when the HAClient is
+             * terminated.
+             */
+            startLookupDiscoveryManager(config);
+
             // Create the service object.
             impl = newService(config);

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java	2013-10-24 19:50:55 UTC (rev 7478)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java	2013-10-24 20:25:13 UTC (rev 7479)
@@ -901,7 +901,7 @@
             } catch (Throwable ex) {

                 log.fatal(
-                        "Problem initiating service discovery: "
+                        "Could not connect: "
                                 + ex.getMessage(), ex);

                 try {
@@ -991,6 +991,9 @@

         if (discoveryClient != null) {

+            if (log.isInfoEnabled())
+                log.info("Terminating " + discoveryClient);
+
             discoveryClient.terminate();

             discoveryClient = null;
@@ -1003,6 +1006,9 @@

         if (serviceDiscoveryManager != null) {

+            if (log.isInfoEnabled())
+                log.info("Terminating " + serviceDiscoveryManager);
+
             serviceDiscoveryManager.terminate();

             serviceDiscoveryManager = null;
@@ -1011,6 +1017,9 @@

         if (lookupDiscoveryManager != null) {

+            if (log.isInfoEnabled())
+                log.info("Terminating " + lookupDiscoveryManager);
+
             lookupDiscoveryManager.terminate();

             lookupDiscoveryManager = null;

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-24 19:50:55 UTC (rev 7478)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-24 20:25:13 UTC (rev 7479)
@@ -1363,20 +1363,20 @@
             // Start the HAClient (River and Zookeeper).
             server.getHAClient().connect();

-            /*
-             * Verify discovery of at least one ServiceRegistrar.
-             */
-            try {
-                log.info("Awaiting service registrar discovery.");
-                server.getHAClient()
-                        .getConnection()
-                        .awaitServiceRegistrars(10/* timeout */,
-                                TimeUnit.SECONDS);
-            } catch (TimeoutException e1) {
-                throw new RuntimeException(e1);
-            } catch (InterruptedException e1) {
-                throw new RuntimeException(e1);
-            }
+//            /*
+//             * Verify discovery of at least one ServiceRegistrar.
+//             */
+//            try {
+//                log.info("Awaiting service registrar discovery.");
+//                server.getHAClient()
+//                        .getConnection()
+//                        .awaitServiceRegistrars(10/* timeout */,
+//                                TimeUnit.SECONDS);
+//            } catch (TimeoutException e1) {
+//                throw new RuntimeException(e1);
+//            } catch (InterruptedException e1) {
+//                throw new RuntimeException(e1);
+//            }

             // Ensure key znodes exist.
             try {
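The awaitServiceRegistrars() method added to AbstractServer in this revision uses the System.nanoTime() budget pattern that recurs throughout these commits: record a start time, convert the timeout to nanoseconds once, and recompute the remaining budget on each pass. (Note that the method's @throws javadoc still references a minCount parameter that is not in the signature.) The generic form of that loop, extracted as a self-contained sketch; the names here are illustrative, not project code:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.function.BooleanSupplier;

    public final class Deadlines {

        private Deadlines() {}

        /**
         * Poll a condition until it holds or the timeout budget is
         * exhausted, sleeping briefly between polls.
         */
        public static void await(final BooleanSupplier done,
                final long timeout, final TimeUnit unit)
                throws InterruptedException, TimeoutException {
            final long begin = System.nanoTime();
            final long nanos = unit.toNanos(timeout);
            while (!done.getAsBoolean()) {
                if (System.nanoTime() - begin >= nanos)
                    throw new TimeoutException();
                Thread.sleep(100/* ms */); // wait and then retry.
            }
        }
    }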
From: <tho...@us...> - 2013-10-24 19:51:18
Revision: 7478
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7478&view=rev
Author:   thompsonbry
Date:     2013-10-24 19:50:55 +0000 (Thu, 24 Oct 2013)
Log Message:
-----------
Bug fixes to the HAClient zk connection timeout logic. Bug fixes to the test
harness zk start/stop and is-running logic.

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java	2013-10-24 18:31:18 UTC (rev 7477)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java	2013-10-24 19:50:55 UTC (rev 7478)
@@ -860,22 +860,37 @@
              * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1666
              */
             {
+                boolean reverseDSNError = false;
                 final long begin = System.nanoTime();
                 while (zk.getState().isAlive()) {
                     if (zk.getState() == States.CONNECTED) {
                         // connected.
                         break;
                     }
+                    final long elapsed = System.nanoTime() - begin;
+                    if (!reverseDSNError
+                            && TimeUnit.NANOSECONDS.toSeconds(elapsed) > 4) {
+                        reverseDSNError = true; // just one warning.
+                        log.error("Reverse DNS is not configured. The ZooKeeper client is taking too long to resolve server(s): "
+                                + zooConfig.servers
+                                + ", took="
+                                + TimeUnit.NANOSECONDS.toMillis(elapsed)
+                                + "ms");
+                    }
+                    if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 10) {
+                        // Fail if we can not reach zookeeper.
+                        throw new RuntimeException(
+                                "Could not connect to zookeeper: state="
+                                        + zk.getState()
+                                        + ", config" + zooConfig
+                                        + ", elapsed="
+                                        + TimeUnit.NANOSECONDS
+                                                .toMillis(elapsed) + "ms");
+                    }
                     // wait and then retry.
                     Thread.sleep(100/* ms */);
                 }
-                final long elapsed = System.nanoTime() - begin;
-                if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 4) {
-                    log.error("Reverse DNS is not configured. The ZooKeeper client is taking too long to resolve server(s): "
-                            + zooConfig.servers
-                            + ", took="
-                            + TimeUnit.NANOSECONDS.toMillis(elapsed) + "ms");
-                }
             }

             // And set the reference. The client is now "connected".
@@ -883,7 +898,7 @@

             log.info("Done.");

-        } catch (Exception ex) {
+        } catch (Throwable ex) {

             log.fatal(
                     "Problem initiating service discovery: "

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java	2013-10-24 18:31:18 UTC (rev 7477)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java	2013-10-24 19:50:55 UTC (rev 7478)
@@ -1319,16 +1319,19 @@
                     // connected.
                     break;
                 }
+                final long elapsed = System.nanoTime() - begin;
+                if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 4) {
+                    fail("Either zookeeper is not running or reverse DNS is not configured. "
+                            + "The ZooKeeper client is taking too long to resolve server(s): state="
+                            + zookeeper.getState()
+                            + ", config=" + zkClientConfig
+                            + ", took="
+                            + TimeUnit.NANOSECONDS.toMillis(elapsed) + "ms");
+                }
                 // wait and then retry.
                 Thread.sleep(100/* ms */);
             }
-            final long elapsed = System.nanoTime() - begin;
-            if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 4) {
-                fail("Reverse DNS is not configured. The ZooKeeper client is taking too long to resolve server(s): "
-                        + zoohosts
-                        + ", took="
-                        + TimeUnit.NANOSECONDS.toMillis(elapsed) + "ms");
-            }
         }

         // znode name for the logical service.
@@ -2903,8 +2906,10 @@
      * @throws IOException
      */
     protected void stopZookeeper() throws InterruptedException, IOException {
+        assertZookeeperRunning();
         log.warn("");
         zkCommand("stop");
+        assertZookeeperNotRunning();
     }

     /**
@@ -2916,8 +2921,10 @@
      * @throws IOException
      */
     protected void startZookeeper() throws InterruptedException, IOException {
+        assertZookeeperNotRunning();
         log.warn("");
         zkCommand("start");
+        assertZookeeperRunning();
     }

     /**
@@ -2943,11 +2950,19 @@
             fail("System property not defined: " + pname);
         final File zookeeperDir = new File(zookeeperDirStr);
         if (!zookeeperDir.exists())
-            fail("No such file: " + pname);
+            fail("No such file: " + zookeeperDir);
+        final File binDir = new File(zookeeperDir, "bin");
+        if (!binDir.exists())
+            fail("No such file: " + binDir);
+        final String shell = SystemUtil.isWindows() ? "cmd" : "/bin/sh";
         final String executable = SystemUtil.isWindows() ? "zkServer.cmd"
                 : "zkServer.sh";
-        final ProcessBuilder pb = new ProcessBuilder("/bin/sh", executable, cmd);
-        pb.directory(new File(zookeeperDir, "bin"));
+        if (log.isInfoEnabled())
+            log.info("installDir=" + zookeeperDirStr + ", binDir=" + binDir
+                    + ", shell=" + shell + ", executable=" + executable
+                    + ", cmd=" + cmd);
+        final ProcessBuilder pb = new ProcessBuilder(shell, executable, cmd);
+        pb.directory(binDir);
         final int exitCode = pb.start().waitFor();
         // Make sure that the command executed normally!
         assertEquals("exitCode=" + exitCode, 0, exitCode);
@@ -2958,35 +2973,45 @@

     /** Verify zookeeper is running on the local host at the client port. */
     protected void assertZookeeperRunning() {

-        final int clientPort = Integer.valueOf(System.getProperty(
-                "test.zookeeper.clientPort", "2081"));
+        if (!isZookeeperRunning())
+            fail("Zookeeper not running: localIP=" + getZKInetAddress()
+                    + ", clientPort=" + getZKClientPort());

-        final InetAddress localIpAddr = NicUtil.getInetAddress(null, 0, null,
-                true);
-        try {
-            ZooHelper.ruok(localIpAddr, clientPort);
-        } catch (Throwable t) {
-            fail("Zookeeper not running:: " + localIpAddr + ":" + clientPort, t);
-        }
-
     }

     /** Verify zookeeper is not running on the local host at the client port. */
     protected void assertZookeeperNotRunning() {

-        final int clientPort = Integer.valueOf(System.getProperty(
-                "test.zookeeper.clientPort", "2081"));
-        final InetAddress localIpAddr = NicUtil.getInetAddress(null, 0, null,
-                true);
+
+        if (isZookeeperRunning())
+            fail("Zookeeper is running: localIP=" + getZKInetAddress()
+                    + ", clientPort=" + getZKClientPort());
+
+    }
+
+    /**
+     * Return <code>true</code> iff zookeeper is running on the local host at
+     * the client port.
+     */
+    private boolean isZookeeperRunning() {
+        final int clientPort = getZKClientPort();
+        final InetAddress localIpAddr = getZKInetAddress();
+        boolean running = false;
+        try {
+            ZooHelper.ruok(localIpAddr, clientPort);
+            running = true;
+        } catch (Throwable t) {
+            log.warn("localIpAddr=" + localIpAddr + ":: t", t);
+        }
+        return running;
+    }
+
+    private int getZKClientPort() {
+        return Integer.valueOf(System.getProperty("test.zookeeper.clientPort",
+                "2081"));
+    }
+
+    private InetAddress getZKInetAddress() {
+        return NicUtil.getInetAddress(null, 0, null, true);
+    }
+
 }
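The is-running check above funnels through ZooHelper.ruok(), which probes the server using ZooKeeper's four-letter-word admin protocol: send "ruok" over a plain TCP connection to the client port and expect "imok" back. A rough stand-in in case ZooHelper is unfamiliar; the real helper's signature and error handling may differ:

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public final class ZkProbe {

        private ZkProbe() {}

        /**
         * Send the "ruok" four-letter command and expect "imok" back.
         * Returns false on any failure (refused connection, timeout, or
         * unexpected reply).
         */
        public static boolean ruok(final String host, final int clientPort) {
            try (Socket s = new Socket()) {
                s.connect(new InetSocketAddress(host, clientPort), 2000/* ms */);
                final OutputStream os = s.getOutputStream();
                os.write("ruok".getBytes("US-ASCII"));
                os.flush();
                s.shutdownOutput();
                final InputStream is = s.getInputStream();
                final byte[] buf = new byte[4];
                int off = 0;
                while (off < 4) {
                    final int n = is.read(buf, off, 4 - off);
                    if (n < 0)
                        break;
                    off += n;
                }
                return "imok".equals(new String(buf, 0, off, "US-ASCII"));
            } catch (Exception ex) {
                return false;
            }
        }
    }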
From: <tho...@us...> - 2013-10-24 18:31:26
Revision: 7477
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7477&view=rev
Author:   thompsonbry
Date:     2013-10-24 18:31:18 +0000 (Thu, 24 Oct 2013)
Log Message:
-----------
Bug fix to exit the while(true) loop correctly in the ErrorTask if the
zookeeper client was connected. See #718

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-24 14:58:13 UTC (rev 7476)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-24 18:31:18 UTC (rev 7477)
@@ -2118,8 +2118,7 @@
                     // Reduce to negotiated timeout GT ZERO.
                     sessionTimeout = sessionTimeout2;
                 }
-                switch(zk.getState()) {
-                case CONNECTED:
+                if (zk.getState() == ZooKeeper.States.CONNECTED) {
                     break;
                 }
                 final long elapsed = System.nanoTime() - begin;
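The one-line defect fixed in rev 7477 is a classic Java pitfall: a break inside a switch exits the switch, not the enclosing while(true), so the pre-7477 loop kept spinning (and running its timeout bookkeeping) even once the client reached CONNECTED. Rewriting the test as an if, as the commit does, makes break apply to the loop; a labeled break on the switch would work as well. The pattern in isolation, with illustrative names rather than project code:

    public class SwitchBreakPitfall {

        enum States { CONNECTING, CONNECTED }

        /** Pre-7477 shape: this "break" only leaves the switch. */
        static void buggyShape(final States state) {
            int passes = 0;
            while (true) {
                switch (state) {
                case CONNECTED:
                    break; // BUG: exits the switch; while(true) spins on.
                default:
                    break;
                }
                passes++;       // timeout bookkeeping ran here every pass,
                if (passes > 3) // connected or not. (Guard added so this
                    return;     // demo cannot loop forever.)
            }
        }

        /** Rev 7477 shape: the if makes "break" apply to the while loop. */
        static void fixedShape(final States state) {
            while (true) {
                if (state == States.CONNECTED) {
                    break; // exits the while loop, as intended.
                }
            }
        }

        public static void main(final String[] args) {
            buggyShape(States.CONNECTED);
            fixedShape(States.CONNECTED);
            System.out.println("ok");
        }
    }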