From: <tho...@us...> - 2014-01-20 22:27:57
|
Revision: 7816 http://bigdata.svn.sourceforge.net/bigdata/?rev=7816&view=rev Author: thompsonbry Date: 2014-01-20 22:27:50 +0000 (Mon, 20 Jan 2014) Log Message: ----------- I have modified HALogNexus to release the HALog files asynchronously. This causes some problems in the existing unit tests, so I have added a configuration parameter that may be used to specify a synchronous HALog release timeout - this defaults to ZERO, but may be increased as desired (at the cost of additional latency in 2-phase commits) or as necessary for the existing tests, some of which assume that HALog files are released synchronously at the 2-phase commit. The expected behavior should now be that HALog files are asynchronously released, starting with the first 2-phase commit in which the corresponding commit point is reclaimable. The release of HALog files will continue until it reaches the specified first commit point that may not be released, or until an invariant is violated. Under sustained commits with concurrent releases, any commit in which the task to release HALog files is no longer running will start a new such task with a new earliest commit point that may not be released; if the previous task is still running, no new task is started. Changes are to: * HALogNexus: now submits a task to release the HALog files. There can be at most one instance of this task running for a given HAJournal. The task is wrapped by the FutureTaskInvariantMon and will be cancelled if the service is no longer joined with the met quorum. * HAJournalServer: A new configuration parameter exists to control the maximum delay for releasing the HALog files (default is ZERO milliseconds). * TestHA3RestorePolicy: This was modified to raise the timeout for the synchronous purge of the HALogs so the test assumptions would remain valid. Note: The HAJournalServer.ConfigurationOptions.HA_LOG_PURGE_TIMEOUT defaults to zero. This is probably what we always want. However, the existence of this option allows us to revert to the old behavior using a configuration change or by changing the default. 
See #780 Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3RestorePolicy.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-01-20 20:29:52 UTC (rev 7815) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-01-20 22:27:50 UTC (rev 7816) @@ -347,6 +347,27 @@ String DEFAULT_HA_LOG_DIR = "HALog"; /** + * The maximum amount of time in milliseconds to await the synchronous + * release of older HALog files during a 2-phase commit (default + * {@value #DEFAULT_HA_LOG_PURGE_TIMEOUT}). This MAY be ZERO to not + * wait. Large timeouts can cause significant latency during a 2-phase + * commit if a large number of HALog files should be released + * accordinging to the {@link IRestorePolicy}. + * + * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/780" + * >Incremental or asynchronous purge of HALog files</a> + */ + String HA_LOG_PURGE_TIMEOUT = "HALogPurgeTimeout"; + + /** + * The default is ZERO (0L) milliseconds, which is probably what we + * always want. However, the existence of this option allows us to + * revert to the old behavior using a configuration change or by + * changing the default. + */ + long DEFAULT_HA_LOG_PURGE_TIMEOUT = 0L; // milliseconds + + /** * The name of the directory in which periodic snapshots of the journal * will be written. Each snapshot is a full copy of the journal. 
* Snapshots are compressed and therefore may be much more compact than Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2014-01-20 20:29:52 UTC (rev 7815) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2014-01-20 22:27:50 UTC (rev 7816) @@ -32,7 +32,15 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +51,9 @@ import com.bigdata.btree.ITuple; import com.bigdata.btree.ITupleIterator; +import com.bigdata.concurrent.FutureTaskInvariantMon; +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.QuorumService; import com.bigdata.ha.QuorumServiceBase; import com.bigdata.ha.halog.HALogReader; import com.bigdata.ha.halog.HALogWriter; @@ -56,6 +67,7 @@ import com.bigdata.journal.RootBlockView; import com.bigdata.journal.jini.ha.HALogIndex.HALogRecord; import com.bigdata.journal.jini.ha.HALogIndex.IHALogRecord; +import com.bigdata.quorum.Quorum; import com.bigdata.striterator.Resolver; import com.bigdata.striterator.Striterator; import com.bigdata.util.ChecksumError; @@ -206,12 +218,41 @@ */ private final HALogIndex haLogIndex; + /** + * The maximum amount of time in milliseconds to await the synchronous purge + * of HALog files during a 2-phase commit. 
+ * + * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/780" + * >Incremental or asynchronous purge of HALog files</a> + * + * @see HAJournalServer.ConfigurationOptions#HA_LOG_PURGE_TIMEOUT + */ + private final long haLogPurgeTimeout; + public HALogNexus(final HAJournalServer server, final HAJournal journal, final Configuration config) throws IOException, ConfigurationException { this.journal = journal; - + + { + haLogPurgeTimeout = (Long) config + .getEntry( + HAJournalServer.ConfigurationOptions.COMPONENT, + HAJournalServer.ConfigurationOptions.HA_LOG_PURGE_TIMEOUT, + Long.TYPE, + HAJournalServer.ConfigurationOptions.DEFAULT_HA_LOG_PURGE_TIMEOUT); + + if (haLogPurgeTimeout < 0) { + throw new ConfigurationException( + HAJournalServer.ConfigurationOptions.HA_LOG_PURGE_TIMEOUT + + "=" + + haLogPurgeTimeout + + " : must be GTE ZERO"); + } + + } + // Note: This is the effective service directory. final File serviceDir = server.getServiceDir(); @@ -1021,6 +1062,87 @@ } /** + * Class purges all HALog files LT the specified commit counter. This class + * is intended to run asynchronously in order to avoid large latency during + * a commit in which many HALog files may be released. 
+ * + * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/780" + * >Incremental or asynchronous purge of HALog files</a> + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class DeleteHALogsTask implements Callable<Void> { + + private final long token; + private final long earliestRetainedSnapshotCommitCounter; + + DeleteHALogsTask(final long token, + final long earliestRetainedSnapshotCommitCounter) { + this.token = token; + this.earliestRetainedSnapshotCommitCounter = earliestRetainedSnapshotCommitCounter; + } + + @Override + public Void call() throws Exception { + + final long nfiles = haLogIndex.getEntryCount(); + + long ndeleted = 0L, totalBytes = 0L; + + final Iterator<IHALogRecord> itr = getHALogs(); + + while(itr.hasNext() && logAccessors.get() == 0) { + + final IHALogRecord r = itr.next(); + + final long closingCommitCounter = r.getCommitCounter(); + + final boolean deleteFile = closingCommitCounter < earliestRetainedSnapshotCommitCounter; + + if (!deleteFile) { + + // No more files to delete. + break; + + } + + if (!journal.getQuorum().isQuorumFullyMet(token)) { + /* + * Halt operation. + * + * Note: This is not an error, but we can not remove + * snapshots or HALogs if this invariant is violated. + */ + break; + } + + // The HALog file to be removed. + final File logFile = getHALogFile(closingCommitCounter); + + // Remove that HALog file from the file system and our index. + removeHALog(logFile); + + ndeleted++; + + totalBytes += r.sizeOnDisk(); + + } + + if (haLog.isInfoEnabled()) + haLog.info("PURGED LOGS: nfound=" + nfiles + ", ndeleted=" + + ndeleted + ", totalBytes=" + totalBytes + + ", earliestRetainedSnapshotCommitCounter=" + + earliestRetainedSnapshotCommitCounter); + + // done + return null; + + } + + } // class DeleteHALogsTask + + /** * Delete HALogs that are no longer required. * * @param earliestRetainedSnapshotCommitCounter @@ -1028,60 +1150,126 @@ * retained snapshot. 
We need to retain any HALogs that are GTE * this commit counter since they will be applied to that * snapshot. + * + * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/780" + * >Incremental or asynchronous purge of HALog files</a> */ void deleteHALogs(final long token, final long earliestRetainedSnapshotCommitCounter) { - final long nfiles = haLogIndex.getEntryCount(); - - long ndeleted = 0L, totalBytes = 0L; + synchronized (deleteHALogFuture) { - final Iterator<IHALogRecord> itr = getHALogs(); - - while(itr.hasNext() && logAccessors.get() == 0) { - - final IHALogRecord r = itr.next(); + { + final Future<Void> f = deleteHALogFuture.get(); - final long closingCommitCounter = r.getCommitCounter(); - - final boolean deleteFile = closingCommitCounter < earliestRetainedSnapshotCommitCounter; + if (f != null) { - if (!deleteFile) { + /* + * Existing task. Check to see if done or still running. + */ - // No more files to delete. - break; + if (!f.isDone()) { + // Still releasing some HALogs from a previous request. + return; + } - } + try { + f.get(); + } catch (InterruptedException e) { + // propagate interrupt. + Thread.currentThread().interrupt(); + return; + } catch (CancellationException e) { + /* + * Note: This is not an error. If the invariants are + * violated, the task will be cancelled. The task is + * "safe" as long as the invariants are valid. + */ + log.warn("Cancelled: " + e); + } catch (ExecutionException e) { + log.error(e, e); + } - if (!journal.getQuorum().isQuorumFullyMet(token)) { - /* - * Halt operation. - * - * Note: This is not an error, but we can not remove - * snapshots or HALogs if this invariant is violated. - */ - break; + // clear reference. + deleteHALogFuture.set(null); + + } } - // The HALog file to be removed. - final File logFile = getHALogFile(closingCommitCounter); + /* + * Start new request. + */ - // Remove that HALog file from the file system and our index. 
- removeHALog(logFile); + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal + .getQuorum(); - ndeleted++; + final QuorumService<HAGlue> localService = quorum.getClient(); - totalBytes += r.sizeOnDisk(); + // Task sends an HALog file along the pipeline. + final FutureTask<Void> ft = new FutureTaskInvariantMon<Void>( + new DeleteHALogsTask(token, + earliestRetainedSnapshotCommitCounter), quorum) { - } + @Override + protected void establishInvariants() { + assertQuorumMet(); + assertJoined(localService.getServiceId()); + assertMember(localService.getServiceId()); + } - if (haLog.isInfoEnabled()) - haLog.info("PURGED LOGS: nfound=" + nfiles + ", ndeleted=" - + ndeleted + ", totalBytes=" + totalBytes - + ", earliestRetainedSnapshotCommitCounter=" - + earliestRetainedSnapshotCommitCounter); + }; + // save reference to prevent concurrent execution of this task + deleteHALogFuture.set(ft); + + // Run task. + journal.getExecutorService().submit(ft); + + /* + * Wait up to a deadline for the HALogs to be purged. If this + * operation can not be completed synchronously, then it will + * continus asynchronously while the invariants remain valid. + * + * Note: Some of the unit tests were written to assume that the + * purge of the HALog files was synchronous. This assumption is no + * longer valid since we will purge the HALog files asynchronously + * in order to avoid latency during a commit when a large number of + * HALog files must be purged. + */ + if (haLogPurgeTimeout > 0) { + try { + ft.get(haLogPurgeTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // propagate interrupt. + Thread.currentThread().interrupt(); + return; + } catch (CancellationException e) { + /* + * Note: This is not an error. If the invariants are + * violated, the task will be cancelled. The task is "safe" + * as long as the invariants are valid. 
+ */ + log.warn("Cancelled: " + e); + return; + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + // ignore. + } + } + + } + } + + /** + * Reference is used to avoid concurrent execution of multiple instances of + * the {@link DeleteHALogsTask}. + * <p> + * Note: This {@link AtomicReference} also doubles as a monitor object to + * provide a guard for {@link #deleteHALogs(long, long)}. + */ + private final AtomicReference<Future<Void>> deleteHALogFuture = new AtomicReference<Future<Void>>(); /** * Delete all HALog files (except the current one). The {@link #haLogIndex} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3RestorePolicy.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3RestorePolicy.java 2014-01-20 20:29:52 UTC (rev 7815) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3RestorePolicy.java 2014-01-20 22:27:50 UTC (rev 7816) @@ -97,7 +97,8 @@ return new String[]{ "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy("+restorePolicyMinSnapshotAgeMillis+","+restorePolicyMinSnapshots+","+restorePolicyMinRestorePoints+")", - "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()" + "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", + "com.bigdata.journal.jini.ha.HAJournalServer.HALogPurgeTimeout="+Long.MAX_VALUE+"L" // force synchronous purge. }; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |