|
From: <mar...@us...> - 2013-09-12 17:08:07
|
Revision: 7401
http://bigdata.svn.sourceforge.net/bigdata/?rev=7401&view=rev
Author: martyncutcher
Date: 2013-09-12 17:07:59 +0000 (Thu, 12 Sep 2013)
Log Message:
-----------
Amend HALogNexus to add an accessor increment pattern to guard against file removal - initially used by DumpLogDigests.
And amend DumpLogDigests to access all joined services to determine the union of the range of halogs to test.
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-09-12 17:03:57 UTC (rev 7400)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-09-12 17:07:59 UTC (rev 7401)
@@ -563,7 +563,7 @@
/**
* Close the file (does not flush).
- */
+ */
private void close() throws IOException { // Note: caller owns m_stateLock!
try {
if (m_state != null) {
@@ -611,7 +611,7 @@
*
* @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695">
* HAJournalServer reports "follower" but is in SeekConsensus and is
- * not participating in commits\xA7</a>
+ * not participating in commits</a>
*/
private void remove() throws IOException {
@@ -817,8 +817,8 @@
// One less reader/writer.
--m_accessors;
if (m_accessors == 0) {
- if (haLog.isDebugEnabled())
- haLog.debug("Closing file");
+ if (haLog.isInfoEnabled())
+ haLog.info("Closing file", new StackInfoReport());
/*
* Note: Close the RandomAccessFile rather than the
* FileChannel. Potential fix for leaking open file
@@ -938,7 +938,7 @@
// Note: Must be synchronized for visibility and atomicity!
synchronized (m_state) {
- m_state.m_accessors++;
+ m_state.m_accessors++;
}
@@ -1051,7 +1051,7 @@
*/
synchronized(m_state) {
- if(m_state.m_accessors == 0) {
+ if (m_state.m_accessors == 0) {
/**
* TODO This is a bit of a hack. The problem is that
@@ -1070,9 +1070,9 @@
}
- m_state.close();
+ m_state.close();
+ }
}
- }
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java 2013-09-12 17:03:57 UTC (rev 7400)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java 2013-09-12 17:07:59 UTC (rev 7401)
@@ -118,99 +118,146 @@
return dump(serviceRoot, DEFAULT_BATCH, DEFAULT_SERVICE_THREADS);
}
- public Iterator<ServiceLogs> dump(final String serviceRoot, final int batchSize, final int serviceThreads) throws IOException, ExecutionException {
- try {
- // wait for zk services to register!
- Thread.sleep(1000);
-
- List<HAGlue> services = services(serviceRoot);
-
- if (services.isEmpty())
- throw new IllegalArgumentException("No services found for " + serviceRoot);
-
- // Start by grabbing a nominal service to pin the logs
- final HAGlue pinner = services.get(0);
-
- final LogDigestParams params = pinner.submit(new PinLogs(), false).get();
-
- if (log.isInfoEnabled())
- log.info("Pinning startCC: " + params.startCC + ", endCC: " + params.endCC + ", last snapshot: " + params.snapshotCC);
-
- /**
- * Now access serviceIDs so that we can use discovery to gain HAGlue interface.
- *
- * Submit all requests for concurrent processing, then add results
- */
- List<Future<List<HALogInfo>>> results = new ArrayList<Future<List<HALogInfo>>>();
- long batchStart = params.startCC;
- long batchEnd = batchStart + batchSize - 1;
- int tasks = 0;
- while (true) {
- if (batchEnd > params.endCC)
- batchEnd = params.endCC;
-
- if (log.isInfoEnabled())
- log.info("Running batch start: " + batchStart + ", end: " + batchEnd + " across " + services);
-
- for (final HAGlue glue : services) {
-
- results.add(glue.submit(new GetLogInfo(batchStart, batchEnd, serviceThreads), false));
-
- tasks++;
- }
-
- if (batchEnd == params.endCC)
- break;
-
- batchStart += batchSize;
- batchEnd += batchSize;
- }
-
- final ArrayList<ServiceLogWait> logs = new ArrayList<ServiceLogWait>();
- for (int t = 0; t < tasks; t++) {
- final int s = t % services.size();
- logs.add(new ServiceLogWait(services.get(s).getServiceUUID().toString(), results.get(t), s, services.size()));
- }
-
- // now submit task to release the pinning transaction and wait for it to complete
- pinner.submit(new UnpinLogs(params.tx), false).get();
-
- // return an Iterator blocking on the Future value of the next source item before
- // creating a return value
- return new Iterator<ServiceLogs>() {
- final Iterator<ServiceLogWait> src = logs.iterator();
-
- @Override
- public boolean hasNext() {
- return src.hasNext();
- }
+ public Iterator<ServiceLogs> dump(final String serviceRoot,
+ final int batchSize, final int serviceThreads) throws IOException,
+ ExecutionException {
+ try {
+ // wait for zk services to register should no longer be necessary
+ // Thread.sleep(1000);
- @Override
- public ServiceLogs next() {
- final ServiceLogWait data = src.next();
+ // retrieve a list of joined services
+ List<HAGlue> services = services(serviceRoot);
+
+ if (services.isEmpty())
+ throw new IllegalArgumentException("No services found for "
+ + serviceRoot);
+
+ // Retrieve a LogDigestParmas for each service with a PinLogs task
+ // Retrieve in sequential order and use a try finally pattern to
+ // invoke UnpinLogs for everything that was pinned.
+ //
+ // Note that the new accessor pattern is used to increment the
+ // access count in GetLogInfo so this ensures that while concurrent
+ // digest tasks are running the logs will not be removed.
+ final ArrayList<HAGlue> pinners = new ArrayList<HAGlue>();
+ try {
+
+ long startCC = -1;
+ long endCC = -1;
+
+ for (HAGlue pinner : services) {
+ final LogDigestParams params = pinner.submit(new PinLogs(),
+ false).get();
+
+ if (log.isInfoEnabled())
+ log.info("Pinning startCC: " + params.startCC + ", endCC: "
+ + params.endCC + ", last snapshot: "
+ + params.snapshotCC);
- try {
- // This will block on the future.get()
- return new ServiceLogs(data.service, data.waitlogInfos.get(), data.item, data.batch);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
+ if (params.startCC != -1) { // there are logs available
+ if (startCC == -1 || startCC > params.startCC){
+ startCC = params.startCC;
+ }
+ if (endCC < params.endCC){
+ endCC = params.endCC;
+ }
}
+
+ // only added if PinLogs is successful
+ pinners.add(pinner); // add as pinner to be unpinned later
}
+
+ /**
+ * Now access serviceIDs so that we can use discovery to gain
+ * HAGlue interface.
+ *
+ * Submit all requests for concurrent processing, then add
+ * results
+ */
+ List<Future<List<HALogInfo>>> results = new ArrayList<Future<List<HALogInfo>>>();
+ long batchStart = startCC;
+ long batchEnd = batchStart + batchSize - 1;
+ int tasks = 0;
+ while (true) {
+ if (batchEnd > endCC)
+ batchEnd = endCC;
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
+ if (log.isInfoEnabled())
+ log.info("Running batch start: " + batchStart
+ + ", end: " + batchEnd + " across " + services);
+
+ for (final HAGlue glue : services) {
+
+ results.add(glue.submit(new GetLogInfo(batchStart,
+ batchEnd, serviceThreads), false));
+
+ tasks++;
+ }
+
+ if (batchEnd == endCC)
+ break;
+
+ batchStart += batchSize;
+ batchEnd += batchSize;
}
- };
-
+
+ final ArrayList<ServiceLogWait> logs = new ArrayList<ServiceLogWait>();
+ for (int t = 0; t < tasks; t++) {
+ final int s = t % services.size();
+ logs.add(new ServiceLogWait(services.get(s)
+ .getServiceUUID().toString(), results.get(t), s,
+ services.size()));
+ }
+
+ // return an Iterator blocking on the Future value of the next
+ // source item before
+ // creating a return value
+ return new Iterator<ServiceLogs>() {
+ final Iterator<ServiceLogWait> src = logs.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return src.hasNext();
+ }
+
+ @Override
+ public ServiceLogs next() {
+ final ServiceLogWait data = src.next();
+
+ try {
+ // This will block on the future.get()
+ return new ServiceLogs(data.service,
+ data.waitlogInfos.get(), data.item,
+ data.batch);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ } finally {
+ for (final HAGlue pinner : pinners) {
+ try {
+ pinner.submit(new UnpinLogs(), false);
+ } catch (Throwable t) {
+ log.error("Problem submitting UnpinLogs", t);
+ }
+ }
+
+ }
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
- }
+ }
/**
* LogDigestParams with PinLogs and UnpinLogs tasks ensure that
@@ -226,13 +273,11 @@
*/
@SuppressWarnings("serial")
static public class LogDigestParams implements Serializable {
- final public long tx;
final public long startCC;
final public long endCC;
final public long snapshotCC;
- LogDigestParams(final long tx, final long startCC, final long endCC, long sscc) {
- this.tx = tx;
+ LogDigestParams(final long startCC, final long endCC, long sscc) {
this.startCC = startCC;
this.endCC = endCC;
this.snapshotCC = sscc;
@@ -246,10 +291,6 @@
public LogDigestParams call() throws Exception {
final HAJournal ha = (HAJournal) this.getIndexManager();
- final ITransactionService ts = ha.getTransactionService();
- final long relTime = ts.getReleaseTime();
- final long tx = ts.newTx(relTime+1);
-
final HALogNexus nexus = ha.getHALogNexus();
Iterator<IHALogRecord> logs = nexus.getHALogs();
final long startCC;
@@ -264,28 +305,23 @@
final ISnapshotRecord rec = ssmgr.getNewestSnapshot();
final long sscc = rec != null ? rec.getCommitCounter() : -1;
+ nexus.addAccessor();
+
// return new LogDigestParams(tx, startCC-3, endCC+3); // try asking for more logs than available
- return new LogDigestParams(tx, startCC, endCC, sscc);
+ return new LogDigestParams(startCC, endCC, sscc);
}
}
@SuppressWarnings("serial")
static class UnpinLogs extends IndexManagerCallable<Void> {
- long tx;
-
- UnpinLogs(long tx) {
- this.tx = tx;
- }
-
- @Override
+
+ @Override
public Void call() throws Exception {
final HAJournal ha = (HAJournal) this.getIndexManager();
- final ITransactionService ts = ha.getTransactionService();
+ ha.getHALogNexus().releaseAccessor();
- ts.abort(tx);
-
return null;
}
@@ -322,66 +358,70 @@
HAJournal ha = (HAJournal) this.getIndexManager();
final HALogNexus nexus = ha.getHALogNexus();
- nexus.protectDigest(startCC);
+ nexus.addAccessor();
try {
- long openCC = nexus.getCommitCounter();
- log.warn("Open Commit Counter: " + openCC + ", startCC: " + startCC + ", endCC: " + endCC);
-
- /**
- * Submit each computation as task to pooled executor service - say maximum of
- * five threads
- */
- final ThreadPoolExecutor es = (ThreadPoolExecutor) Executors
- .newFixedThreadPool(serviceThreads);
+ long openCC = nexus.getCommitCounter();
+ log.warn("Open Commit Counter: " + openCC + ", startCC: "
+ + startCC + ", endCC: " + endCC);
- final List<Future<Void>> results = new ArrayList<Future<Void>>();
-
- for (long cc = startCC; cc <= endCC; cc++) {
- final long cur = cc;
-
- final Future<Void> res = es.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- try {
- final File file = nexus.getHALogFile(cur);
-
- log.warn("Found log file: " + file.getName());
-
- // compute file digest
- final IHALogReader r = nexus.getReader(cur);
-
- final MessageDigest digest = MessageDigest.getInstance("MD5");
-
- r.computeDigest(digest);
-
- infos.add(new HALogInfo(cur, r.isLive(), digest.digest()));
- } catch (FileNotFoundException fnf) {
- // permitted
- infos.add(new HALogInfo(cur, false, null /*digest*/));
- } catch (Throwable t) {
- log.warn("Unexpected error", t);
-
- // FIXME: what to do here?
- infos.add(new HALogInfo(cur, false, "ERROR".getBytes()));
+ /**
+ * Submit each computation as task to pooled executor service -
+ * say maximum of five threads
+ */
+ final ThreadPoolExecutor es = (ThreadPoolExecutor) Executors
+ .newFixedThreadPool(serviceThreads);
+
+ final List<Future<Void>> results = new ArrayList<Future<Void>>();
+
+ for (long cc = startCC; cc <= endCC; cc++) {
+ final long cur = cc;
+
+ final Future<Void> res = es.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ final File file = nexus.getHALogFile(cur);
+
+ log.warn("Found log file: " + file.getName());
+
+ // compute file digest
+ final IHALogReader r = nexus.getReader(cur);
+
+ final MessageDigest digest = MessageDigest
+ .getInstance("MD5");
+
+ r.computeDigest(digest);
+
+ infos.add(new HALogInfo(cur, r.isLive(), digest
+ .digest()));
+ } catch (FileNotFoundException fnf) {
+ // permitted
+ infos.add(new HALogInfo(cur, false, null /* digest */));
+ } catch (Throwable t) {
+ log.warn("Unexpected error", t);
+
+ // FIXME: what to do here?
+ infos.add(new HALogInfo(cur, false, "ERROR"
+ .getBytes()));
+ }
+
+ return null;
}
-
- return null;
- }
-
- });
-
- results.add(res);
- }
-
- for (Future<Void> res : results) {
- res.get();
- }
-
- es.shutdown();
-
- return new ArrayList<HALogInfo>(infos);
+
+ });
+
+ results.add(res);
+ }
+
+ for (Future<Void> res : results) {
+ res.get();
+ }
+
+ es.shutdown();
+
+ return new ArrayList<HALogInfo>(infos);
} finally {
- nexus.releaseProtectDigest();
+ nexus.releaseAccessor();
}
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-09-12 17:03:57 UTC (rev 7400)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-09-12 17:07:59 UTC (rev 7401)
@@ -3696,7 +3696,7 @@
}); // runWithBarrierLock()
if (haLog.isInfoEnabled())
- haLog.info("TRANSITION", new RuntimeException());
+ haLog.info("TRANSITION", new StackInfoReport());
// Transition to RunMet.
enterRunState(new RunMetTask(token, leaderId));
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2013-09-12 17:03:57 UTC (rev 7400)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2013-09-12 17:07:59 UTC (rev 7401)
@@ -32,6 +32,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -120,7 +121,7 @@
* Set to protect log files against deletion while a digest is
* computed. This is checked by deleteHALogs.
*/
- private final AtomicLong digestLog = new AtomicLong(-1L);
+ private final AtomicInteger logAccessors = new AtomicInteger();
/**
* Filter visits all HALog files <strong>except</strong> the current HALog
@@ -819,15 +820,21 @@
* Protects logs from removal while a digest is being computed
* @param earliestDigest
*/
- void protectDigest(final long earliestDigest) {
- digestLog.set(earliestDigest);
+ void addAccessor() {
+ if (logAccessors.incrementAndGet() == 1) {
+ if (log.isInfoEnabled())
+ log.info("Access protection added");
+ }
}
/**
* Releases current protection against log removal
*/
- void releaseProtectDigest() {
- digestLog.set(-1L);
+ void releaseAccessor() {
+ if (logAccessors.decrementAndGet() == 0) {
+ if (log.isInfoEnabled())
+ log.info("Access protection removed");
+ }
}
/**
@@ -848,19 +855,13 @@
final Iterator<IHALogRecord> itr = getHALogs();
- while(itr.hasNext()) {
+ while(itr.hasNext() && logAccessors.get() == 0) {
final IHALogRecord r = itr.next();
final long closingCommitCounter = r.getCommitCounter();
- final boolean deleteFile;
- if (closingCommitCounter < earliestRetainedSnapshotCommitCounter) {
- // now check if protected by the digestLog field (set to -1 if not active)
- deleteFile = digestLog.get() == -1 || closingCommitCounter < digestLog.get();
- } else {
- deleteFile = false;
- }
+ final boolean deleteFile = closingCommitCounter < earliestRetainedSnapshotCommitCounter;
if (!deleteFile) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|