From: <tho...@us...> - 2013-02-21 15:19:24
|
Revision: 6924 http://bigdata.svn.sourceforge.net/bigdata/?rev=6924&view=rev Author: thompsonbry Date: 2013-02-21 15:19:09 +0000 (Thu, 21 Feb 2013) Log Message: ----------- Refactored the HALog file extension onto the IHALogReader interface (in both the halog and althalog packages). This data was being declared in several different locations. Added declaration for the quorum/backup znode to ZKQuorum in support of HA backups. Added QuorumBackupState object in support of HA backups. Modified purgeHALogs() to check the data on the quorum/backup znode. When that znode exists, it will not delete HALogs unless they have been backed up. Added code to HAJournalServer.serviceJoin() to purge HALogs (but not the current HALog) when we transition to a fully met quorum. This allows the purge to occur before we reach a commit point. This code is conditionally disabled for now. Additional cleanup and documentation on HAJournalServer and the HA transition flowcharts. Removed the incremental option from IHASyncRequest. This is no longer required since we have restructured how we do RESYNC. 
Modified Paths: -------------- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogManager.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/IHALogReader.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/IHALogReader.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HARebuildRequest.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHASyncRequest.java branches/READ_CACHE/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorum.java branches/READ_CACHE/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java Added Paths: ----------- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/QuorumBackupState.java Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -64,7 +64,7 @@ */ public class HALogFile { - public static final String HA_LOG_EXT = ".ha-log"; +// public static final String HA_LOG_EXT = ".ha-log"; /** * Logger for HA events. 
@@ -175,7 +175,7 @@ m_callback = callback; final File hadir = m_callback.getHALogDir(); m_haLogFile = new File(hadir, getHALogFileName(rbv.getCommitCounter()) - + HA_LOG_EXT); + + IHALogReader.HA_LOG_EXT); if (m_haLogFile.exists()) throw new IllegalStateException("File already exists: " @@ -675,7 +675,7 @@ final Formatter f = new Formatter(sb); - f.format("%020d" + HA_LOG_EXT, commitCounter); + f.format("%020d" + IHALogReader.HA_LOG_EXT, commitCounter); f.flush(); f.close(); @@ -930,8 +930,6 @@ /** Current write cache block sequence counter. */ private long m_nextSequence = 0; - public static final String HA_LOG_EXT = ".ha-log"; - private void assertOpen() { if (!isOpen()) Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogManager.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogManager.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogManager.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -337,7 +337,7 @@ } - return name.endsWith(HALogFile.HA_LOG_EXT); + return name.endsWith(IHALogReader.HA_LOG_EXT); } }); Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/IHALogReader.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/IHALogReader.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/IHALogReader.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -41,6 +41,11 @@ */ public interface IHALogReader { + /** + * The filename extension used for the HALog files. + */ + public static final String HA_LOG_EXT = ".ha-log"; + /** * Closes the Reader. 
* Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogReader.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -423,7 +423,7 @@ } - return name.endsWith(HALogWriter.HA_LOG_EXT); + return name.endsWith(IHALogReader.HA_LOG_EXT); } }); Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -121,7 +121,7 @@ /** state lock **/ final private ReentrantReadWriteLock m_stateLock = new ReentrantReadWriteLock(); - public static final String HA_LOG_EXT = ".ha-log"; +// public static final String HA_LOG_EXT = ".ha-log"; /** current write point on the channel. */ private long m_position = headerSize0; @@ -199,7 +199,7 @@ final Formatter f = new Formatter(sb); - f.format("%020d" + HA_LOG_EXT, commitCounter); + f.format("%020d" + IHALogReader.HA_LOG_EXT, commitCounter); f.flush(); f.close(); Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/IHALogReader.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/IHALogReader.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/halog/IHALogReader.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -39,6 +39,11 @@ */ public interface IHALogReader { + /** + * The filename extension used for the HALog files. + */ + public static final String HA_LOG_EXT = ".ha-log"; + /** * Closes the Reader. 
* Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -34,7 +34,7 @@ private final UUID serviceId; private final long commitCounter; - private final boolean incremental; +// private final boolean incremental; /** * @param serviceId @@ -43,12 +43,13 @@ * The commit counter used to identify the desired commit point * (the commit counter of the closing root block). */ - public HALogRequest(final UUID serviceId, final long commitCounter, - final boolean incremental) { + public HALogRequest(final UUID serviceId, final long commitCounter +// , final boolean incremental + ) { this.serviceId = serviceId; this.commitCounter = commitCounter; - this.incremental = incremental; +// this.incremental = incremental; } @@ -69,15 +70,17 @@ public String toString() { return getClass() + "{serviceId=" + getServiceId() + ", commitCounter=" - + getCommitCounter() + ", incremental=" + isIncremental() + "}"; + + getCommitCounter() + +// ", incremental=" + isIncremental() + + "}"; } - @Override - public boolean isIncremental() { - - return incremental; - - } +// @Override +// public boolean isIncremental() { +// +// return incremental; +// +// } } Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HARebuildRequest.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HARebuildRequest.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HARebuildRequest.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -53,16 +53,17 @@ public String toString() { - return getClass() + "{serviceId=" + getServiceId() + ", incremental=" - + isIncremental() 
+ "}"; + return getClass() + "{serviceId=" + getServiceId() + // + ", incremental=" + isIncremental() + + "}"; } - @Override - final public boolean isIncremental() { - - return false; - - } +// @Override +// final public boolean isIncremental() { +// +// return false; +// +// } } Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHASyncRequest.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHASyncRequest.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHASyncRequest.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -33,12 +33,12 @@ */ public interface IHASyncRequest extends IHAMessage { - /** - * When <code>true</code> the request is part of an incremental - * re-synchronization. When <code>false</code> the request is part of - * a total re-build. - */ - boolean isIncremental(); +// /** +// * When <code>true</code> the request is part of an incremental +// * re-synchronization. When <code>false</code> the request is part of +// * a total re-build. +// */ +// boolean isIncremental(); /** * The UUID of the service that issued this request. Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -1013,10 +1013,18 @@ lock.lock(); try { - + + if (joined.size() != k) { + + // Quorum is not fully met. + return false; + + } + + // Verify token. 
assertQuorum(token); - return joined.size() == k; + return true; } finally { Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -29,6 +29,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; +import java.rmi.Remote; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -72,6 +73,7 @@ import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.ha.msg.IHAWriteSetStateRequest; import com.bigdata.ha.msg.IHAWriteSetStateResponse; +import com.bigdata.io.SerializerUtil; import com.bigdata.jini.start.config.ZookeeperClientConfig; import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.AbstractQuorumMember; @@ -79,6 +81,8 @@ import com.bigdata.quorum.QuorumEvent; import com.bigdata.quorum.QuorumException; import com.bigdata.quorum.QuorumListener; +import com.bigdata.quorum.zk.QuorumBackupState; +import com.bigdata.quorum.zk.ZKQuorum; import com.bigdata.quorum.zk.ZKQuorumImpl; import com.bigdata.service.jini.JiniClient; import com.bigdata.service.jini.JiniClientConfig; @@ -1047,6 +1051,33 @@ // FIXME Install the root blocks (atomically or as dups of the current root block). // installRootBlocks(resp.getRootBlock0(), resp.getRootBlock1()); + /* + * TODO After we run the backup utility, we need to ensure that + * the znode exists against which the backup will be registered. + * Then we need to write in the values of {inc,full} for the + * most recent incremental and full backups (depending on which + * one we just did). + */ + final long inc = -1; // TODO set this. + final long full = -1; // TODO set this. 
+ final byte[] data = SerializerUtil + .serialize(new QuorumBackupState(inc, full)); + final String zpath = logicalServiceZPath + "/" + + ZKQuorum.QUORUM + "/" + ZKQuorum.QUORUM_BACKUP; + // Ensure exists. + try { + zka.getZookeeper().create( + zpath, data, + zkClientConfig.acl, CreateMode.PERSISTENT); + } catch (NodeExistsException ex) { + + /* + * Since it already exists, we just update its state now. + */ + zka.getZookeeper().setData(zpath, data, -1/* version */); + + } + // Done return null; Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -476,7 +476,7 @@ if (f.isDirectory()) return true; - return f.getName().endsWith(HALogWriter.HA_LOG_EXT); + return f.getName().endsWith(IHALogReader.HA_LOG_EXT); } }); Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -56,6 +56,7 @@ import org.apache.log4j.Logger; import org.apache.log4j.MDC; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.data.ACL; import org.eclipse.jetty.server.Server; @@ -83,6 +84,7 @@ import com.bigdata.ha.msg.IHAWriteSetStateResponse; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.IBufferAccess; +import 
com.bigdata.io.SerializerUtil; import com.bigdata.io.writecache.WriteCache; import com.bigdata.jini.start.config.ZookeeperClientConfig; import com.bigdata.jini.util.JiniUtil; @@ -94,6 +96,8 @@ import com.bigdata.quorum.QuorumEvent; import com.bigdata.quorum.QuorumException; import com.bigdata.quorum.QuorumListener; +import com.bigdata.quorum.zk.QuorumBackupState; +import com.bigdata.quorum.zk.ZKQuorum; import com.bigdata.quorum.zk.ZKQuorumImpl; import com.bigdata.rdf.sail.webapp.ConfigParams; import com.bigdata.rdf.sail.webapp.NanoSparqlServer; @@ -531,9 +535,9 @@ * Ensure that the HAQuorumService will not attempt to cure any * serviceLeave or related actions. * - * TODO If we properly enter a ShutdownTask run state then we would - * not have to do this since it will already be in the Shutdown - * runstate. + * TODO SHUTDOWN: If we properly enter a ShutdownTask run state then + * we would not have to do this since it will already be in the + * Shutdown runstate. */ quorumService.runStateRef .set(HAQuorumService.RunStateEnum.Shutdown); @@ -674,7 +678,7 @@ RunMet, Resync, Rebuild, - Shutdown; // TODO We are not using this systematically (no ShutdownTask for this run state). + Shutdown; // TODO SHUTDOWN: We are not using this systematically (no ShutdownTask for this run state). } private final AtomicReference<RunStateEnum> runStateRef = new AtomicReference<RunStateEnum>( @@ -1039,8 +1043,34 @@ return null; } } - + /** + * If there is a fully met quorum, then we can purge all HA logs + * <em>EXCEPT</em> the current one. + */ + @Override + public void serviceJoin() { + + super.serviceJoin(); + + if (false) { + /* + * TODO BACKUP: Purge of HALog files on a fully met quorum is + * current disabled. Enable, but review implications in more + * depth first. + */ + final long token = getQuorum().token(); + + if (getQuorum().isQuorumFullyMet(token)) { + + purgeHALogs(false/* includeCurrent */); + + } + } + + } + + /** * Task to handle a quorum break event. 
*/ private class SeekConsensusTask extends RunStateCallable<Void> { @@ -1233,16 +1263,16 @@ @Override protected Void doRun() throws Exception { /* - * FIXME Enable restore and test. There is a problem with - * replication of the WORM HALog files and backup/restore. The - * WORM HALog files currently do not have the actual data on the - * leader. This makes some of the code messier and also means - * that the HALog files can not be binary equals on the leader - * and follower and could cause problems if people harvest them - * from the file system directly rather than through - * sendHALogFile() since they will be missing the necessary - * state in the file system if they were put there by the - * leader. + * FIXME RESTORE: Enable restore and test. There is a problem + * with replication of the WORM HALog files and backup/restore. + * The WORM HALog files currently do not have the actual data on + * the leader. This makes some of the code messier and also + * means that the HALog files can not be binary equals on the + * leader and follower and could cause problems if people + * harvest them from the file system directly rather than + * through sendHALogFile() since they will be missing the + * necessary state in the file system if they were put there by + * the leader. */ if(false) while (true) { @@ -1625,7 +1655,7 @@ * HALog files that we need. However, REBUILD is still * semantically correct so long as we restart the procedure. * - * TODO RESYNC : It is possible to go to another service in the + * TODO RESYNC: It is possible to go to another service in the * met quorum for the same log file, but it needs to be a * service that is UPSTREAM of this service. */ @@ -1742,8 +1772,9 @@ ft = leader .sendHALogForWriteSet(new HALogRequest( - server.serviceUUID, closingCommitCounter, - true/* incremental */)); + server.serviceUUID, closingCommitCounter +// , true/* incremental */ + )); // Wait until all write cache blocks are received. 
ft.get(); @@ -2070,13 +2101,8 @@ } - // The current root block on this service. - final long commitCounter = journal.getRootBlockView() - .getCommitCounter(); + if (req != null && req instanceof IHARebuildRequest) { - if (req != null && !req.isIncremental() - && req instanceof IHARebuildRequest) { - /* * This message and payload are part of a ground up service * rebuild (disaster recovery from the quorum) rather than @@ -2135,17 +2161,22 @@ final RunStateEnum runState = runStateRef.get(); - if (RunStateEnum.Resync.equals(runState) - || RunStateEnum.Rebuild.equals(runState)) { + if (RunStateEnum.Resync.equals(runState)) { /* * If we are resynchronizing, then pass ALL messages (both * live and historical) into handleResyncMessage(). + * + * Note: This method handles the transition into the met + * quorum when we observe a LIVE message that is the + * successor of the last received HISTORICAL message. This + * is the signal that we are caught up on the writes on the + * met quorum and may join. */ handleResyncMessage((IHALogRequest) req, msg, data); - } else if (commitCounter == msg.getCommitCounter() + } else if (journal.getRootBlockView().getCommitCounter() == msg.getCommitCounter() && isJoinedMember(msg.getQuorumToken())) { /* @@ -2242,17 +2273,14 @@ final HALogWriter logWriter = journal.getHALogWriter(); - final long journalCommitCounter = journal.getRootBlockView() - .getCommitCounter(); - if (req == null) { /* * Live message. */ - if (msg.getCommitCounter() == journalCommitCounter - && msg.getSequence() + 1 == logWriter.getSequence()) { + if ((msg.getCommitCounter() == journal.getRootBlockView().getCommitCounter()) + && ((msg.getSequence() + 1) == logWriter.getSequence())) { /* * We just received a live message that is the successor @@ -2378,30 +2406,9 @@ */ getActor().serviceJoin(); - /* - * - */ - + // Transition to RunMet. 
enterRunState(new RunMetTask(token, leaderId)); - // /* - // * TODO RESYNC : If there is a fully met quorum, then we can purge - // * all HA logs *EXCEPT* the current one. However, in order - // * to have the same state on each node, we really need to - // * make this decision when a service observes the - // * SERVICE_JOIN event that results in a fully met quorum. - // * That state change will be globally visible. If we do - // this - // * here, then only the service that was resynchronizing - // will - // * wind up purging its logs. - // */ - // if (getQuorum().isQuorumFullyMet()) { - // - // purgeHALogs(false/* includeCurrent */); - // - // } - } /** @@ -2539,6 +2546,33 @@ * {@inheritDoc} * <p> * Destroys the HA log files in the HA log directory. + * + * TODO BACKUP: This already parses the closing commit counter out of + * the HALog filename. To support backup, we MUST NOT delete an HALog + * file that is being retained by the backup retention policy. This will + * be coordinated through zookeeper. + * + * <pre> + * logicalServiceZPath/backedUp {inc=CC;full=CC} + * </pre> + * + * where + * <dl> + * <dt>inc</dt> + * <dd>The commitCounter of the last incremental backup that was + * captured</dd> + * <dt>full</dt> + * <dd>The commitCounter of the last full backup that was captured.</dd> + * </dl> + * + * IF this znode exists, then backups are being captured. When backups + * are being captured, then HALog files may only be removed once they + * have been captured by a backup. The znode is automatically created by + * the backup utility. If you destroy the znode, it will no longer + * assume that backups are being captured until the next time you run + * the backup utility. 
+ * + * @see HABackupManager */ @Override public void purgeHALogs(final boolean includeCurrent) { @@ -2547,35 +2581,116 @@ try { - final File logDir = journal.getHALogDir(); + final QuorumBackupState backupState; + { + QuorumBackupState tmp = null; + try { + final String zpath = server.logicalServiceZPath + "/" + + ZKQuorum.QUORUM + "/" + + ZKQuorum.QUORUM_BACKUP; + tmp = (QuorumBackupState) SerializerUtil + .deserialize(server.zka + .getZookeeper() + .getData(zpath, false/* watch */, null/* stat */)); + } catch (KeeperException.NoNodeException ex) { + // ignore. + } catch (KeeperException e) { + // log @ error and ignore. + log.error(e); + } catch (InterruptedException e) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + return; + } + backupState = tmp; + } + + /* + * List the HALog files for this service. + */ + final File[] logFiles; + { - final File[] files = logDir.listFiles(new FilenameFilter() { + final File currentLogFile = journal.getHALogWriter() + .getFile(); - @Override - public boolean accept(final File dir, final String name) { + final String currentLogFileName = currentLogFile == null ? null + : currentLogFile.getName(); - return name.endsWith(HALogWriter.HA_LOG_EXT); + final File logDir = journal.getHALogDir(); - } - }); + logFiles = logDir.listFiles(new FilenameFilter() { + /** + * Return <code>true</code> iff the file is an HALog + * file that should be deleted. + * + * @param name + * The name of that HALog file (encodes the + * commitCounter). + */ + @Override + public boolean accept(final File dir, final String name) { + + if (!name.endsWith(IHALogReader.HA_LOG_EXT)) { + // Not an HALog file. + return false; + } + + if (!includeCurrent && currentLogFile != null + && name.equals(currentLogFileName)) { + /* + * The caller requested that we NOT purge the + * current HALog, and this is it. + */ + return false; + } + + // Strip off the filename extension. 
+ final String logFileBaseName = name.substring(0, + IHALogReader.HA_LOG_EXT.length()); + + // Closing commitCounter for HALog file. + final long logCommitCounter = Long + .parseLong(logFileBaseName); + + if (backupState != null) { + /* + * Backups are being made. + * + * When backups are being made, we DO NOT delete + * HALog files for commit points GT this + * commitCounter. + */ + if (logCommitCounter > backupState.inc()) { + /* + * Backups are being made and this HALog + * file has not been backed up yet. + */ + return false; + } + } + + // This HALog file MAY be deleted. + return true; + + } + }); + + } + int ndeleted = 0; long totalBytes = 0L; - final File currentFile = journal.getHALogWriter().getFile(); + for (File logFile : logFiles) { - for (File file : files) { + // #of bytes in that HALog file. + final long len = logFile.length(); - final long len = file.length(); + if (!logFile.delete()) { - final boolean delete = includeCurrent - || currentFile != null - && file.getName().equals(currentFile.getName()); + haLog.warn("COULD NOT DELETE FILE: " + logFile); - if (delete && !file.delete()) { - - haLog.warn("COULD NOT DELETE FILE: " + file); - continue; } Added: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/QuorumBackupState.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/QuorumBackupState.java (rev 0) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/QuorumBackupState.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -0,0 +1,118 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Jun 29, 2010 + */ + +package com.bigdata.quorum.zk; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * An object whose state indicates the commitCounter of (a) the last full backup; + * and (b) the last incremental backup for the highly available logical service + * associated with this quorum. + * + * @see ZKQuorum#QUORUM_BACKUP + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: QuorumServiceState.java 4069 2011-01-09 20:58:02Z thompsonbry $ + */ +public class QuorumBackupState implements Externalizable { + + private static final long serialVersionUID = 1L; + + /** + * The initial version. The version appears as the first byte. The remaining + * fields for this version are: the {@link #serviceUUID} written as the most + * significant bits (long) followed by least significant bits (long). + */ + protected static final byte VERSION0 = 0; + + private long inc, full; + + public String toString() { + return getClass().getName() + // + "{inc=" + inc + ", full=" + full + // + "}"; + } + + /** + * Deserialization constructor. 
+ */ + public QuorumBackupState() { + + } + + public QuorumBackupState(final long inc, final long full) { + + if (inc <= 0L) + throw new IllegalArgumentException(); + if (full <= 0L) + throw new IllegalArgumentException(); + + this.inc = inc; + this.full = full; + + } + + public long inc() { + return inc; + } + + public long full() { + return full; + } + + public void readExternal(final ObjectInput in) throws IOException, + ClassNotFoundException { + + final byte version = in.readByte(); + + switch (version) { + case VERSION0: { + inc = in.readLong(); + full = in.readLong(); + break; + } + default: + throw new IOException("Unknown version: " + version); + } + + } + + public void writeExternal(final ObjectOutput out) throws IOException { + + out.write(VERSION0); + + out.writeLong(inc); + + out.writeLong(full); + + } + +} Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorum.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorum.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorum.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -187,4 +187,18 @@ */ String QUORUM_PIPELINE_PREFIX = "pipeline"; + /** + * The name of the znode whose data reflects the commit counters associated + * with the most recent full and incremental backups of the logical service + * associated with this quorum. + * <p> + * Note: This znode does not exist unless backups are being performed. If it + * exists, then HALog files (each of which corresponds to a single commit + * point) will not be purged unless their commit point is LT the last + * incremental or full backup recorded in this znode. 
+ * + * @see QuorumBackupState + */ + String QUORUM_BACKUP = "backup"; + } Modified: branches/READ_CACHE/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java =================================================================== --- branches/READ_CACHE/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java 2013-02-20 20:26:26 UTC (rev 6923) +++ branches/READ_CACHE/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java 2013-02-21 15:19:09 UTC (rev 6924) @@ -35,7 +35,7 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.QuorumService; -import com.bigdata.ha.halog.HALogWriter; +import com.bigdata.ha.halog.IHALogReader; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IIndexManager; import com.bigdata.quorum.zk.ZKQuorumImpl; @@ -147,7 +147,7 @@ @Override public boolean accept(File dir, String name) { return name - .endsWith(HALogWriter.HA_LOG_EXT); + .endsWith(IHALogReader.HA_LOG_EXT); } }); int nfiles = 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |