From: <mar...@us...> - 2013-09-12 17:08:07
|
Revision: 7401 http://bigdata.svn.sourceforge.net/bigdata/?rev=7401&view=rev Author: martyncutcher Date: 2013-09-12 17:07:59 +0000 (Thu, 12 Sep 2013) Log Message: ----------- Amend HALogNexus to add an accessor-count (increment/decrement) pattern that guards HALog files against removal; it is initially used by DumpLogDigests. Also amend DumpLogDigests to query all joined services and use the union of their HALog ranges as the range of HALogs to test. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-09-12 17:03:57 UTC (rev 7400) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-09-12 17:07:59 UTC (rev 7401) @@ -563,7 +563,7 @@ /** * Close the file (does not flush). - */ + */ private void close() throws IOException { // Note: caller owns m_stateLock! try { if (m_state != null) { @@ -611,7 +611,7 @@ * * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> * HAJournalServer reports "follower" but is in SeekConsensus and is - * not participating in commits\xA7</a> + * not participating in commits</a> */ private void remove() throws IOException { @@ -817,8 +817,8 @@ // One less reader/writer. --m_accessors; if (m_accessors == 0) { - if (haLog.isDebugEnabled()) - haLog.debug("Closing file"); + if (haLog.isInfoEnabled()) + haLog.info("Closing file", new StackInfoReport()); /* * Note: Close the RandomAccessFile rather than the * FileChannel.
Potential fix for leaking open file @@ -938,7 +938,7 @@ // Note: Must be synchronized for visibility and atomicity! synchronized (m_state) { - m_state.m_accessors++; + m_state.m_accessors++; } @@ -1051,7 +1051,7 @@ */ synchronized(m_state) { - if(m_state.m_accessors == 0) { + if (m_state.m_accessors == 0) { /** * TODO This is a bit of a hack. The problem is that @@ -1070,9 +1070,9 @@ } - m_state.close(); + m_state.close(); + } } - } } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java 2013-09-12 17:03:57 UTC (rev 7400) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java 2013-09-12 17:07:59 UTC (rev 7401) @@ -118,99 +118,146 @@ return dump(serviceRoot, DEFAULT_BATCH, DEFAULT_SERVICE_THREADS); } - public Iterator<ServiceLogs> dump(final String serviceRoot, final int batchSize, final int serviceThreads) throws IOException, ExecutionException { - try { - // wait for zk services to register! - Thread.sleep(1000); - - List<HAGlue> services = services(serviceRoot); - - if (services.isEmpty()) - throw new IllegalArgumentException("No services found for " + serviceRoot); - - // Start by grabbing a nominal service to pin the logs - final HAGlue pinner = services.get(0); - - final LogDigestParams params = pinner.submit(new PinLogs(), false).get(); - - if (log.isInfoEnabled()) - log.info("Pinning startCC: " + params.startCC + ", endCC: " + params.endCC + ", last snapshot: " + params.snapshotCC); - - /** - * Now access serviceIDs so that we can use discovery to gain HAGlue interface. 
- * - * Submit all requests for concurrent processing, then add results - */ - List<Future<List<HALogInfo>>> results = new ArrayList<Future<List<HALogInfo>>>(); - long batchStart = params.startCC; - long batchEnd = batchStart + batchSize - 1; - int tasks = 0; - while (true) { - if (batchEnd > params.endCC) - batchEnd = params.endCC; - - if (log.isInfoEnabled()) - log.info("Running batch start: " + batchStart + ", end: " + batchEnd + " across " + services); - - for (final HAGlue glue : services) { - - results.add(glue.submit(new GetLogInfo(batchStart, batchEnd, serviceThreads), false)); - - tasks++; - } - - if (batchEnd == params.endCC) - break; - - batchStart += batchSize; - batchEnd += batchSize; - } - - final ArrayList<ServiceLogWait> logs = new ArrayList<ServiceLogWait>(); - for (int t = 0; t < tasks; t++) { - final int s = t % services.size(); - logs.add(new ServiceLogWait(services.get(s).getServiceUUID().toString(), results.get(t), s, services.size())); - } - - // now submit task to release the pinning transaction and wait for it to complete - pinner.submit(new UnpinLogs(params.tx), false).get(); - - // return an Iterator blocking on the Future value of the next source item before - // creating a return value - return new Iterator<ServiceLogs>() { - final Iterator<ServiceLogWait> src = logs.iterator(); - - @Override - public boolean hasNext() { - return src.hasNext(); - } + public Iterator<ServiceLogs> dump(final String serviceRoot, + final int batchSize, final int serviceThreads) throws IOException, + ExecutionException { + try { + // wait for zk services to register should no longer be necessary + // Thread.sleep(1000); - @Override - public ServiceLogs next() { - final ServiceLogWait data = src.next(); + // retrieve a list of joined services + List<HAGlue> services = services(serviceRoot); + + if (services.isEmpty()) + throw new IllegalArgumentException("No services found for " + + serviceRoot); + + // Retrieve a LogDigestParmas for each service with a 
PinLogs task + // Retrieve in sequential order and use a try finally pattern to + // invoke UnpinLogs for everything that was pinned. + // + // Note that the new accessor pattern is used to increment the + // access count in GetLogInfo so this ensures that while concurrent + // digest tasks are running the logs will not be removed. + final ArrayList<HAGlue> pinners = new ArrayList<HAGlue>(); + try { + + long startCC = -1; + long endCC = -1; + + for (HAGlue pinner : services) { + final LogDigestParams params = pinner.submit(new PinLogs(), + false).get(); + + if (log.isInfoEnabled()) + log.info("Pinning startCC: " + params.startCC + ", endCC: " + + params.endCC + ", last snapshot: " + + params.snapshotCC); - try { - // This will block on the future.get() - return new ServiceLogs(data.service, data.waitlogInfos.get(), data.item, data.batch); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); + if (params.startCC != -1) { // there are logs available + if (startCC == -1 || startCC > params.startCC){ + startCC = params.startCC; + } + if (endCC < params.endCC){ + endCC = params.endCC; + } } + + // only added if PinLogs is successful + pinners.add(pinner); // add as pinner to be unpinned later } + + /** + * Now access serviceIDs so that we can use discovery to gain + * HAGlue interface. 
+ * + * Submit all requests for concurrent processing, then add + * results + */ + List<Future<List<HALogInfo>>> results = new ArrayList<Future<List<HALogInfo>>>(); + long batchStart = startCC; + long batchEnd = batchStart + batchSize - 1; + int tasks = 0; + while (true) { + if (batchEnd > endCC) + batchEnd = endCC; - @Override - public void remove() { - throw new UnsupportedOperationException(); + if (log.isInfoEnabled()) + log.info("Running batch start: " + batchStart + + ", end: " + batchEnd + " across " + services); + + for (final HAGlue glue : services) { + + results.add(glue.submit(new GetLogInfo(batchStart, + batchEnd, serviceThreads), false)); + + tasks++; + } + + if (batchEnd == endCC) + break; + + batchStart += batchSize; + batchEnd += batchSize; } - }; - + + final ArrayList<ServiceLogWait> logs = new ArrayList<ServiceLogWait>(); + for (int t = 0; t < tasks; t++) { + final int s = t % services.size(); + logs.add(new ServiceLogWait(services.get(s) + .getServiceUUID().toString(), results.get(t), s, + services.size())); + } + + // return an Iterator blocking on the Future value of the next + // source item before + // creating a return value + return new Iterator<ServiceLogs>() { + final Iterator<ServiceLogWait> src = logs.iterator(); + + @Override + public boolean hasNext() { + return src.hasNext(); + } + + @Override + public ServiceLogs next() { + final ServiceLogWait data = src.next(); + + try { + // This will block on the future.get() + return new ServiceLogs(data.service, + data.waitlogInfos.get(), data.item, + data.batch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + + } finally { + for (final HAGlue pinner : pinners) { + try { + pinner.submit(new UnpinLogs(), false); + } catch (Throwable t) { + log.error("Problem submitting UnpinLogs", t); + } + } + + } } 
catch (InterruptedException e) { throw new RuntimeException(e); } catch (KeeperException e) { throw new RuntimeException(e); } - } + } /** * LogDigestParams with PinLogs and UnpinLogs tasks ensure that @@ -226,13 +273,11 @@ */ @SuppressWarnings("serial") static public class LogDigestParams implements Serializable { - final public long tx; final public long startCC; final public long endCC; final public long snapshotCC; - LogDigestParams(final long tx, final long startCC, final long endCC, long sscc) { - this.tx = tx; + LogDigestParams(final long startCC, final long endCC, long sscc) { this.startCC = startCC; this.endCC = endCC; this.snapshotCC = sscc; @@ -246,10 +291,6 @@ public LogDigestParams call() throws Exception { final HAJournal ha = (HAJournal) this.getIndexManager(); - final ITransactionService ts = ha.getTransactionService(); - final long relTime = ts.getReleaseTime(); - final long tx = ts.newTx(relTime+1); - final HALogNexus nexus = ha.getHALogNexus(); Iterator<IHALogRecord> logs = nexus.getHALogs(); final long startCC; @@ -264,28 +305,23 @@ final ISnapshotRecord rec = ssmgr.getNewestSnapshot(); final long sscc = rec != null ? 
rec.getCommitCounter() : -1; + nexus.addAccessor(); + // return new LogDigestParams(tx, startCC-3, endCC+3); // try asking for more logs than available - return new LogDigestParams(tx, startCC, endCC, sscc); + return new LogDigestParams(startCC, endCC, sscc); } } @SuppressWarnings("serial") static class UnpinLogs extends IndexManagerCallable<Void> { - long tx; - - UnpinLogs(long tx) { - this.tx = tx; - } - - @Override + + @Override public Void call() throws Exception { final HAJournal ha = (HAJournal) this.getIndexManager(); - final ITransactionService ts = ha.getTransactionService(); + ha.getHALogNexus().releaseAccessor(); - ts.abort(tx); - return null; } @@ -322,66 +358,70 @@ HAJournal ha = (HAJournal) this.getIndexManager(); final HALogNexus nexus = ha.getHALogNexus(); - nexus.protectDigest(startCC); + nexus.addAccessor(); try { - long openCC = nexus.getCommitCounter(); - log.warn("Open Commit Counter: " + openCC + ", startCC: " + startCC + ", endCC: " + endCC); - - /** - * Submit each computation as task to pooled executor service - say maximum of - * five threads - */ - final ThreadPoolExecutor es = (ThreadPoolExecutor) Executors - .newFixedThreadPool(serviceThreads); + long openCC = nexus.getCommitCounter(); + log.warn("Open Commit Counter: " + openCC + ", startCC: " + + startCC + ", endCC: " + endCC); - final List<Future<Void>> results = new ArrayList<Future<Void>>(); - - for (long cc = startCC; cc <= endCC; cc++) { - final long cur = cc; - - final Future<Void> res = es.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - try { - final File file = nexus.getHALogFile(cur); - - log.warn("Found log file: " + file.getName()); - - // compute file digest - final IHALogReader r = nexus.getReader(cur); - - final MessageDigest digest = MessageDigest.getInstance("MD5"); - - r.computeDigest(digest); - - infos.add(new HALogInfo(cur, r.isLive(), digest.digest())); - } catch (FileNotFoundException fnf) { - // permitted - infos.add(new 
HALogInfo(cur, false, null /*digest*/)); - } catch (Throwable t) { - log.warn("Unexpected error", t); - - // FIXME: what to do here? - infos.add(new HALogInfo(cur, false, "ERROR".getBytes())); + /** + * Submit each computation as task to pooled executor service - + * say maximum of five threads + */ + final ThreadPoolExecutor es = (ThreadPoolExecutor) Executors + .newFixedThreadPool(serviceThreads); + + final List<Future<Void>> results = new ArrayList<Future<Void>>(); + + for (long cc = startCC; cc <= endCC; cc++) { + final long cur = cc; + + final Future<Void> res = es.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + try { + final File file = nexus.getHALogFile(cur); + + log.warn("Found log file: " + file.getName()); + + // compute file digest + final IHALogReader r = nexus.getReader(cur); + + final MessageDigest digest = MessageDigest + .getInstance("MD5"); + + r.computeDigest(digest); + + infos.add(new HALogInfo(cur, r.isLive(), digest + .digest())); + } catch (FileNotFoundException fnf) { + // permitted + infos.add(new HALogInfo(cur, false, null /* digest */)); + } catch (Throwable t) { + log.warn("Unexpected error", t); + + // FIXME: what to do here? 
+ infos.add(new HALogInfo(cur, false, "ERROR" + .getBytes())); + } + + return null; } - - return null; - } - - }); - - results.add(res); - } - - for (Future<Void> res : results) { - res.get(); - } - - es.shutdown(); - - return new ArrayList<HALogInfo>(infos); + + }); + + results.add(res); + } + + for (Future<Void> res : results) { + res.get(); + } + + es.shutdown(); + + return new ArrayList<HALogInfo>(infos); } finally { - nexus.releaseProtectDigest(); + nexus.releaseAccessor(); } } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-09-12 17:03:57 UTC (rev 7400) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-09-12 17:07:59 UTC (rev 7401) @@ -3696,7 +3696,7 @@ }); // runWithBarrierLock() if (haLog.isInfoEnabled()) - haLog.info("TRANSITION", new RuntimeException()); + haLog.info("TRANSITION", new StackInfoReport()); // Transition to RunMet. 
enterRunState(new RunMetTask(token, leaderId)); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2013-09-12 17:03:57 UTC (rev 7400) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HALogNexus.java 2013-09-12 17:07:59 UTC (rev 7401) @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -120,7 +121,7 @@ * Set to protect log files against deletion while a digest is * computed. This is checked by deleteHALogs. */ - private final AtomicLong digestLog = new AtomicLong(-1L); + private final AtomicInteger logAccessors = new AtomicInteger(); /** * Filter visits all HALog files <strong>except</strong> the current HALog @@ -819,15 +820,21 @@ * Protects logs from removal while a digest is being computed * @param earliestDigest */ - void protectDigest(final long earliestDigest) { - digestLog.set(earliestDigest); + void addAccessor() { + if (logAccessors.incrementAndGet() == 1) { + if (log.isInfoEnabled()) + log.info("Access protection added"); + } } /** * Releases current protection against log removal */ - void releaseProtectDigest() { - digestLog.set(-1L); + void releaseAccessor() { + if (logAccessors.decrementAndGet() == 0) { + if (log.isInfoEnabled()) + log.info("Access protection removed"); + } } /** @@ -848,19 +855,13 @@ final Iterator<IHALogRecord> itr = getHALogs(); - while(itr.hasNext()) { + while(itr.hasNext() && logAccessors.get() == 0) { final IHALogRecord r = itr.next(); final long closingCommitCounter = r.getCommitCounter(); - final boolean deleteFile; - if 
(closingCommitCounter < earliestRetainedSnapshotCommitCounter) { - // now check if protected by the digestLog field (set to -1 if not active) - deleteFile = digestLog.get() == -1 || closingCommitCounter < digestLog.get(); - } else { - deleteFile = false; - } + final boolean deleteFile = closingCommitCounter < earliestRetainedSnapshotCommitCounter; if (!deleteFile) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |