From: <tho...@us...> - 2012-09-03 10:36:53
|
Revision: 6508 http://bigdata.svn.sourceforge.net/bigdata/?rev=6508&view=rev Author: thompsonbry Date: 2012-09-03 10:36:47 +0000 (Mon, 03 Sep 2012) Log Message: ----------- HAJournalServer: pass [token] rather than [NO_QUORUM] on meet. AbstractQuorum.assertLeader(token) : throw exception if caller's token is NO_QUORUM. AbstractJournal : lifted newHAGlue() and getPort() into AbstractHAJournalTestCase. Working on setQuorumToken() semantics. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-09-03 09:35:29 UTC (rev 6507) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-09-03 10:36:47 UTC (rev 6508) @@ -30,10 +30,7 @@ import java.io.File; import java.io.IOException; import java.lang.ref.WeakReference; -import java.net.BindException; import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.FileChannel; @@ -2610,7 +2607,7 @@ if (quorum != null) { /* - * Verify that the last negotiated quorum is still in valid. + * Verify that the last negotiated quorum is still valid. 
*/ quorum.assertLeader(quorumToken); } @@ -4523,11 +4520,110 @@ private volatile long quorumToken = Quorum.NO_QUORUM; protected final long getQuorumToken() { + return quorumToken; + } + protected void setQuorumToken(final long newValue) { + + /* + * The token is [volatile]. Save its state on entry. Figure out if this + * is a quorum meet or a quorum break. + */ + final long oldValue = quorumToken; - quorumToken = newValue; + + if (oldValue == newValue) { + + // No change. + return; + + } + + final boolean didBreak; + final boolean didMeet; + + if (newValue == Quorum.NO_QUORUM && oldValue != Quorum.NO_QUORUM) { + + /* + * Quorum break. + * + * Immediately invalidate the token. Do not wait for a lock. + */ + + this.quorumToken = newValue; + + didBreak = true; + didMeet = false; + + } else if (newValue != Quorum.NO_QUORUM && oldValue == Quorum.NO_QUORUM) { + + /* + * Quorum meet. + * + * We must wait for the lock to update the token. + */ + + didBreak = false; + didMeet = true; + + } else { + + /* + * Excluded middle. If there was no change, then we returned + * immediately up above. If there is a change, then it must be + * either a quorum break or a quorum meet, which were identified in + * the if-then-else above. + */ + + throw new AssertionError(); + + } + + /* + * Both a meet and a break require an exclusive write lock. + */ + final WriteLock lock = _fieldReadWriteLock.writeLock(); + + lock.lock(); + + try { + + if (didBreak) { + + /* + * We also need to discard any active read/write tx since there + * is no longer a quorum and a read/write tx was running on the + * old leader. + * + * We do not need to discard read-only tx since the committed + * state should remain valid even when a quorum is lost. + */ + abort(); + + } else if (didMeet) { + + quorumToken = newValue; + + /* + * FIXME We need to re-open the backing store with the token for + * the new quorum. 
+ */ +// _bufferStrategy.reopen(quorumToken); + + } else { + + throw new AssertionError(); + + } + + } finally { + + lock.unlock(); + + } + } /** @@ -4547,24 +4643,16 @@ /** * Factory for the {@link HADelegate} object for this - * {@link AbstractJournal}. This may be overridden to publish additional - * methods for the low-level HA API. The object returned by this factor is + * {@link AbstractJournal}. The object returned by this method will be made * available using {@link QuorumMember#getService()}. + * + * @throws UnsupportedOperationException + * always. */ protected HAGlue newHAGlue(final UUID serviceId) { - // FIXME This is defaulting to a random port on the loopback address. - final InetSocketAddress writePipelineAddr; - try { - writePipelineAddr = new InetSocketAddress(getPort(0)); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - - return new BasicHA(serviceId, writePipelineAddr); - + throw new UnsupportedOperationException(); + } /** @@ -4934,30 +5022,6 @@ }; /** - * Return an unused port. - * - * @param suggestedPort - * The suggested port. - * - * @return The suggested port, unless it is zero or already in use, in which - * case an unused port is returned. - * - * @throws IOException - */ - static protected int getPort(int suggestedPort) throws IOException { - ServerSocket openSocket; - try { - openSocket = new ServerSocket(suggestedPort); - } catch (BindException ex) { - // the port is busy, so look for a random open port - openSocket = new ServerSocket(0); - } - final int port = openSocket.getLocalPort(); - openSocket.close(); - return port; - } - - /** * Remove all commit records between the two provided keys. 
* * This is called from the RWStore when it checks for deferredFrees against Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2012-09-03 09:35:29 UTC (rev 6507) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2012-09-03 10:36:47 UTC (rev 6508) @@ -921,9 +921,13 @@ } final public void assertLeader(final long token) { + if (token == NO_QUORUM) { + // The quorum was not met when the client obtained that token. + throw new QuorumException("Client token is invalid."); + } if (this.token == NO_QUORUM) { // The quorum is not met. - throw new QuorumException(); + throw new QuorumException("Quorum is not met."); } final UUID leaderId = getLeaderId(); final QuorumMember<S> client = getClientAsMember(); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java 2012-09-03 09:35:29 UTC (rev 6507) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java 2012-09-03 10:36:47 UTC (rev 6508) @@ -28,6 +28,11 @@ package com.bigdata.journal.ha; import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.rmi.Remote; import java.util.Properties; @@ -340,12 +345,65 @@ super(properties, quorum); } + /** + * {@inheritDoc} + * <p> + * Note: This uses a random port on the loopback address. 
+ */ + @Override public HAGlue newHAGlue(final UUID serviceId) { - return super.newHAGlue(serviceId); - + final InetSocketAddress writePipelineAddr; + try { + writePipelineAddr = new InetSocketAddress(getPort(0)); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new HAGlueService(serviceId, writePipelineAddr); + } + /** + * Return an unused port. + * + * @param suggestedPort + * The suggested port. + * + * @return The suggested port, unless it is zero or already in use, in which + * case an unused port is returned. + * + * @throws IOException + */ + static protected int getPort(int suggestedPort) throws IOException { + ServerSocket openSocket; + try { + openSocket = new ServerSocket(suggestedPort); + } catch (BindException ex) { + // the port is busy, so look for a random open port + openSocket = new ServerSocket(0); + } + final int port = openSocket.getLocalPort(); + openSocket.close(); + return port; + } + + /** + * Extended implementation supports RMI. + */ + protected class HAGlueService extends BasicHA { + + protected HAGlueService(final UUID serviceId, + final InetSocketAddress writePipelineAddr) { + + super(serviceId, writePipelineAddr); + + } + + } + } protected Quorum<HAGlue, QuorumService<HAGlue>> newQuorum() { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-09-03 09:35:29 UTC (rev 6507) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-09-03 10:36:47 UTC (rev 6508) @@ -498,7 +498,7 @@ super.quorumMeet(token, leaderId); // Inform the journal that there is a new quorum token. 
- journal.setQuorumToken(Quorum.NO_QUORUM); + journal.setQuorumToken(token); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |