From: <mar...@us...> - 2013-01-23 14:21:23
Revision: 6818
          http://bigdata.svn.sourceforge.net/bigdata/?rev=6818&view=rev
Author:   martyncutcher
Date:     2013-01-23 14:21:12 +0000 (Wed, 23 Jan 2013)

Log Message:
-----------
sync prior to review

Modified Paths:
--------------
    branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java
    branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java
    branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
    branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java
    branches/READ_CACHE/bigdata/src/java/com/bigdata/rwstore/RWStore.java
    branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestRWWriteCacheService.java
    branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java
    branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java
    branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java

Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java
===================================================================
--- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -34,6 +34,7 @@
 import java.security.DigestException;
 import java.security.MessageDigest;
 import java.util.Formatter;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -162,7 +163,7 @@
     // protected by m_writeLock
     private long m_writePosition = -1;
-    private int m_sequence = 0;
+    private AtomicLong m_sequence = new AtomicLong(0);
 
     /**
      * This constructor is called by the log manager to create the file. A
@@ -340,7 +341,7 @@
                     + m_openRootBlock.getLastCommitTime() + ", but msg=" + msg);
 
-        if (m_sequence != msg.getSequence())
+        if (m_sequence.get() != msg.getSequence())
             throw new IllegalStateException("nextSequence=" + m_sequence
                     + ", but msg=" + msg);
 
@@ -391,7 +392,7 @@
             throw new AssertionError();
         }
 
-        m_sequence++;
+        m_sequence.incrementAndGet();
 
         m_fileChange.signalAll();
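Widening m_sequence from a plain int to an AtomicLong removes any overflow concern for a long-lived log and, more usefully, lets the expected sequence be read without holding the write lock. A minimal sketch of the resulting pattern (class and method names are illustrative, not part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    class SequencedLog {

        // Incremented only on the write path, but readable from any thread.
        private final AtomicLong sequence = new AtomicLong(0);

        /** Invoked after a message has been appended to the log. */
        void onMessageWritten() {
            sequence.incrementAndGet();
        }

        /** Safe to call without taking the write lock. */
        long expectedNextSequence() {
            return sequence.get();
        }
    }

Note that the IllegalStateException message above still interpolates m_sequence directly; that is harmless because AtomicLong.toString() renders the current value.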
Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java
===================================================================
--- branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -2446,51 +2446,30 @@
             /*
              * Take the buffer from the cleanList and set it has the
              * [current] buffer.
-             *
-             * Note: We use the [cleanListNotEmpty] Condition so we can
-             * notice a [halt].
              */
-            cleanListLock.lockInterruptibly();
+
+            // Grab buffer from clean list.
+            final WriteCache newBuffer = takeFromClean();
+
+            counters.get().nclean--;
+            // Clear the state on the new buffer and remove from
+            // cacheService map
+            newBuffer.resetWith(serviceMap);//, fileExtent.get());
 
-            try {
+            // Set it as the new buffer.
+            current.set(cache = newBuffer);
 
-                if (log.isInfoEnabled() && cleanList.isEmpty())
-                    log.info("Waiting for clean buffer");
-
-                while (cleanList.isEmpty() && !halt) {
-                    cleanListNotEmpty.await();
-                }
+            // Try to write on the new buffer.
+            if (cache.write(offset, data, chk, useChecksum, latchedAddr)) {
 
-                if (halt)
-                    throw new RuntimeException(firstCause.get());
-
-                // Take a buffer from the cleanList (guaranteed avail).
-                final WriteCache newBuffer = cleanList.take();
-                counters.get().nclean--;
-                // Clear the state on the new buffer and remove from
-                // cacheService map
-                newBuffer.resetWith(serviceMap);//, fileExtent.get());
-
-                // Set it as the new buffer.
-                current.set(cache = newBuffer);
-
-                // Try to write on the new buffer.
-                if (cache.write(offset, data, chk, useChecksum, latchedAddr)) {
-
-                    // This must be the only occurrence of this record.
-                    if (serviceMap.put(offset, cache) != null) {
-                        throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset));
-                    }
-
-
-                    return true;
-
+                // This must be the only occurrence of this record.
+                if (serviceMap.put(offset, cache) != null) {
+                    throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset));
                 }
 
-            } finally {
+
+                return true;
 
-                cleanListLock.unlock();
-
             }
 
             /*
@@ -2513,7 +2492,44 @@
         }
 
     }
+
+    private WriteCache takeFromClean() throws InterruptedException {
+        cleanListLock.lockInterruptibly();
+        try {
+
+            while (true) {
+
+                if (log.isInfoEnabled() && cleanList.isEmpty())
+                    log.info("Waiting for clean buffer");
+
+                /*
+                 * Note: We use the [cleanListNotEmpty] Condition so we can
+                 * notice a [halt].
+                 */
+                while (cleanList.isEmpty() && !halt) {
+                    cleanListNotEmpty.await();
+                }
+
+                if (halt)
+                    throw new RuntimeException(firstCause.get());
+
+                // Poll() rather than take() since other methods poll() the list
+                // unprotected.
+                final WriteCache ret = cleanList.poll();
+
+                if (ret != null) {
+                    return ret;
+                }
+
+            }
+
+        } finally {
+            cleanListLock.unlock();
+        }
+    }
+
 //    /**
 //     * Caches data read from disk (or even read from "older" cache).
 //     * The assumption is that we do not need a "reserve" buffer.
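The extracted takeFromClean() captures a small but important idiom: since other code paths poll() the clean list without holding cleanListLock, a Condition signal is only a hint, so the method must loop and re-poll until it actually obtains a buffer (or halts). A self-contained sketch of that guarded-poll loop, assuming a concurrent queue and halt flag shaped like the ones in this class:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class GuardedPool<T> {

        private final Queue<T> cleanList = new ConcurrentLinkedQueue<T>();
        private final ReentrantLock cleanListLock = new ReentrantLock();
        private final Condition cleanListNotEmpty = cleanListLock.newCondition();
        private volatile boolean halt = false;

        T takeFromClean() throws InterruptedException {
            cleanListLock.lockInterruptibly();
            try {
                while (true) {
                    while (cleanList.isEmpty() && !halt) {
                        cleanListNotEmpty.await();
                    }
                    if (halt)
                        throw new IllegalStateException("halted");
                    // poll() rather than a blocking take(): another thread
                    // may have drained the list between the signal and here.
                    final T ret = cleanList.poll();
                    if (ret != null)
                        return ret;
                }
            } finally {
                cleanListLock.unlock();
            }
        }
    }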
Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -1389,7 +1389,7 @@
         if (_bufferStrategy.isOpen()) {
 
             if (log.isInfoEnabled())
-                log.info("Closing journal: " + getFile());
+                log.info("Closing journal in finalize: " + getFile());
 
             shutdownNow();
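The reworded message separates a finalizer-driven close from an orderly shutdown in the logs, which matters when chasing leaked journals (see the TestJournalShutdown changes below). A stripped-down sketch of the finalizer safety-net pattern that surrounds this log line (hypothetical class, not the patch itself):

    class StoreLike {

        private volatile boolean open = true;

        void shutdownNow() {
            open = false; // release buffers, file handles, ...
        }

        @Override
        protected void finalize() throws Throwable {
            try {
                if (open) {
                    // Reached only when the application leaked the store
                    // without calling shutdownNow() itself.
                    System.err.println("Closing store in finalize");
                    shutdownNow();
                }
            } finally {
                super.finalize();
            }
        }
    }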
Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java
===================================================================
--- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -183,7 +183,7 @@
      */
     final boolean trackActiveSetInMDC = false; // MUST be false for deploy
 
-    private final IResourceManager resourceManager;
+    private final WeakReference<IResourceManager> resourceManagerRef;
 
     /**
      * The name of the service if the write service is running inside of a
@@ -365,7 +365,7 @@
 
         }
 
-        this.resourceManager = resourceManager;
+        this.resourceManagerRef = new WeakReference<IResourceManager>(resourceManager);
 
         /*
          * Tracks rejected executions on a counter.
@@ -947,7 +947,7 @@
          * conditions?
          */
 
-        final AbstractJournal journal = resourceManager.getLiveJournal();
+        final AbstractJournal journal = getLiveJournal();
 
         if(journal.isOpen()) {
 
@@ -1790,14 +1790,29 @@
      * are met.
      */
     private boolean isShouldOverflow() {
-
-        return resourceManager.isOverflowEnabled()
+        final IResourceManager rm = getResourceManager();
+
+        return rm.isOverflowEnabled()
 //        && (forceOverflow.get() || resourceManager.shouldOverflow());
-        && resourceManager.shouldOverflow();
+        && rm.shouldOverflow();
 
     }
 
     /**
+     * Not sure if there is any utility in checking for null reference.
+     *
+     * @return
+     */
+    private IResourceManager getResourceManager() {
+        return resourceManagerRef.get();
+    }
+
+    private AbstractJournal getLiveJournal() {
+        final IResourceManager rm = getResourceManager();
+        return rm == null ? null : rm.getLiveJournal();
+    }
+
+    /**
      * Once an overflow condition has been recognized and NO tasks are
     * {@link #nrunning} then {@link IResourceManager#overflow()} MAY be invoked
      * to handle synchronous overflow processing, including putting a new
@@ -1835,7 +1850,7 @@
          * Note: This returns a Future. We could use that to cancel
          * asynchronous overflow processing if there were a reason to do so.
          */
-        resourceManager.overflow();
+        getResourceManager().overflow();
 
         noverflow++;
 
@@ -2301,7 +2316,7 @@
          * so there is nothing to rollback).
          */
 
-        if(!resourceManager.isOpen()) {
+        if(getResourceManager() == null || !getResourceManager().isOpen()) {
 
             log.warn("ResourceManager not open?");
 
@@ -2313,7 +2328,7 @@
 
         // note: throws IllegalStateException if resource manager is not open.
 //        final AbstractJournal journal = resourceManager.getLiveJournal();
-        final AbstractJournal journal = resourceManager.getLiveJournal();
+        final AbstractJournal journal = getLiveJournal();
 
         if(!journal.isOpen()) {
 
@@ -2592,9 +2607,9 @@
          * abort().
          */
 
-        final AbstractJournal journal = resourceManager.getLiveJournal();
+        final AbstractJournal journal = getLiveJournal();
 
-        if(journal.isOpen()) {
+        if (journal.isOpen()) {
 
             // Abandon the write sets.
@@ -2606,15 +2621,20 @@
                 log.info("Did abort");
 
             } catch(Throwable t) {
+
+                if (getResourceManager() == null) {
+                    log.error("Abort with collected journal: " + serviceName, t);
+                } else {
 
-                AbstractJournal journal = resourceManager.getLiveJournal();
+                    AbstractJournal journal = getLiveJournal();
+
+                    if(journal.isOpen()) {
+
+                        log.error("Problem with abort? : "+serviceName+" : "+t, t);
+
+                    }
+                }
 
-                if(journal.isOpen()) {
-
-                    log.error("Problem with abort? : "+serviceName+" : "+t, t);
-
-                }
-
             } finally {
 
                 // increment the #of aborts.
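Holding the IResourceManager behind a WeakReference means the write service no longer pins its owner (and transitively the journal) in memory, but every use must now go through an accessor that tolerates a collected referent, which is why the abort path gains the explicit null check. The accessor pattern in miniature (types reduced to an illustrative interface):

    import java.lang.ref.WeakReference;

    class WriteServiceLike {

        interface ResourceManager {
            boolean isOpen();
            void overflow();
        }

        private final WeakReference<ResourceManager> resourceManagerRef;

        WriteServiceLike(final ResourceManager rm) {
            this.resourceManagerRef = new WeakReference<ResourceManager>(rm);
        }

        private ResourceManager getResourceManager() {
            return resourceManagerRef.get(); // null once collected
        }

        void maybeOverflow() {
            final ResourceManager rm = getResourceManager();
            if (rm == null || !rm.isOpen()) {
                // Owner was collected or shut down: skip the work rather
                // than keep the referent alive.
                return;
            }
            rm.overflow();
        }
    }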
Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/READ_CACHE/bigdata/src/java/com/bigdata/rwstore/RWStore.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/java/com/bigdata/rwstore/RWStore.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -2315,84 +2315,92 @@
      * WriteCacheService, building BlobHeader as you go.
      **/
     public long alloc(final byte buf[], final int size,
-            final IAllocationContext context) {
+           final IAllocationContext context) {
 
-        final long begin = System.nanoTime();
-
-        if (size > (m_maxFixedAlloc - 4)) {
-
-            if (size > getMaxBlobSize())
-                throw new IllegalArgumentException(
-                        "Allocation request beyond maximum BLOB of " + getMaxBlobSize());
+        m_allocationLock.lock();
+        try {
+            final long begin = System.nanoTime();
 
-            if (log.isTraceEnabled())
-                log.trace("BLOB ALLOC: " + size);
-
-            if (m_storageStats != null) {
-                m_storageStats.allocateBlob(size);
-            }
+            if (size > (m_maxFixedAlloc - 4)) {
 
-            final PSOutputStream psout = PSOutputStream.getNew(this,
-                    m_maxFixedAlloc, context);
-            try {
-
-                int i = 0;
-                final int blocks = size/512;
-                for (int b = 0; b < blocks; b++) {
-                    psout.write(buf, i, 512); // add 512 bytes at a time
-                    i += 512;
-                }
-                psout.write(buf, i, size - i);
+                if (size > getMaxBlobSize())
+                    throw new IllegalArgumentException(
+                            "Allocation request beyond maximum BLOB of "
+                                    + getMaxBlobSize());
 
-                return psout.save();
-
-            } catch (IOException e) {
-
-                throw new RuntimeException("Closed Store?", e);
-
-            } finally {
-                try {
-                    psout.close(); // return stream
-                } catch (IOException ioe) {
-                    // should not happen, since this should only be
-                    // recycling
-                    log.warn("Unexpected error closing PSOutputStream", ioe);
-                }
-            }
+                if (log.isTraceEnabled())
+                    log.trace("BLOB ALLOC: " + size);
 
-        }
+                if (m_storageStats != null) {
+                    m_storageStats.allocateBlob(size);
+                }
 
-        final int newAddr = alloc(size + 4, context); // allow size for checksum
-
-        if (newAddr == 0)
-            throw new IllegalStateException("NULL address allocated");
+                final PSOutputStream psout = PSOutputStream.getNew(this,
+                        m_maxFixedAlloc, context);
+                try {
 
-        final int chk = ChecksumUtility.getCHK().checksum(buf, size);
-
-        final long pa = physicalAddress(newAddr);
+                    int i = 0;
+                    final int blocks = size / 512;
+                    for (int b = 0; b < blocks; b++) {
+                        psout.write(buf, i, 512); // add 512 bytes at a time
+                        i += 512;
+                    }
+                    psout.write(buf, i, size - i);
 
-        try {
-            m_writeCacheService.write(pa, ByteBuffer.wrap(buf, 0, size), chk, true/*writeChecksum*/, newAddr/*latchedAddr*/);
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Closed Store?", e);
-        }
+                    return psout.save();
 
-        // Update counters.
-        final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get()
-                .acquire();
-        try {
-            final int nwrite = size + 4;// size plus checksum.
-            c.nwrites++;
-            c.bytesWritten += nwrite;
-            c.elapsedWriteNanos += (System.nanoTime() - begin);
-            if (nwrite > c.maxWriteSize) {
-                c.maxWriteSize = nwrite;
-            }
-        } finally {
-            c.release();
-        }
+                } catch (IOException e) {
 
-        return newAddr;
+                    throw new RuntimeException("Closed Store?", e);
+
+                } finally {
+                    try {
+                        psout.close(); // return stream
+                    } catch (IOException ioe) {
+                        // should not happen, since this should only be
+                        // recycling
+                        log.warn("Unexpected error closing PSOutputStream", ioe);
+                    }
+                }
+
+            }
+
+            final int newAddr = alloc(size + 4, context); // allow size for
+                                                          // checksum
+
+            if (newAddr == 0)
+                throw new IllegalStateException("NULL address allocated");
+
+            final int chk = ChecksumUtility.getCHK().checksum(buf, size);
+
+            final long pa = physicalAddress(newAddr);
+
+            try {
+                m_writeCacheService.write(pa, ByteBuffer.wrap(buf, 0, size),
+                        chk, true/* writeChecksum */, newAddr/* latchedAddr */);
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Closed Store?", e);
+            }
+
+            // Update counters.
+            final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get()
+                    .acquire();
+            try {
+                final int nwrite = size + 4;// size plus checksum.
+                c.nwrites++;
+                c.bytesWritten += nwrite;
+                c.elapsedWriteNanos += (System.nanoTime() - begin);
+                if (nwrite > c.maxWriteSize) {
+                    c.maxWriteSize = nwrite;
+                }
+            } finally {
+                c.release();
+            }
+
+            return newAddr;
+        } finally {
+            m_allocationLock.unlock();
+        }
     }
 
 //    /****************************************************************************
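Wrapping the whole of alloc() in m_allocationLock serializes blob and fixed allocations against concurrent frees, and the try/finally guarantees the lock is released on every exception path (oversized request, closed store, interrupted write). The shape of the change, reduced to its essentials (illustrative class):

    import java.util.concurrent.locks.ReentrantLock;

    class AllocatorLike {

        private final ReentrantLock allocationLock = new ReentrantLock();

        long alloc(final byte[] buf, final int size) {
            allocationLock.lock();
            try {
                if (size <= 0)
                    throw new IllegalArgumentException("size=" + size);
                // Allocate an address and hand the bytes to the write
                // cache while no concurrent free can recycle the slot.
                return doAllocate(buf, size);
            } finally {
                allocationLock.unlock(); // released on all paths
            }
        }

        private long doAllocate(final byte[] buf, final int size) {
            return 1L; // placeholder for the real allocation logic
        }
    }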
Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestRWWriteCacheService.java
===================================================================
--- branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestRWWriteCacheService.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestRWWriteCacheService.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -150,7 +150,7 @@
         final int hotCacheThreshold = 1;
 
         writeCache = new RWWriteCacheService(5/* nbuffers */,
-                0/* maxDirtyListSize */, 0/*readCacheSize*/, prefixWrites, compactionThreshold,
+                0/* maxDirtyListSize */, 5/*readCacheSize*/, prefixWrites, compactionThreshold,
                 0/*hotCacheSize*/, hotCacheThreshold, fileExtent,
                 opener, quorum, null);

Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java
===================================================================
--- branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -202,6 +202,13 @@
         try {
 
             try {
+//                final Object tst = new Object() {
+//                    protected void finalize() throws Throwable {
+//                        super.finalize();
+//                        System.err.println("Finalizer called!!");
+//                    }
+//                };
+
                 for (int i = 0; i < limit; i++) {
 
                     final Journal jnl = new Journal(properties) {
@@ -214,6 +221,7 @@
                                         + ncreated + ", nalive="
                                         + nunfinalized);
 
+                            destroy();
                         }
 
                     };
@@ -241,7 +249,7 @@
                      */
                     final AbstractTask task1 = new NOpTask(
                             jnl.getConcurrencyManager(), ITx.UNISOLATED,
-                            "name");
+                            "name");
 
                     /*
                      * Task does not create an index. Since it accesses a
@@ -265,16 +273,17 @@
                      */
                     final AbstractTask task2 = new RegisterIndexTask(
                             jnl.getConcurrencyManager(), "name",
-                            new IndexMetadata("name", UUID.randomUUID()));
+                            new IndexMetadata("name", UUID.randomUUID()));;
 
                     /*
                      * Submit one of the tasks and *wait* for its Future.
                      */
-                    jnl.getConcurrencyManager().submit(task1).get();
-                    jnl.getConcurrencyManager().submit(task1b).get();
-                    jnl.getConcurrencyManager().submit(task2).get();
+                    jnl.getConcurrencyManager().submit(task1).get();
+                    jnl.getConcurrencyManager().submit(task1b).get();
+                    jnl.getConcurrencyManager().submit(task2).get();
+
 
-            } catch (ExecutionException e) {
+            } catch (/*Execution*/Exception e) {
 
                 log.error("Problem registering index: " + e, e);
 
             }
@@ -369,12 +378,17 @@
         /*
         * Ensure that all journals are destroyed by the end of the test.
         */
+        int destroyed = 0;
        for (int i = 0; i < refs.length; i++) {
            final Journal jnl = refs[i] == null ? null : refs[i].get();
            if (jnl != null) {
+                destroyed++;
                jnl.destroy();
            }
        }
+        if (destroyed > 0) {
+            System.err.println("Destroyed " + destroyed + " non finalized journals");
+        }
 
     }

Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java
===================================================================
--- branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -172,6 +172,8 @@
     protected Journal getStore(final Properties properties) {
 
         stores = new Journal[replicationCount];
+
+        System.err.println("Setting stores array");
 
         for (int i = 0; i < replicationCount; i++) {
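The TestJournalShutdown changes above lean on weak references: the test keeps a WeakReference per journal it creates, so whatever the garbage collector has not finalized by the end of the run is still reachable through refs[] and can be force-destroyed and counted. The bookkeeping in miniature (illustrative, not the test itself):

    import java.lang.ref.WeakReference;
    import java.util.List;

    class LeakCheck {

        static int destroyStragglers(final List<WeakReference<AutoCloseable>> refs)
                throws Exception {
            int destroyed = 0;
            for (WeakReference<AutoCloseable> ref : refs) {
                final AutoCloseable jnl = ref == null ? null : ref.get();
                if (jnl != null) {
                    // Never finalized: clean it up explicitly.
                    destroyed++;
                    jnl.close();
                }
            }
            if (destroyed > 0)
                System.err.println("Destroyed " + destroyed
                        + " non-finalized journals");
            return destroyed;
        }
    }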
Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java
===================================================================
--- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -58,9 +58,9 @@
 import com.bigdata.concurrent.FutureTaskMon;
 import com.bigdata.ha.HAGlue;
 import com.bigdata.ha.QuorumService;
-import com.bigdata.ha.halog.HALogReader;
-import com.bigdata.ha.halog.HALogWriter;
-import com.bigdata.ha.halog.IHALogReader;
+import com.bigdata.ha.althalog.HALogFile;
+import com.bigdata.ha.althalog.HALogManager;
+import com.bigdata.ha.althalog.IHALogReader;
 import com.bigdata.ha.msg.HADigestResponse;
 import com.bigdata.ha.msg.HALogDigestResponse;
 import com.bigdata.ha.msg.HALogRootBlocksResponse;
@@ -238,19 +238,9 @@
      * @see Options#HA_LOG_DIR
      * @see HALogWriter
      */
-    private final HALogWriter haLogWriter;
+    private final HALogManager haLogManager;
 
     /**
-     * The {@link HALogWriter} for this {@link HAJournal} and never
-     * <code>null</code>.
-     */
-    HALogWriter getHALogWriter() {
-
-        return haLogWriter;
-
-    }
-
-    /**
      * {@inheritDoc}
      * <p>
      * Overridden to strengthen the return type.
@@ -297,10 +287,14 @@
 
         }
 
-        // Set up the HA log writer.
-        haLogWriter = new HALogWriter(haLogDir);
+        // Set up the HA log manager.
+        haLogManager = new HALogManager(haLogDir);
 
     }
+
+    public HALogManager getHALogManager() {
+        return haLogManager;
+    }
 
     /**
      * Perform some checks on the {@link HAJournal} configuration properties.
@@ -416,9 +410,9 @@
      */
     @Override
     protected void _close() {
-
+
         try {
-            haLogWriter.disable();
+            haLogManager.disable();
         } catch (IOException e) {
             haLog.error(e, e);
         }
@@ -445,7 +439,7 @@
                 if (f.isDirectory())
                     return true;
 
-                return f.getName().endsWith(HALogWriter.HA_LOG_EXT);
+                return f.getName().endsWith(HALogFile.HA_LOG_EXT);
 
             }
         });
@@ -589,7 +583,7 @@
             final long commitCounter = msg.getCommitCounter();
 
             final File logFile = new File(haLogDir,
-                    HALogWriter.getHALogFileName(commitCounter));
+                    HALogFile.getHALogFileName(commitCounter));
 
             if (!logFile.exists()) {
 
@@ -598,10 +592,11 @@
 
             }
 
-            final HALogReader r = new HALogReader(logFile);
+            final HALogFile haLogFile = new HALogFile(logFile);
+            final IHALogReader reader = haLogFile.getReader();
 
             final HALogRootBlocksResponse resp = new HALogRootBlocksResponse(
-                    r.getOpeningRootBlock(), r.getClosingRootBlock());
+                    reader.getOpeningRootBlock(), reader.getClosingRootBlock());
 
             if (haLog.isDebugEnabled())
                 haLog.debug("msg=" + msg + ", resp=" + resp);
@@ -625,7 +620,7 @@
              * file needs to be an atomic decision and thus MUST be made by the
              * HALogManager.
             */
-            final IHALogReader r = getHALogWriter().getReader(commitCounter);
+            final IHALogReader r = haLogManager.getReader(commitCounter);
 
             // Task sends an HALog file along the pipeline.
             final FutureTask<Void> ft = new FutureTaskMon<Void>(
@@ -882,7 +877,7 @@
              * file needs to be an atomic decision and thus MUST be made by the
             * HALogManager.
             */
-            final IHALogReader r = getHALogWriter().getReader(commitCounter);
+            final IHALogReader r = haLogManager.getReader(commitCounter);
 
             final MessageDigest digest = MessageDigest.getInstance("MD5");
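The recurring substitution in this file is one object per concern: the HALogManager owns the log directory and the writer lifecycle, while readers are obtained per commit counter so that the open-versus-historical decision is made atomically inside the manager. Assembled from the calls visible in this patch, with stand-in types for illustration:

    // Stand-ins for the com.bigdata.ha.althalog API surface used above;
    // the real interfaces differ in detail.
    interface RootBlock {}

    interface LogReader {
        RootBlock getOpeningRootBlock();
        RootBlock getClosingRootBlock();
    }

    interface LogManagerLike {
        LogReader getReader(long commitCounter) throws Exception;
        void createLog(RootBlock openingRootBlock) throws Exception;
        void disable() throws Exception;
        void removeAllLogFiles(boolean includeCurrent);
    }

    class HALogAccessSketch {
        // before: journal.getHALogWriter().getReader(commitCounter)
        // after:  journal.getHALogManager().getReader(commitCounter)
        static LogReader openForReplication(final LogManagerLike manager,
                final long commitCounter) throws Exception {
            return manager.getReader(commitCounter);
        }
    }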
Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
===================================================================
--- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -47,7 +47,8 @@
 import com.bigdata.ha.QuorumService;
 import com.bigdata.ha.QuorumServiceBase;
 import com.bigdata.ha.RunState;
-import com.bigdata.ha.halog.HALogWriter;
+import com.bigdata.ha.althalog.HALogManager;
+import com.bigdata.ha.althalog.IHALogWriter;
 import com.bigdata.ha.msg.HALogRequest;
 import com.bigdata.ha.msg.HALogRootBlocksRequest;
 import com.bigdata.ha.msg.HARebuildRequest;
@@ -814,7 +815,7 @@
 
             try {
 
-                journal.getHALogWriter().disable();
+                journal.getHALogManager().disable();
 
             } catch (IOException e) {
 
@@ -879,7 +880,7 @@
 
                 try {
 
-                    journal.getHALogWriter().createLog(
+                    journal.getHALogManager().createLog(
                             journal.getRootBlockView());
 
                 } catch (IOException e) {
@@ -1408,9 +1409,9 @@
                 // Make sure we have the correct HALogWriter open.
                 logLock.lock();
                 try {
-                    final HALogWriter logWriter = journal.getHALogWriter();
-                    logWriter.disable();
-                    logWriter.createLog(openRootBlock);
+                    final HALogManager logManager = journal.getHALogManager();
+                    logManager.disable();
+                    logManager.createLog(openRootBlock);
                 } finally {
                     logLock.unlock();
                 }
@@ -1552,8 +1553,8 @@
             // Close out the current HALog writer.
             logLock.lock();
             try {
-                final HALogWriter logWriter = journal.getHALogWriter();
-                logWriter.closeLog(closeRootBlock);
+                final IHALogWriter logWriter = journal.getHALogManager().getOpenLogFile().getWriter();
+                logWriter.close(closeRootBlock);
             } finally {
                 logLock.unlock();
             }
@@ -1643,7 +1644,7 @@
 
             }
 
-            final HALogWriter logWriter = journal.getHALogWriter();
+            final IHALogWriter logWriter = journal.getHALogManager().getOpenLogFile().getWriter();
 
             if (haLog.isDebugEnabled())
                 haLog.debug("HALog.commitCounter="
@@ -1746,7 +1747,7 @@
 
             if (HA_LOG_ENABLED) {
 
-                final HALogWriter logWriter = journal.getHALogWriter();
+                final IHALogWriter logWriter = journal.getHALogManager().getOpenLogFile().getWriter();
 
                 if (msg.getCommitCounter() == logWriter.getCommitCounter()
                         && msg.getSequence() == (logWriter.getSequence() - 1)) {
@@ -1878,7 +1879,7 @@
              * files.
             */
 
-            final HALogWriter logWriter = journal.getHALogWriter();
+            final IHALogWriter logWriter = journal.getHALogManager().getOpenLogFile().getWriter();
 
             final long journalCommitCounter = journal.getRootBlockView()
                     .getCommitCounter();
@@ -1961,7 +1962,7 @@
         private void resyncTransitionToMetQuorum(final IHAWriteMessage msg,
                 final ByteBuffer data) throws IOException, InterruptedException {
 
-            final HALogWriter logWriter = journal.getHALogWriter();
+            final IHALogWriter logWriter = journal.getHALogManager().getOpenLogFile().getWriter();
 
             final IRootBlockView rootBlock = journal.getRootBlockView();
 
@@ -2030,7 +2031,7 @@
 
             if (HA_LOG_ENABLED) {
 
-                final HALogWriter logWriter = journal.getHALogWriter();
+                final IHALogWriter logWriter = journal.getHALogManager().getOpenLogFile().getWriter();
 
                 if (msg.getCommitCounter() != logWriter.getCommitCounter()) {
 
@@ -2067,7 +2068,9 @@
 
                 try {
 
-                    journal.getHALogWriter().write(msg, data);
+                    final IHALogWriter logWriter = journal.getHALogManager().getOpenLogFile().getWriter();
+
+                    logWriter.write(msg, data);
 
                 } finally {
 
@@ -2132,12 +2135,14 @@
 
             logLock.lock();
             try {
-
+                final HALogManager logManager = journal.getHALogManager();
+                final IHALogWriter logWriter = logManager.getOpenLogFile().getWriter();
+
                 // Close off the old log file with the root block.
-                journal.getHALogWriter().closeLog(rootBlock);
+                logWriter.close(rootBlock);
 
                 // Open up a new log file with this root block.
-                journal.getHALogWriter().createLog(rootBlock);
+                logManager.createLog(rootBlock);
 
             } finally {
 
@@ -2161,49 +2166,7 @@
 
             logLock.lock();
             try {
-
-                final File logDir = journal.getHALogDir();
-
-                final File[] files = logDir.listFiles(new FilenameFilter() {
-
-                    @Override
-                    public boolean accept(final File dir, final String name) {
-
-                        return name.endsWith(HALogWriter.HA_LOG_EXT);
-
-                    }
-                });
-
-                int ndeleted = 0;
-                long totalBytes = 0L;
-
-                final File currentFile = journal.getHALogWriter().getFile();
-
-                for (File file : files) {
-
-                    final long len = file.length();
-
-                    final boolean delete = includeCurrent
-                            || currentFile != null
-                            && file.getName().equals(currentFile.getName());
-
-                    if (delete && !file.delete()) {
-
-                        haLog.warn("COULD NOT DELETE FILE: " + file);
-
-                        continue;
-
-                    }
-
-                    ndeleted++;
-
-                    totalBytes += len;
-
-                }
-
-                haLog.info("PURGED LOGS: ndeleted=" + ndeleted
-                        + ", totalBytes=" + totalBytes);
-
+                journal.getHALogManager().removeAllLogFiles(includeCurrent);
             } finally {
 
                 logLock.unlock();
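At a 2-phase commit the server seals the live HALog with the commit's root block and immediately opens the next log under the same logLock, so no replicated write message can fall between the two files; the same manager also centralizes the purge that used to be an inline directory scan. The rotation step in miniature (names illustrative):

    import java.util.concurrent.locks.ReentrantLock;

    class LogRotationSketch {

        interface LogWriter {
            void close(Object rootBlock);
        }

        interface LogManager {
            LogWriter currentWriter();
            void createLog(Object rootBlock);
        }

        private final ReentrantLock logLock = new ReentrantLock();

        void rotateAtCommit(final LogManager manager, final Object rootBlock) {
            logLock.lock();
            try {
                // Seal the old log with the commit's root block...
                manager.currentWriter().close(rootBlock);
                // ...and open the next log with the same root block.
                manager.createLog(rootBlock);
            } finally {
                logLock.unlock();
            }
        }
    }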
Modified: branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java
===================================================================
--- branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java  2013-01-22 20:46:21 UTC (rev 6817)
+++ branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java  2013-01-23 14:21:12 UTC (rev 6818)
@@ -163,6 +163,8 @@
         // Start 3rd service.
         final HAGlue serverC = startC();
 
+        awaitKBExists(serverC);
+
         // Wait until the quorum is fully met. The token should not change.
         assertEquals(token, awaitFullyMetQuorum());

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.