From: <tho...@us...> - 2010-11-03 14:33:59
Revision: 3877
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3877&view=rev
Author:   thompsonbry
Date:     2010-11-03 14:33:52 +0000 (Wed, 03 Nov 2010)

Log Message:
-----------
Working through some issues with interrupts, reopen, and shadow journals with Martyn.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java	2010-11-03 13:17:41 UTC (rev 3876)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java	2010-11-03 14:33:52 UTC (rev 3877)
@@ -269,27 +269,39 @@
 
         checkReopen();
 
-        if (data == null) {
+        if (data == null)
             throw new IllegalArgumentException();
-        }
-
+
+        if (data.hasArray() && data.arrayOffset() != 0) {
+            /*
+             * FIXME [data] is not always backed by an array, the array may not
+             * be visible (read-only), the array offset may not be zero, etc.
+             * Try to drive the ByteBuffer into the RWStore.alloc() method
+             * instead.
+             *
+             * See https://sourceforge.net/apps/trac/bigdata/ticket/151
+             */
+            throw new AssertionError();
+        }
+
         final int nbytes = data.remaining();
 
-        if (nbytes == 0) {
+        if (nbytes == 0)
             throw new IllegalArgumentException();
-        }
-
-        try { /* FIXME [data] is not always backed by an array, the array may not be visible (read-only), the array offset may not be zero, etc. Try to drive the ByteBuffer into the RWStore.alloc() method instead.
-         */
-            if(data.hasArray()&&data.arrayOffset()!=0)throw new AssertionError();
-            final long rwaddr = m_store.alloc(data.array(), nbytes, context);
-            data.position(nbytes); // update position to end of buffer
+        try {
+
+            final long rwaddr = m_store.alloc(data.array(), nbytes, context);
+
+            data.position(nbytes); // update position to end of buffer
 
             final long retaddr = encodeAddr(rwaddr, nbytes);
 
             return retaddr;
-        } catch (RuntimeException re) {
+
+        } catch (RuntimeException re) {
 
-            re.printStackTrace();
+            log.error(re,re);//re.printStackTrace();
 
             m_needsReopen = true;
 
@@ -521,7 +533,7 @@
             m_fileMetadata.raf.close();
             m_fileMetadata.raf = null;
         } catch (IOException e) {
-            e.printStackTrace();
+            log.error(e,e);//e.printStackTrace();
         }
     }
 
@@ -641,7 +653,7 @@
             m_needsReopen = false;
             m_open = true;
         } catch (Throwable t) {
-            t.printStackTrace();
+            log.error(t,t);
            throw new RuntimeException(t);
         }
 

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java	2010-11-03 13:17:41 UTC (rev 3876)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java	2010-11-03 14:33:52 UTC (rev 3877)
@@ -213,23 +213,23 @@
      **/
     public int bufferChainOffset();
 
-    public void absoluteWriteLong(long addr, int threshold, long value);
-
-    /***************************************************************************************
-     * Needed by PSOutputStream for BLOB buffer chaining.
-     **/
-    public void absoluteWriteInt(int addr, int offset, int value);
+//    public void absoluteWriteLong(long addr, int threshold, long value);
+//
+//    /***************************************************************************************
+//     * Needed by PSOutputStream for BLOB buffer chaining.
+//     **/
+//    public void absoluteWriteInt(int addr, int offset, int value);
+//
+//    /***************************************************************************************
+//     * Needed to free Blob chains.
+//     **/
+//    public int absoluteReadInt(int addr, int offset);
+//
+//    /***************************************************************************************
+//     * Needed to free Blob chains.
+//     **/
+//    public int absoluteReadLong(long addr, int offset);
 
-    /***************************************************************************************
-     * Needed to free Blob chains.
-     **/
-    public int absoluteReadInt(int addr, int offset);
-
-    /***************************************************************************************
-     * Needed to free Blob chains.
-     **/
-    public int absoluteReadLong(long addr, int offset);
-
     // /***************************************************************************************
     // * copies the store to a new file, this is not necessarily a byte for byte copy
     // * since the store could write only consolidated data - particulalry relevant for the
@@ -264,7 +264,7 @@
      **/
     public File getStoreFile();
 
-    public void absoluteWriteAddress(long addr, int threshold, long addr2);
+//    public void absoluteWriteAddress(long addr, int threshold, long addr2);
 
     public int getAddressSize();
 

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-03 13:17:41 UTC (rev 3876)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-03 14:33:52 UTC (rev 3877)
@@ -235,7 +235,7 @@
     // RWStore Data
     // ///////////////////////////////////////////////////////////////////////////////////////
 
-    private File m_fd;
+    private final File m_fd;
     private RandomAccessFile m_raf;
     // protected FileMetadata m_metadata;
     // protected int m_transactionCount;
@@ -967,7 +967,7 @@
 
                 return;
 
             } catch (IOException e) {
-                e.printStackTrace();
+                log.error(e,e);
                 throw new IllegalStateException("Unable to restore Blob allocation", e);
             }
@@ -1435,11 +1435,10 @@
 //    }
 //    }
 
-    // --------------------------------------------------------------------------------------------
-    // reset
-    //
-    // Similar to rollbackTransaction but will force a re-initialization if transactions are not being
-    // used - update w/o commit protocol.
+    /**
+     * Toss away all buffered writes and then reload from the current root
+     * block.
+     */
     public void reset() {
         if (log.isInfoEnabled()) {
             log.info("RWStore Reset");
         }
@@ -1459,13 +1458,13 @@
         try {
             m_writeCache.reset();
         } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            throw new RuntimeException(e);
         }
 
         initfromRootBlock();
 
-        m_writeCache.setExtent(convertAddr(m_fileSize)); // notify of current file length.
+        // notify of current file length.
+        m_writeCache.setExtent(convertAddr(m_fileSize));
     } catch (Exception e) {
         throw new IllegalStateException("Unable reset the store", e);
     } finally {
@@ -1607,67 +1606,67 @@
 
         m_allocationLock.lock();
         try {
-
-            checkDeferredFrees(true, journal); // free now if possible
+
+            checkDeferredFrees(true, journal); // free now if possible
 
-            // Allocate storage for metaBits
-            final long oldMetaBits = m_metaBitsAddr;
-            final int oldMetaBitsSize = (m_metaBits.length + m_allocSizes.length + 1) * 4;
-            m_metaBitsAddr = alloc(getRequiredMetaBitsStorage(), null);
+            // Allocate storage for metaBits
+            final long oldMetaBits = m_metaBitsAddr;
+            final int oldMetaBitsSize = (m_metaBits.length + m_allocSizes.length + 1) * 4;
+            m_metaBitsAddr = alloc(getRequiredMetaBitsStorage(), null);
 
-            // DEBUG SANITY CHECK!
-            if (physicalAddress(m_metaBitsAddr) == 0) {
-                throw new IllegalStateException("Returned MetaBits Address not valid!");
-            }
+            // DEBUG SANITY CHECK!
+            if (physicalAddress(m_metaBitsAddr) == 0) {
+                throw new IllegalStateException("Returned MetaBits Address not valid!");
+            }
 
-            // Call immediateFree - no need to defer freeof metaBits, this
-            // has to stop somewhere!
-            immediateFree((int) oldMetaBits, oldMetaBitsSize);
+            // Call immediateFree - no need to defer freeof metaBits, this
+            // has to stop somewhere!
+            immediateFree((int) oldMetaBits, oldMetaBitsSize);
 
-            // save allocation headers
-            final Iterator<Allocator> iter = m_commitList.iterator();
-            while (iter.hasNext()) {
-                final Allocator allocator = iter.next();
-                final int old = allocator.getDiskAddr();
-                metaFree(old);
-
-                final int naddr = metaAlloc();
-                allocator.setDiskAddr(naddr);
-
-                if (log.isTraceEnabled())
-                    log.trace("Update allocator " + allocator.getIndex()
-                            + ", old addr: " + old + ", new addr: " + naddr);
+            // save allocation headers
+            final Iterator<Allocator> iter = m_commitList.iterator();
+            while (iter.hasNext()) {
+                final Allocator allocator = iter.next();
+                final int old = allocator.getDiskAddr();
+                metaFree(old);
+
+                final int naddr = metaAlloc();
+                allocator.setDiskAddr(naddr);
+
+                if (log.isTraceEnabled())
+                    log.trace("Update allocator " + allocator.getIndex()
+                            + ", old addr: " + old + ", new addr: " + naddr);
 
-                try {
-                    // do not use checksum
-                    m_writeCache.write(metaBit2Addr(naddr), ByteBuffer
-                            .wrap(allocator.write()), 0, false);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-            m_commitList.clear();
-
-            writeMetaBits();
-            try {
-                m_writeCache.flush(true);
+                    // do not use checksum
+                    m_writeCache.write(metaBit2Addr(naddr), ByteBuffer
+                            .wrap(allocator.write()), 0, false);
                 } catch (InterruptedException e) {
-                    e.printStackTrace();
                     throw new RuntimeException(e);
                 }
+            }
+            m_commitList.clear();
 
-            // Should not write rootBlock, this is responsibility of client
-            // to provide control
-            // writeFileSpec();
+            writeMetaBits();
 
-            m_metaTransientBits = (int[]) m_metaBits.clone();
+            try {
+                m_writeCache.flush(true);
+            } catch (InterruptedException e) {
+                log.error(e, e);
+                throw new RuntimeException(e);
+            }
+            // Should not write rootBlock, this is responsibility of client
+            // to provide control
+            // writeFileSpec();
+
+            m_metaTransientBits = (int[]) m_metaBits.clone();
+
 //            if (m_commitCallback != null) {
 //                m_commitCallback.commitComplete();
 //            }
 
-            m_raf.getChannel().force(false); // TODO, check if required!
+            m_raf.getChannel().force(false); // TODO, check if required!
         } catch (IOException e) {
             throw new StorageTerminalError("Unable to commit transaction", e);
         } finally {
@@ -2385,35 +2384,35 @@
 //    }
 //    }
 
-    /***************************************************************************************
-     * Needed by PSOutputStream for BLOB buffer chaining.
-     **/
-    public void absoluteWriteInt(final int addr, final int offset, final int value) {
-        try {
-            // must check write cache!!, or the write may be overwritten - just
-            // flush for now
-            m_writes.flush();
+//    /***************************************************************************************
+//     * Needed by PSOutputStream for BLOB buffer chaining.
+//     **/
+//    public void absoluteWriteInt(final int addr, final int offset, final int value) {
+//        try {
+//            // must check write cache!!, or the write may be overwritten - just
+//            // flush for now
+//            m_writes.flush();
+//
+//            m_raf.seek(physicalAddress(addr) + offset);
+//            m_raf.writeInt(value);
+//        } catch (IOException e) {
+//            throw new StorageTerminalError("Unable to write integer", e);
+//        }
+//    }
 
-            m_raf.seek(physicalAddress(addr) + offset);
-            m_raf.writeInt(value);
-        } catch (IOException e) {
-            throw new StorageTerminalError("Unable to write integer", e);
-        }
-    }
+//    /***************************************************************************************
+//     * Needed to free Blob chains.
+//     **/
+//    public int absoluteReadInt(final int addr, final int offset) {
+//        try {
+//            m_raf.seek(physicalAddress(addr) + offset);
+//            return m_raf.readInt();
+//        } catch (IOException e) {
+//            throw new StorageTerminalError("Unable to write integer", e);
+//        }
+//    }
 
     /***************************************************************************************
-     * Needed to free Blob chains.
-     **/
-    public int absoluteReadInt(final int addr, final int offset) {
-        try {
-            m_raf.seek(physicalAddress(addr) + offset);
-            return m_raf.readInt();
-        } catch (IOException e) {
-            throw new StorageTerminalError("Unable to write integer", e);
-        }
-    }
-
-    /***************************************************************************************
     * Needed by PSOutputStream for BLOB buffer chaining.
     **/
    public int bufferChainOffset() {
@@ -2429,30 +2428,29 @@
         return false;
     }
 
-    public int absoluteReadLong(long addr, int offset) {
-        throw new UnsupportedOperationException();
-    }
+//    public int absoluteReadLong(long addr, int offset) {
+//        throw new UnsupportedOperationException();
+//    }
+//
+//    public void absoluteWriteLong(long addr, int threshold, long value) {
+//        throw new UnsupportedOperationException();
+//    }
 
-    public void absoluteWriteLong(long addr, int threshold, long value) {
-        throw new UnsupportedOperationException();
-    }
+//    public void absoluteWriteAddress(long addr, int threshold, long addr2) {
+//        absoluteWriteInt((int) addr, threshold, (int) addr2);
+//    }
 
-    public void absoluteWriteAddress(long addr, int threshold, long addr2) {
-        absoluteWriteInt((int) addr, threshold, (int) addr2);
-    }
-
     public int getAddressSize() {
         return 4;
     }
 
-    // DiskStrategy Support
-    public RandomAccessFile getRandomAccessFile() {
-        return m_raf;
-    }
+//    public RandomAccessFile getRandomAccessFile() {
+//        return m_raf;
+//    }
 
-    public FileChannel getChannel() {
-        return m_raf.getChannel();
-    }
+//    public FileChannel getChannel() {
+//        return m_raf.getChannel();
+//    }
 
     public boolean requiresCommit() {
         return m_recentAlloc;

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java	2010-11-03 13:17:41 UTC (rev 3876)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java	2010-11-03 14:33:52 UTC (rev 3877)
@@ -145,14 +145,14 @@
          * "get()" this task since that will block and the 'interrupt' task
          * will not run.
          */
+        final long maxWaitMillis = 5*1000;
         journal.submit(new AbstractTask(journal,ITx.UNISOLATED,new String[]{}){
             protected Object doTask() throws Exception {
 
-                // sleep for 10 seconds.
+                // sleep for a bit.
+                Thread.sleep(maxWaitMillis/*millis*/);
 
-                Thread.sleep(10*1000);
-
                 throw new AssertionError("Not expecting to wake up.");
 
             }});
@@ -182,12 +182,19 @@
 
         log.warn("Waiting for the write service to commit or abort");
 
+        final long begin = System.currentTimeMillis();
         while (journal.getConcurrencyManager().writeService.getAbortCount() == 0
                 && journal.getConcurrencyManager().writeService
                         .getGroupCommitCount() == 0) {
 
-            Thread.sleep(10);
+            final long elapsed = System.currentTimeMillis() - begin;
+
+            if (elapsed > maxWaitMillis) {
+                fail("Did not abort/commit after " + elapsed + "ms");
+            }
+            Thread.sleep(10/*ms*/);
+
         }
 
         // did abort.
@@ -241,7 +248,10 @@
             final BTree ndx = (BTree) getIndex(getOnlyResource());
 
             // write on the index.
-            ndx.insert(new byte[]{},new byte[]{});
+//            final byte[] val = new byte[Bytes.kilobyte32];
+//            for (int i = 0; i < (Bytes.megabyte32 / Bytes.kilobyte32) + 1; i++)
+//                ndx.insert(new byte[i], val);
+            ndx.insert(new byte[0], new byte[0]);
 
             /*
              * Now provoke a ClosedByInterruptException.
@@ -260,10 +270,10 @@
 
         } catch(Exception ex) {
 
-            assertTrue(isInnerCause(ex, ClosedByInterruptException.class));
+//            log.warn("Provoked expected root cause exception: " + ex, ex);
+//
+//            assertTrue(isInnerCause(ex, ClosedByInterruptException.class));
 
-            log.info("Provoked expected root cause exception: " + ex);
-
             throw ex;
 
         } catch(Throwable t) {
@@ -283,7 +293,7 @@
      * {@link FileChannel} after a {@link ClosedByInterruptException}.
      * <p>
     * The test uses the {@link IRawStore} API. It writes an initial record on
-     * the store and commits. It then interrupts the main thread and then
+     * the store. It then interrupts the main thread and then
     * performs another low level write on the store. The store is then forced
     * to disk to ensure that a {@link ClosedByInterruptException} is triggered
     * (during an IO), (alternatively, an {@link InterruptedException} can be
@@ -308,14 +318,14 @@
 
             final long addr1 = store.write(rec1);
 
-            if (store instanceof IAtomicStore) {
-
-                assertNotSame(0L, ((IAtomicStore)store).commit());
-
-            } else if (store instanceof RWStrategy) {
-                RWStrategy rws = (RWStrategy)store;
-                rws.commit(null);
-            }
+//            if (store instanceof IAtomicStore) {
+//
+//                assertNotSame(0L, ((IAtomicStore)store).commit());
+//
+//            } else if (store instanceof RWStrategy) {
+//                RWStrategy rws = (RWStrategy)store;
+//                rws.commit(null);
+//            }
 
             try {
 
@@ -325,7 +335,7 @@
 
                 store.force(true);
 
-                fail("Expecting: " + ClosedByInterruptException.class);
+                fail("Expecting to be interrupted.");
 
             } catch (Throwable t) {

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java	2010-11-03 13:17:41 UTC (rev 3876)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java	2010-11-03 14:33:52 UTC (rev 3877)
@@ -240,7 +240,8 @@
         properties.setProperty(Options.WRITE_CACHE_ENABLED, ""
                 + writeCacheEnabled);
 
-        return new Journal(properties);
+        return new Journal(properties).getBufferStrategy();
+//        return new Journal(properties);
 
     }

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java	2010-11-03 13:17:41 UTC (rev 3876)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java	2010-11-03 14:33:52 UTC (rev 3877)
@@ -1149,8 +1149,8 @@
         properties.setProperty(Options.WRITE_CACHE_ENABLED, ""
                 + writeCacheEnabled);
 
-        // return new Journal(properties).getBufferStrategy();
-        return new Journal(properties);
+        return new Journal(properties).getBufferStrategy();
+//        return new Journal(properties);
 
     }
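The FIXME block in RWStrategy.write() above guards against ByteBuffers that cannot simply be handed to data.array(): a direct buffer has no accessible backing array, a read-only view hides it, and a sliced buffer can carry a non-zero arrayOffset(). Until the buffer itself can be driven down into RWStore.alloc() (the ticket 151 follow-up), one interim approach is to copy the remaining bytes into a plain byte[] first. The sketch below is illustrative only and is not part of this revision; the copyRemaining helper and the surrounding class are hypothetical, assuming an alloc(byte[], int, ...) style entry point like the one called above.

import java.nio.ByteBuffer;

public class ByteBufferCopyExample {

    /*
     * Copy the remaining bytes of an arbitrary ByteBuffer into a byte[].
     * Works for heap, direct and read-only buffers, and is unaffected by
     * arrayOffset(), because it never touches the backing array directly.
     * (Hypothetical helper, not part of the commit above.)
     */
    static byte[] copyRemaining(final ByteBuffer data) {
        final byte[] buf = new byte[data.remaining()];
        // duplicate() shares the bytes but not the position/limit, so the
        // caller's buffer state is left untouched by the bulk get().
        data.duplicate().get(buf);
        return buf;
    }

    public static void main(final String[] args) {
        // A direct buffer has no accessible backing array: hasArray() is false.
        final ByteBuffer direct = ByteBuffer.allocateDirect(16);
        direct.putLong(42L);
        direct.putLong(7L);
        direct.flip();
        final byte[] copy = copyRemaining(direct);
        System.out.println("copied " + copy.length + " bytes"); // copied 16 bytes
    }
}

The copy costs an extra pass over the data, which is presumably why the ticket prefers pushing the ByteBuffer itself down into the allocator rather than materializing an array at this layer.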