From: <tho...@us...> - 2013-03-15 20:33:38
Revision: 6999
          http://bigdata.svn.sourceforge.net/bigdata/?rev=6999&view=rev
Author:   thompsonbry
Date:     2013-03-15 20:33:26 +0000 (Fri, 15 Mar 2013)

Log Message:
-----------
Checkpoint on refactoring to support snapshots (HA online backup).

- Moved the HALog property to the HAJournalServer Configuration and reorganized the HAJournal initialization to use Configuration in preference to Properties for anything not already declared by Journal.Options.

- Added HAJournalServer configuration options for snapshotDir, snapshotPolicy, and restorePolicy (a configuration sketch follows the path lists below).

- Added interfaces and implementations for ISnapshotPolicy and IRestorePolicy.

- Removed QUORUM_BACKUP and QuorumBackupState.

- Added logic to remove old snapshots and made the logic that removes old HALogs snapshot-aware.

- Added HAJournal.snapshotIndex. The index is populated from the file system when the service starts.

- Refactored logic into SnapshotManager.

HA tests are green with the included configuration files.

Modified Paths:
--------------
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-A.config
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorum.java
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-A.config
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-B.config
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-C.config
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties
    branches/READ_CACHE/src/resources/HAJournal/HAJournal.config

Added Paths:
-----------
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/CommitTimeIndex.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultRestorePolicy.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/ForeverRestorePolicy.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/IRestorePolicy.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/ISnapshotPolicy.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/NoRestorePolicy.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/NoSnapshotPolicy.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java

Removed Paths:
-------------
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/quorum/zk/QuorumBackupState.java

Added: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/CommitTimeIndex.java
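Before the per-file diffs, a minimal sketch of how the snapshotDir, snapshotPolicy, and restorePolicy entries named in the log message might be declared in an HAJournal.config file. The component block, entry spellings, and the directory path are assumptions inferred from the log message (the authoritative names are declared by HAJournalServer.ConfigurationOptions in this revision); the no-argument policy constructors and their defaults are taken from the DefaultSnapshotPolicy and DefaultRestorePolicy sources added below. The committed HAJournal-A/B/C.config files are authoritative.

    import com.bigdata.journal.jini.ha.DefaultRestorePolicy;
    import com.bigdata.journal.jini.ha.DefaultSnapshotPolicy;

    com.bigdata.journal.jini.ha.HAJournalServer {

        // Directory into which snapshots (full online backups) are written.
        // The path here is illustrative only.
        snapshotDir = new java.io.File("snapshot");

        // Defaults per DefaultSnapshotPolicy below: take a snapshot at 0200
        // local time when the HALog delta is at least 50% of the journal size.
        snapshotPolicy = new DefaultSnapshotPolicy();

        // Default per DefaultRestorePolicy below: retain enough snapshots and
        // HALogs to restore any commit point within the last 7 days.
        restorePolicy = new DefaultRestorePolicy();

    }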
=================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/CommitTimeIndex.java (rev 0) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/CommitTimeIndex.java 2013-03-15 20:33:26 UTC (rev 6999) @@ -0,0 +1,374 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.journal.jini.ha; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; +import java.util.UUID; + +import com.bigdata.btree.BTree; +import com.bigdata.btree.BytesUtil; +import com.bigdata.btree.Checkpoint; +import com.bigdata.btree.DefaultTupleSerializer; +import com.bigdata.btree.ITuple; +import com.bigdata.btree.IndexMetadata; +import com.bigdata.btree.keys.ASCIIKeyBuilderFactory; +import com.bigdata.btree.keys.IKeyBuilder; +import com.bigdata.btree.keys.IKeyBuilderFactory; +import com.bigdata.btree.keys.KeyBuilder; +import com.bigdata.journal.ICommitRecord; +import com.bigdata.journal.IRootBlockView; +import com.bigdata.journal.RootBlockView; +import com.bigdata.rawstore.Bytes; +import com.bigdata.rawstore.IRawStore; +import com.bigdata.util.ChecksumUtility; + +/** + * {@link BTree} mapping <em>commitTime</em> (long integers) to + * {@link IRootBlockView} records. + * <p> + * Note: Access to this object MUST be synchronized. + * <p> + * Note: This is used as a transient data structure that is populated from the + * file system by the {@link HAJournalServer}. A separate instance is maintained + * for the HALog files and the snapshot files. + */ +public class CommitTimeIndex extends BTree { + + /** + * Instance used to encode the timestamp into the key. + */ + final private IKeyBuilder keyBuilder = new KeyBuilder(Bytes.SIZEOF_LONG); + + /** + * Create a transient instance. + * + * @return The new instance. + */ + static public CommitTimeIndex createTransient() { + + final IndexMetadata metadata = new IndexMetadata(UUID.randomUUID()); + + metadata.setBTreeClassName(CommitTimeIndex.class.getName()); + + metadata.setTupleSerializer(new TupleSerializer( + new ASCIIKeyBuilderFactory(Bytes.SIZEOF_LONG))); + + return (CommitTimeIndex) BTree.createTransient(/*store, */metadata); + + } + + /** + * Load from the store. + * + * @param store + * The backing store. + * @param checkpoint + * The {@link Checkpoint} record. + * @param metadata + * The metadata record for the index. + */ + public CommitTimeIndex(final IRawStore store, final Checkpoint checkpoint, + final IndexMetadata metadata, final boolean readOnly) { + + super(store, checkpoint, metadata, readOnly); + + } + + /** + * Encodes the commit time into a key. + * + * @param commitTime + * The commit time. + * + * @return The corresponding key. 
+ */ + private byte[] getKey(final long commitTime) { + + return keyBuilder.reset().append(commitTime).getKey(); + + } + + /** + * Return the {@link IRootBlock} identifying the journal having the largest + * commitTime that is less than or equal to the given timestamp. This is + * used primarily to locate the commit record that will serve as the ground + * state for a transaction having <i>timestamp</i> as its start time. In + * this context the LTE search identifies the most recent commit state that + * not later than the start time of the transaction. + * + * @param timestamp + * The given timestamp. + * + * @return The description of the relevant journal resource -or- + * <code>null</code> iff there are no journals in the index that + * satisify the probe. + * + * @throws IllegalArgumentException + * if <i>timestamp</i> is less than or equals to ZERO (0L). + */ + synchronized public IRootBlockView find(final long timestamp) { + + if (timestamp <= 0L) + throw new IllegalArgumentException(); + + // find (first less than or equal to). + final long index = findIndexOf(timestamp); + + if(index == -1) { + + // No match. + log.warn("Not found: " + timestamp); + + return null; + + } + + return valueAtIndex(index); + + } + + /** + * Retrieve the entry from the index. + */ + private IRootBlockView valueAtIndex(final long index) { + + final byte[] val = super.valueAt(index); + + assert val != null : "Entry has null value: index=" + index; + + final IRootBlockView entry = new RootBlockView(false/* rootBlock0 */, + ByteBuffer.wrap(val), ChecksumUtility.getCHK()); + + return entry; + + } + + /** + * Find the first journal whose <em>createTime</em> is strictly greater + * than the timestamp. + * + * @param timestamp + * The timestamp. A value of ZERO (0) may be used to find the + * first journal. + * + * @return The commit record -or- <code>null</code> if there is no commit + * record whose timestamp is strictly greater than <i>timestamp</i>. + */ + synchronized public IRootBlockView findNext(final long timestamp) { + + /* + * Note: can also be written using rangeIterator().next(). + */ + + if (timestamp < 0L) + throw new IllegalArgumentException(); + + // find first strictly greater than. + final long index = findIndexOf(Math.abs(timestamp)) + 1; + + if (index == nentries) { + + // No match. + + return null; + + } + + return valueAtIndex(index); + + } + + /** + * Find the index of the {@link ICommitRecord} having the largest timestamp + * that is less than or equal to the given timestamp. + * + * @return The index of the {@link ICommitRecord} having the largest + * timestamp that is less than or equal to the given timestamp -or- + * <code>-1</code> iff there are no {@link ICommitRecord}s + * defined. + */ + synchronized public long findIndexOf(final long timestamp) { + + long pos = super.indexOf(getKey(timestamp)); + + if (pos < 0) { + + /* + * the key lies between the entries in the index, or possible before + * the first entry in the index. [pos] represents the insert + * position. we convert it to an entry index and subtract one to get + * the index of the first commit record less than the given + * timestamp. + */ + + pos = -(pos+1); + + if(pos == 0) { + + // No entry is less than or equal to this timestamp. + return -1; + + } + + pos--; + + return pos; + + } else { + + /* + * exact hit on an entry. + */ + + return pos; + + } + + } + + /** + * Add an entry under the commitTime associated with the + * {@link IRootBlockView} record. + * + * @param rootBlock + * The {@link IRootBlockView} record. 
+ * + * @exception IllegalArgumentException + * if <i>commitTime</i> is <code>0L</code>. + * @exception IllegalArgumentException + * if <i>rootBLock</i> is <code>null</code>. + * @exception IllegalArgumentException + * if there is already an entry registered under for the + * given timestamp. + */ + synchronized public void add(final IRootBlockView rootBlock) { + + if (rootBlock == null) + throw new IllegalArgumentException(); + + final long createTime = rootBlock.getLastCommitTime(); + + if (createTime == 0L) + throw new IllegalArgumentException(); + + final byte[] key = getKey(createTime); + + if(super.contains(key)) { + + throw new IllegalArgumentException("entry exists: timestamp=" + + createTime); + + } + + // add a serialized entry to the persistent index. + super.insert(key, BytesUtil.getBytes(rootBlock.asReadOnlyBuffer())); + + } + + /** + * Encapsulates key and value formation. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: JournalIndex.java 5892 2012-01-27 13:22:11Z thompsonbry $ + */ + static protected class TupleSerializer extends + DefaultTupleSerializer<Long, IRootBlockView> { + + /** + * + */ + private static final long serialVersionUID = -2851852959439807542L; + + /** + * De-serialization ctor. + */ + public TupleSerializer() { + + super(); + + } + + /** + * Ctor when creating a new instance. + * + * @param keyBuilderFactory + */ + public TupleSerializer(final IKeyBuilderFactory keyBuilderFactory) { + + super(keyBuilderFactory); + + } + + /** + * Decodes the key as a commit time. + */ + @Override + public Long deserializeKey(ITuple tuple) { + + return KeyBuilder + .decodeLong(tuple.getKeyBuffer().array(), 0/* offset */); + + } + + /** + * The initial version (no additional persistent state). + */ + private final static transient byte VERSION0 = 0; + + /** + * The current version. + */ + private final static transient byte VERSION = VERSION0; + + public void readExternal(final ObjectInput in) throws IOException, + ClassNotFoundException { + + super.readExternal(in); + + final byte version = in.readByte(); + + switch (version) { + case VERSION0: + break; + default: + throw new UnsupportedOperationException("Unknown version: " + + version); + } + + } + + public void writeExternal(final ObjectOutput out) throws IOException { + + super.writeExternal(out); + + out.writeByte(VERSION); + + } + + } + +} Added: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultRestorePolicy.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultRestorePolicy.java (rev 0) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultRestorePolicy.java 2013-03-15 20:33:26 UTC (rev 6999) @@ -0,0 +1,92 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.journal.jini.ha; + +import java.util.concurrent.TimeUnit; + +import com.bigdata.journal.IRootBlockView; + +/** + * The default restore policy. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class DefaultRestorePolicy implements IRestorePolicy { + + private final long millis; + + /** + * The default is to keep local backups on hand for 7 days. + */ + public DefaultRestorePolicy() { + + this(TimeUnit.DAYS.toMillis(7)); + + } + + /** + * Create a policy that will keep local backups on hand for the specified + * number of milliseconds. + * + * @param millis + * The #of milliseconds of state that can be restored from local + * backups. + */ + public DefaultRestorePolicy(final long millis) { + + if (millis < 0) + throw new IllegalArgumentException(); + + this.millis = millis; + + } + + /** + * This finds and returns the commit counter for the most recent snapshot + * whose commit time is LTE <code>now - millis</code>, where <i>millis</i> + * is the #of milliseconds specified by the constructor for this policy. The + * return value will be ZERO (0) if there are no commit points. + */ + @Override + public long getEarliestRestorableCommitPoint(final HAJournal jnl) { + + final long now = System.currentTimeMillis(); + + final long then = now - millis; + + final IRootBlockView rootBlock = jnl.getSnapshotManager() + .getSnapshotIndex().find(then); + + if (rootBlock == null) { + + // There are no snapshots. + return 0L; + + } + + return rootBlock.getCommitCounter(); + + } + +} Added: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java (rev 0) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DefaultSnapshotPolicy.java 2013-03-15 20:33:26 UTC (rev 6999) @@ -0,0 +1,182 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.journal.jini.ha; + +import java.lang.ref.WeakReference; +import java.util.Calendar; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; + +/** + * Policy schedules a snapshot at the same time each day. A threshold is used to + * skip the backup if the HALog delta is LTE a specified percentage of the size + * of the journal on the disk. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class DefaultSnapshotPolicy implements ISnapshotPolicy { + + private static final transient Logger log = Logger + .getLogger(DefaultSnapshotPolicy.class); + + final private int timeOfDay; + final private int percentLogSize; + + /** + * The default policy wakes up at <code>0200</code> and takes a snapshot if + * the size of the HALogs written since the last snapshot is at least + * <code>50%</code> of the size of the journal on the disk. + */ + public DefaultSnapshotPolicy() { + + this(200/* 0200 hours */, 50/* percent */); + + } + + /** + * + * @param timeOfDay + * The time of day to wake up and decide whether or not to make a + * new snapshot. + * @param percentLogSize + * The threshold at which a new snapshot will be made. This is + * expressed as a percentage of the HALog size on the disk for + * those HALog files written since the last snapshot (or all + * HALogs if no snapshot has been taken yet). + */ + public DefaultSnapshotPolicy(final int timeOfDay, final int percentLogSize) { + + if (timeOfDay < 0) + throw new IllegalArgumentException(); + + if (percentLogSize < 10 || percentLogSize > 400) + throw new IllegalArgumentException("percentage must be in [10:400]"); + + this.timeOfDay = timeOfDay; + + this.percentLogSize = percentLogSize; + + } + + public void init(final HAJournal jnl) { + + final long initialDelay = delay(timeOfDay); + + jnl.addScheduledTask(new SnapshotTask(jnl), initialDelay, 1/* delay */, + TimeUnit.DAYS); + + } + + private class SnapshotTask implements Runnable { + + /** + * Note: Weak reference prevents the Journal from being pinned. + */ + private final WeakReference<HAJournal> ref; + + public SnapshotTask(final HAJournal jnl) { + + this.ref = new WeakReference<HAJournal>(jnl); + + } + + /** + * Note: Do not throw anything out of this method or it will cause the + * task to not be rescheduled! + */ + @Override + public void run() { + + try { + + final HAJournal jnl = ref.get(); + + if (jnl == null) + return; + + jnl.getSnapshotManager().takeSnapshot(percentLogSize); + + } catch (Throwable t) { + + log.error(t, t); + + } + + } + + } + + /** + * Return the delay (milliseconds) until the given time of day. The time of + * day is expressed as a single integer <code>hhmm</code>. + * + * @param tod + * The time of day expressed as a single integer + * <code>hhmm</code>. + * + * @return The milliseconds until that time of day. 
+ */ + static private long delay(final int tod) { + + final long minuteMillis = 60 * 1000; + final long dayMillis = 24 * 60 * minuteMillis; + + final int todHours = tod / 100; + final int todMinutes = tod % 100; + + final long todMillis = ((todHours * 60) + todMinutes) * minuteMillis; + + final long now = System.currentTimeMillis(); + + final long tzAdjust = Calendar.getInstance().getTimeZone() + .getRawOffset(); + + // days mod 24 * 60 * 60 * 1000 + final long startOfDay = now - (now % dayMillis) - tzAdjust; + +// final long startOfDay = now - (now % dayMillis); + + final long targetTime = startOfDay + todMillis; + + final long delayMs = targetTime - now; + + if (delayMs < 0) { + + return delayMs + dayMillis; + + } else { + + return delayMs; + + } + + } + +// public static void main(String[] args) { +// System.out.println("1540 delay: " + delay(1540)); +// System.out.println("1330 delay: " + delay(1330)); +// } + +} Added: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/ForeverRestorePolicy.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/ForeverRestorePolicy.java (rev 0) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/ForeverRestorePolicy.java 2013-03-15 20:33:26 UTC (rev 6999) @@ -0,0 +1,49 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.journal.jini.ha; + +/** + * A policy that never permits the release of backups such that you can always + * restore any commit point. This policy will require unbounded disk space if + * there are continuing update transactions against the database. However, it is + * perfectly reasonable if you are using a write-once, read-many deployment + * strategy. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class ForeverRestorePolicy implements IRestorePolicy { + + /** + * {@inheritDoc} + * <p> + * This policy always returns ZERO to prevent backups from being released. + */ + @Override + public long getEarliestRestorableCommitPoint(HAJournal jnl) { + + return 0; + + } + +} Deleted: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java 2013-03-15 20:31:28 UTC (rev 6998) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HABackupManager.java 2013-03-15 20:33:26 UTC (rev 6999) @@ -1,1222 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... 
- -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -package com.bigdata.journal.jini.ha; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.nio.ByteBuffer; -import java.rmi.Remote; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; - -import net.jini.config.Configuration; -import net.jini.config.ConfigurationException; -import net.jini.config.ConfigurationProvider; -import net.jini.core.entry.Entry; -import net.jini.core.lookup.ServiceItem; -import net.jini.core.lookup.ServiceRegistrar; -import net.jini.discovery.LookupDiscoveryManager; -import net.jini.lease.LeaseRenewalManager; -import net.jini.lookup.ServiceDiscoveryManager; - -import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.data.ACL; - -import com.bigdata.Banner; -import com.bigdata.ha.HAGlue; -import com.bigdata.ha.HAPipelineGlue; -import com.bigdata.ha.QuorumPipelineImpl; -import com.bigdata.ha.QuorumService; -import com.bigdata.ha.msg.HARebuildRequest; -import com.bigdata.ha.msg.IHALogRequest; -import com.bigdata.ha.msg.IHALogRootBlocksRequest; -import com.bigdata.ha.msg.IHALogRootBlocksResponse; -import com.bigdata.ha.msg.IHARebuildRequest; -import com.bigdata.ha.msg.IHASendStoreResponse; -import com.bigdata.ha.msg.IHASyncRequest; -import com.bigdata.ha.msg.IHAWriteMessage; -import com.bigdata.ha.msg.IHAWriteSetStateRequest; -import com.bigdata.ha.msg.IHAWriteSetStateResponse; -import com.bigdata.io.SerializerUtil; -import com.bigdata.jini.start.config.ZookeeperClientConfig; -import com.bigdata.journal.IRootBlockView; -import com.bigdata.quorum.AbstractQuorumMember; -import com.bigdata.quorum.Quorum; -import com.bigdata.quorum.QuorumEvent; -import com.bigdata.quorum.QuorumException; -import com.bigdata.quorum.QuorumListener; -import com.bigdata.quorum.zk.QuorumBackupState; -import com.bigdata.quorum.zk.ZKQuorum; -import com.bigdata.quorum.zk.ZKQuorumImpl; -import com.bigdata.service.jini.JiniClient; -import com.bigdata.service.jini.JiniClientConfig; -import com.bigdata.util.concurrent.DaemonThreadFactory; -import com.bigdata.zookeeper.ZooKeeperAccessor; - -/** - * Service for making full and incremental backups. - * - * FIXME This is completely non-functional code. I was just experimenting - * with creating a standalone utility. A functional version should be - * derived by refactoring AbstractServer, HAJournalServer, etc. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - */ -public class HABackupManager { - - private static final Logger log = Logger.getLogger(HAJournalServer.class); - - /** - * Logger for HA events. - */ - private static final Logger haLog = Logger.getLogger("com.bigdata.haLog"); - - /** - * Configuration options for the {@link HAJournalServer}. - */ - public interface ConfigurationOptions -// extends AbstractServer.ConfigurationOptions - { - - String COMPONENT = HABackupManager.class.getName(); - - /** - * The target replication factor (k). - * - * @see HAJournalServer.ConfigurationOptions#REPLICATION_FACTOR - */ - String REPLICATION_FACTOR = HAJournalServer.ConfigurationOptions.REPLICATION_FACTOR; - - /** - * The {@link InetSocketAddress} at which the {@link HABackupManager} - * receives replicated writes. - * - * @see HAJournalServer.ConfigurationOptions#WRITE_PIPELINE_ADDR - */ - String WRITE_PIPELINE_ADDR = HAJournalServer.ConfigurationOptions.WRITE_PIPELINE_ADDR; - - /** - * The logical service identifier for the {@link HAJournalServer} - * replication cluster that that will be backed up by this service. - * - * @see HAJournalServer.ConfigurationOptions#LOGICAL_SERVICE_ID - */ - String LOGICAL_SERVICE_ID = HAJournalServer.ConfigurationOptions.LOGICAL_SERVICE_ID; - - } - - private LookupDiscoveryManager lookupDiscoveryManager; - - private ServiceDiscoveryManager serviceDiscoveryManager; - - /** - * The {@link Configuration} read based on the args[] provided when the - * server is started. - */ - protected Configuration config; - - /** - * The timeout in milliseconds to await the discovery of a service if there - * is a cache miss (default {@value #DEFAULT_CACHE_MISS_TIMEOUT}). - */ - final protected long cacheMissTimeout; - - /** - * A randomly generated {@link UUID} that this utility uses to identify - * itself. - */ - private final UUID serviceId = UUID.randomUUID(); - - /** - * The directory for the service. This is the directory within which the - * {@link #serviceIdFile} exists. A service MAY have its own concept of a - * data directory, log directory, etc. which can be somewhere else. - */ - private File serviceDir; - - /** - * Caching discovery client for the {@link HAGlue} services. - */ - private HAJournalDiscoveryClient discoveryClient; - - private ZookeeperClientConfig zkClientConfig; - - private ZooKeeperAccessor zka; - - private Quorum<HAGlue, QuorumService<HAGlue>> quorum; - - /** - * The znode name for the logical service. - * - * @see ConfigurationOptions#LOGICAL_SERVICE_ID - */ - private String logicalServiceId; - - /** - * The zpath for the logical service. - * - * @see ConfigurationOptions#LOGICAL_SERVICE_ID - */ - private String logicalServiceZPath; - - /** - * The {@link MyQuorumMember}. - */ - private MyQuorumMember quorumMember; - - /** - * An object used to manage jini service registrar discovery. - */ - public LookupDiscoveryManager getDiscoveryManagement() { - - return lookupDiscoveryManager; - - } - - /** - * An object used to lookup services using the discovered service registars. - */ - public ServiceDiscoveryManager getServiceDiscoveryManager() { - - return serviceDiscoveryManager; - - } - - /** - * Runs {@link AbstractServer#shutdownNow()} and terminates all asynchronous - * processing, including discovery. This is used for the shutdown hook (^C). 
- * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private static class ShutdownThread extends Thread { - - public ShutdownThread() { - - super("shutdownThread"); - - setDaemon(true); - - } - - public void run() { - - /* - * FIXME Interrupt the backup (if any). It is logically empty until - * we write the root blocks. Those need to BOTH be written - * atomically (tricky) -or- just write the same root block twice - * (sneaky). - */ - - } - - } - - private HABackupManager(final String[] args) throws ConfigurationException { - - /* - * The runtime shutdown hook appears to be a robust way to handle ^C by - * providing a clean service termination. - * - * Note: This is setup before we start any async threads, including - * service discovery. - */ - Runtime.getRuntime().addShutdownHook(new ShutdownThread()); - - // Show the copyright banner during startup. - Banner.banner(); - - AbstractServer.setSecurityManager(); - - /* - * Display the banner. - * - * Note: This also installs the UncaughtExceptionHandler. - * - * @see https://sourceforge.net/apps/trac/bigdata/ticket/601 - */ - Banner.banner(); - - /* - * Read jini configuration & service properties - */ - - List<Entry> entries = null; - - final String COMPONENT = getClass().getName(); - final JiniClientConfig jiniClientConfig; - { - - config = ConfigurationProvider.getInstance(args); - - cacheMissTimeout = (Long) config.getEntry(COMPONENT, - AbstractServer.ConfigurationOptions.CACHE_MISS_TIMEOUT, Long.TYPE, - AbstractServer.ConfigurationOptions.DEFAULT_CACHE_MISS_TIMEOUT); - - jiniClientConfig = new JiniClientConfig( - JiniClientConfig.Options.NAMESPACE, config); - - // convert Entry[] to a mutable list. - entries = new LinkedList<Entry>( - Arrays.asList((Entry[]) jiniClientConfig.entries)); - - if (log.isInfoEnabled()) - log.info(jiniClientConfig.toString()); - - } - - /* - * Make sure that the parent directory exists. - * - * Note: the parentDir will be null if the serviceIdFile is in the - * root directory or if it is specified as a filename without any - * parents in the path expression. Note that the file names a file - * in the current working directory in the latter case and the root - * always exists in the former - and in both of those cases we do - * not have to create the parent directory. - */ - serviceDir = (File) config.getEntry(COMPONENT, - AbstractServer.ConfigurationOptions.SERVICE_DIR, File.class); - - if (serviceDir != null && !serviceDir.exists()) { - - log.warn("Creating: " + serviceDir); - - serviceDir.mkdirs(); - - } - - try { - - /* - * Note: This class will perform multicast discovery if ALL_GROUPS - * is specified and otherwise requires you to specify one or more - * unicast locators (URIs of hosts running discovery services). As - * an alternative, you can use LookupDiscovery, which always does - * multicast discovery. - */ - lookupDiscoveryManager = new LookupDiscoveryManager( - jiniClientConfig.groups, jiniClientConfig.locators, - null /* DiscoveryListener */, config); - - /* - * Setup a helper class that will be notified as services join or - * leave the various registrars to which the data server is - * listening. 
- */ - try { - - serviceDiscoveryManager = new ServiceDiscoveryManager( - lookupDiscoveryManager, new LeaseRenewalManager(), - config); - - } catch (IOException ex) { - - throw new RuntimeException( - "Could not initiate service discovery manager", ex); - - } - - } catch (IOException ex) { - - fatal("Could not setup discovery", ex); - throw new AssertionError();// keep the compiler happy. - - } catch (ConfigurationException ex) { - - fatal("Could not setup discovery", ex); - throw new AssertionError();// keep the compiler happy. - - } - - /* - * Create the service object. - */ - try { - - /* - * Note: By creating the service object here rather than outside of - * the constructor we potentially create problems for subclasses of - * AbstractServer since their own constructor will not have been - * executed yet. - * - * Some of those problems are worked around using a JiniClient to - * handle all aspects of service discovery (how this service locates - * the other services in the federation). - * - * Note: If you explicitly assign values to those clients when the - * fields are declared, e.g., [timestampServiceClient=null] then the - * ctor will overwrite the values set by [newService] since it is - * running before those initializations are performed. This is - * really crufty, may be JVM dependent, and needs to be refactored - * to avoid this subclass ctor init problem. - */ - - if (log.isInfoEnabled()) - log.info("Creating service impl..."); - - // init. -// impl = - newService(config); - -// if (log.isInfoEnabled()) -// log.info("Service impl is " + impl); - - } catch(Exception ex) { - - fatal("Could not start service: "+this, ex); - throw new AssertionError();// keeps compiler happy. - } - - } - - protected void fatal(String msg, Throwable t) { - - log.fatal(msg, t); - - terminate(); - - System.exit(1); - - } - - /** - * Terminates service management threads. - * <p> - * Subclasses which start additional service management threads SHOULD - * extend this method to terminate those threads. The implementation should - * be <strong>synchronized</strong>, should conditionally terminate each - * thread, and should trap, log, and ignore all errors. - */ - private void terminate() { - - if (log.isInfoEnabled()) - log.info("Terminating service management threads."); - -// if (joinManager != null) { -// -// try { -// -// joinManager.terminate(); -// -// } catch (Throwable ex) { -// -// log.error("Could not terminate the join manager: " + this, ex); -// -// } finally { -// -// joinManager = null; -// -// } -// -// } - - if (serviceDiscoveryManager != null) { - - serviceDiscoveryManager.terminate(); - - serviceDiscoveryManager = null; - - } - - if (lookupDiscoveryManager != null) { - - lookupDiscoveryManager.terminate(); - - lookupDiscoveryManager = null; - - } - - } - - protected void newService(final Configuration config) - throws Exception { - - /* - * Verify discovery of at least one ServiceRegistrar. 
- */ - { - final long begin = System.currentTimeMillis(); - - ServiceRegistrar[] registrars = null; - - long elapsed = 0; - - while ((registrars == null || registrars.length == 0) - && elapsed < TimeUnit.SECONDS.toMillis(10)) { - - registrars = getDiscoveryManagement().getRegistrars(); - - Thread.sleep(100/* ms */); - - elapsed = System.currentTimeMillis() - begin; - - } - - if (registrars == null || registrars.length == 0) { - - throw new RuntimeException( - "Could not discover ServiceRegistrar(s)"); - - } - - if (log.isInfoEnabled()) { - log.info("Found " + registrars.length + " service registrars"); - } - - } - -// // Setup discovery for HAGlue clients. -// final HAJournalDiscoveryClient discoveryClient = new HAJournalDiscoveryClient( -// getServiceDiscoveryManager(), -// null/* serviceDiscoveryListener */, cacheMissTimeout); - - /* - * Setup the Quorum. - */ - - zkClientConfig = new ZookeeperClientConfig(config); - - // znode name for the logical service. - final String logicalServiceId = (String) config.getEntry( - ConfigurationOptions.COMPONENT, - ConfigurationOptions.LOGICAL_SERVICE_ID, String.class); - - final String logicalServiceZPathPrefix = zkClientConfig.zroot + "/" - + HAJournalServer.class.getName(); - - // zpath for the logical service. - final String logicalServiceZPath = logicalServiceZPathPrefix + "/" - + logicalServiceId; - - final int replicationFactor = (Integer) config.getEntry( - ConfigurationOptions.COMPONENT, - ConfigurationOptions.REPLICATION_FACTOR, Integer.TYPE); - - { - - // The address at which this service exposes its write pipeline. - final InetSocketAddress writePipelineAddr = (InetSocketAddress) config - .getEntry(ConfigurationOptions.COMPONENT, - ConfigurationOptions.WRITE_PIPELINE_ADDR, - InetSocketAddress.class); - - /* - * Configuration properties for this HAJournal. - */ - final Properties properties = JiniClient.getProperties( - HAJournal.class.getName(), config); - - // Force the writePipelineAddr into the Properties. - properties.put(HAJournal.Options.WRITE_PIPELINE_ADDR, - writePipelineAddr); - - /* - * Zookeeper quorum. - */ - { - final List<ACL> acl = zkClientConfig.acl; - final String zoohosts = zkClientConfig.servers; - final int sessionTimeout = zkClientConfig.sessionTimeout; - - zka = new ZooKeeperAccessor(zoohosts, sessionTimeout); - - if (!zka.awaitZookeeperConnected(10, TimeUnit.SECONDS)) { - - throw new RuntimeException("Could not connect to zk"); - - } - - if (log.isInfoEnabled()) { - log.info("Connected to zookeeper"); - } - - /* - * Ensure key znodes exist. - */ - try { - zka.getZookeeper() - .create(zkClientConfig.zroot, - new byte[] {/* data */}, acl, - CreateMode.PERSISTENT); - } catch (NodeExistsException ex) { - // ignore. - } - try { - zka.getZookeeper() - .create(logicalServiceZPathPrefix, - new byte[] {/* data */}, acl, - CreateMode.PERSISTENT); - } catch (NodeExistsException ex) { - // ignore. - } - try { - zka.getZookeeper() - .create(logicalServiceZPath, - new byte[] {/* data */}, acl, - CreateMode.PERSISTENT); - } catch (NodeExistsException ex) { - // ignore. - } - - quorum = new ZKQuorumImpl<HAGlue, QuorumService<HAGlue>>( - replicationFactor, zka, acl); - } - -// // The HAJournal. -// this.journal = new HAJournal(properties, quorum); - - } - -// TODO executor for events received in the watcher thread. -// singleThreadExecutor = new LatchedExecutor( -// journal.getExecutorService(), 1/* nparallel */); - -// // our external interface. 
-// haGlueService = journal.newHAGlue(serviceUUID); -// -// // wrap the external interface, exposing administrative functions. -// final AdministrableHAGlueService administrableService = new AdministrableHAGlueService( -// this, haGlueService); -// -// // return that wrapped interface. -// return administrableService; - - } - - protected void startUp() throws IOException { - - if (log.isInfoEnabled()) - log.info("Starting server."); - - getQuorum().addListener(new QuorumListener() { - - @Override - public void notify(final QuorumEvent e) { - if (log.isTraceEnabled()) - log.trace(e); // TODO LOG @ TRACE - } - }); - - // Setup the quorum client (aka quorum service). - quorumMember = new MyQuorumMember(logicalServiceZPath, serviceId); - - } - - protected void backup() throws Exception { - - final long token = quorumMember.getQuorum().token(); - - quorumMember.getQuorum().assertQuorum(token); - - quorumMember.new BackupTask(token).call(); - - } - - /** - * - * A randomly generated {@link UUID} that this utility uses to identify - * itself. - */ - protected UUID getServiceId() { - return serviceId; - } - - protected Quorum<?,?> getQuorum() { - return quorum; - } - - private class MyQuorumMember extends AbstractQuorumMember<HAPipelineGlue> { - - private final QuorumPipelineImpl<HAPipelineGlue> pipelineImpl = new QuorumPipelineImpl<HAPipelineGlue>(this) { - - @Override - protected void handleReplicatedWrite(final IHASyncRequest req, - final IHAWriteMessage msg, final ByteBuffer data) - throws Exception { - - MyQuorumMember.this.handleReplicatedWrite(req, msg, data); - - } - - @Override - public UUID getStoreUUID() { - throw new UnsupportedOperationException(); - } - - @Override - public long getLastCommitTime() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public long getLastCommitCounter() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void logWriteCacheBlock(IHAWriteMessage msg, ByteBuffer data) - throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void logRootBlock(boolean isJoinedService,IRootBlockView rootBlock) - throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void purgeHALogs() { - // TODO Auto-generated method stub - - } - -// @Override -// public long getLastCommitTime() { -// -// return MyQuorumMember.this.getLastCommitTime(); -// -// } -// -// @Override -// public long getLastCommitCounter() { -// -// return MyQuorumMember.this.getLastCommitCounter(); -// -// } -// -// @Override -// public void logWriteCacheBlock(final IHAWriteMessage msg, -// final ByteBuffer data) throws IOException { -// -// MyQuorumMember.this.logWriteCacheBlock(msg, data); -// -// } -// -// @Override -// public void logRootBlock(final IRootBlockView rootBlock) -// throws IOException { -// -// MyQuorumMember.this.logRootBlock(rootBlock); -// -// } -// -// @Override -// public void purgeHALogs(final boolean includeCurrent) { -// -// MyQuorumMember.this.purgeHALogs(includeCurrent); -// -// } - - }; - - /** - * The local implementation of the {@link Remote} interface. - */ - private final HAPipelineGlue service; - -// /** -// * Simple service registrar. -// */ -// private final MockServiceRegistrar<S> registrar; - - /** - * The last lastCommitTime value around which a consensus was achieved - * and initially -1L, but this is cleared to -1L each time the consensus - * is lost. - */ - protected volatile long lastConsensusValue = -1L; - - /** - * The downstream service in the write pipeline. 
- */ - protected volatile UUID downStreamId = null; - - private volatile ExecutorService executorService = null; - - protected MyQuorumMember(final String logicalServiceId, - final UUID serviceId) throws IOException { - - super(logicalServiceId, serviceId); - - service = new MyQuorumService(); - - /* - * Delegates. - */ - - addListener(this.pipelineImpl); - - } - - @Override - public void start(final Quorum<?, ?> quorum) { - if (executorService == null) - executorService = Executors - .newSingleThreadExecutor(DaemonThreadFactory - .defaultThreadFactory()); - super.start(quorum); - } - - @Override - public void terminate() { - super.terminate(); - if(executorService!=null) { - executorService.shutdownNow(); - executorService = null; - } - } - - public Executor getExecutor() { - return executorService; - } - -// /** -// * Factory for the local service implementation object. -// */ -// abstract S newService(); - - public HAPipelineGlue getService() { - return service; - } - - /** - * Resolve an {@link HAGlue} object from its Service UUID. - */ - @Override - public HAGlue getService(final UUID serviceId) { - -// final HAJournalDiscoveryClient discoveryClient = -// .getDiscoveryClient(); - - final ServiceItem serviceItem = discoveryClient - .getServiceItem(serviceId); - - if (serviceItem == null) { - - // Not found (per the API). - throw new QuorumException("Service not found: uuid=" - + serviceId); - - } - - @SuppressWarnings("unchecked") - final HAGlue service = (HAGlue) serviceItem.service; - - return service; - - } - - /** - * {@inheritDoc} - * - * Overridden to save the <i>lastCommitTime</i> on - * {@link #lastConsensusValue}. - */ - @Override - public void consensus(long lastCommitTime) { - super.consensus(lastCommitTime); - this.lastConsensusValue = lastCommitTime; - } - - @Override - public void lostConsensus() { - super.lostConsensus(); - this.lastConsensusValue = -1L; - } - - /** - * {@inheritDoc} - * - * Overridden to save the current downstream service {@link UUID} on - * {@link #downStreamId} - */ - public void pipelineChange(final UUID oldDownStreamId, - final UUID newDownStreamId) { - super.pipelineChange(oldDownStreamId, newDownStreamId); - this.downStreamId = newDownStreamId; - } - - /** - * {@inheritDoc} - * - * Overridden to clear the {@link #downStreamId}. - */ - public void pipelineRemove() { - super.pipelineRemove(); - this.downStreamId = null; - } - - /** - * @see HAPipelineGlue - */ - protected void handleReplicatedWrite(IHASyncRequest req, - IHAWriteMessage msg, ByteBuffer data) throws Exception { - - // FIXME handle replicated writes! - - } - - /** - * Mock service class. - */ - class MyQuorumService implements HAPipelineGlue { - - private final InetSocketAddress addrSelf; - - public MyQuorumService() throws IOException { - this.addrSelf = new InetSocketAddress(getPort(0)); - } - - public InetSocketAddress getWritePipelineAddr() { - return addrSelf; - } - - /** - * @todo This is not fully general purpose since it is not strictly - * forbidden that the service's lastCommitTime could change, e.g., - * due to explicit intervention, and hence be updated across this - * operation. The real implemention should be a little more - * sophisticated. - */ - public Future<Void> moveToEndOfPipeline() throws IOException { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { - public void run() { - - // note the current vote (if any). 
- final Long lastCommitTime = getQuorum().getCastVote( - getServiceId()); - - if (isPipelineMember()) { - -// System.err -// .println("Will remove self from the pipeline: " -// + getServiceId()); - - getActor().pipelineRemove(); - -// System.err -// .println("Will add self back into the pipeline: " -// + getServiceId()); - - getActor().pipelineAdd(); - - if (lastCommitTime != null) { - -// System.err -// .println("Will cast our vote again: lastCommitTime=" -// + +lastCommitTime -// + ", " -// + getServiceId()); - - getActor().castVote(lastCommitTime); - - } - - } - } - }, null/* result */); - getExecutor().execute(ft); - return ft; - } - - @Override - public Future<Void> receiveAndReplicate(final IHASyncRequest req, - IHAWriteMessage msg) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public IHALogRootBlocksResponse getHALogRootBlocksForWriteSet( - IHALogRootBlocksRequest msg) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Future<Void> sendHALogForWriteSet(IHALogRequest msg) - throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Future<IHASendStoreResponse> sendHAStore(IHARebuildRequest msg) - throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public IHAWriteSetStateResponse getHAWriteSetState( - IHAWriteSetStateRequest req) { - throw new UnsupportedOperationException(); - } - - } - - /** - * - * Create a full backup. - */ - private class BackupTask implements Callable<Void> { - - /** - * The quorum token in effect when we began the resync. - */ - private final long token; - - /** - * The quorum leader. This is fixed until the quorum breaks. - */ - private final HAGlue leader; - - public BackupTask(final long token) { - - // run while quorum is met. - this.token = token; - - // The leader for that met quorum (RMI interface). - leader = (... [truncated message content] |
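To illustrate the scheduling arithmetic introduced by DefaultSnapshotPolicy above: the policy encodes its wake-up time as a single hhmm integer and computes the initial delay until the next local occurrence of that time, after which the task repeats daily. The standalone class below mirrors the committed delay() logic for illustration only; the class name and the main() harness are not part of this revision.

    import java.util.Calendar;

    /**
     * Standalone mirror of the DefaultSnapshotPolicy scheduling arithmetic.
     * The time of day is encoded as a single integer hhmm (e.g. 200 == 02:00);
     * delay() returns the milliseconds until the next local occurrence of that
     * time.
     */
    public class SnapshotDelayExample {

        static long delay(final int tod) {

            final long minuteMillis = 60 * 1000;
            final long dayMillis = 24 * 60 * minuteMillis;

            // Split hhmm into hours and minutes, then convert to milliseconds
            // past local midnight.
            final int todHours = tod / 100;
            final int todMinutes = tod % 100;
            final long todMillis = ((todHours * 60) + todMinutes) * minuteMillis;

            final long now = System.currentTimeMillis();

            // Raw (non-DST) offset of the local time zone, as in the committed
            // code.
            final long tzAdjust = Calendar.getInstance().getTimeZone()
                    .getRawOffset();

            // Epoch milliseconds of local midnight for the current day.
            final long startOfDay = now - (now % dayMillis) - tzAdjust;

            final long targetTime = startOfDay + todMillis;

            final long delayMs = targetTime - now;

            // If the target time has already passed today, wait until the same
            // time tomorrow.
            return delayMs < 0 ? delayMs + dayMillis : delayMs;

        }

        public static void main(final String[] args) {

            // The default snapshot time is 0200 hours.
            System.out.println("millis until 02:00 local: " + delay(200));

        }

    }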