From: <btm...@us...> - 2010-11-10 14:01:32

Revision: 3924
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3924&view=rev
Author:   btmurphy
Date:     2010-11-10 14:01:26 +0000 (Wed, 10 Nov 2010)

Log Message:
-----------
[branch dev-btm]: CHECKPOINT - phase 1 of callable executor (client service)
smart proxy work. Backed out the temporary debug changes made to
com.bigdata.transaction.AbstractTransactionService that were unintentionally
included in changeset 3923

Modified Paths:
--------------
    branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java	2010-11-10 00:40:29 UTC (rev 3923)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java	2010-11-10 14:01:26 UTC (rev 3924)
@@ -317,7 +317,7 @@
      * until existing transactions (both read-write and read-only) are complete
      * (either aborted or committed).
      */
-    public void shutdown0() {
+    public void shutdown() {
 
         if(log.isInfoEnabled())
             log.info("");
@@ -338,8 +338,7 @@
         try {
 
             // wait for running transactions to complete.
-//BTM - FOR_CLIENT_SERVICE awaitRunningTx(10/* logTimeout */, TimeUnit.MILLISECONDS);
-awaitRunningTx(10L*1000L/* logTimeout */, TimeUnit.MILLISECONDS);
+            awaitRunningTx(10/* logTimeout */, TimeUnit.MILLISECONDS);
 
         } catch (InterruptedException ex) {
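
The net effect of this revision is to restore the public shutdown() method so
callers again get a graceful wait for in-flight transactions. A minimal sketch
of that wait-loop shape follows; TxServiceSketch, activeTxCount, and this
awaitRunningTx are hypothetical stand-ins, not the bigdata implementation.

import java.util.concurrent.TimeUnit;

class TxServiceSketch {

    // hypothetical counter; the real service tracks live tx objects.
    private volatile int activeTxCount = 0;

    // Block until running transactions drain before releasing resources.
    public void shutdown() {
        try {
            // wait for running transactions to complete.
            awaitRunningTx(10 /* logTimeout */, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // restore the interrupt flag and stop waiting.
            Thread.currentThread().interrupt();
        }
    }

    private void awaitRunningTx(long logTimeout, TimeUnit unit)
            throws InterruptedException {
        while (activeTxCount > 0) {
            // sleep one log interval, then re-check; a real implementation
            // would emit a progress log line on each pass.
            unit.sleep(logTimeout);
        }
    }
}
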
From: <btm...@us...> - 2010-11-10 00:40:36

Revision: 3923
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3923&view=rev
Author:   btmurphy
Date:     2010-11-10 00:40:29 +0000 (Wed, 10 Nov 2010)

Log Message:
-----------
[branch dev-btm]: CHECKPOINT - phase 1 of callable executor (client service)
smart proxy work. Includes fixes to issues identified by the rdf tests

Modified Paths:
--------------
    branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java
    branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java	2010-11-09 14:16:13 UTC (rev 3922)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java	2010-11-10 00:40:29 UTC (rev 3923)
@@ -317,7 +317,7 @@
      * until existing transactions (both read-write and read-only) are complete
      * (either aborted or committed).
      */
-    public void shutdown() {
+    public void shutdown0() {
 
         if(log.isInfoEnabled())
             log.info("");
@@ -338,7 +338,8 @@
         try {
 
             // wait for running transactions to complete.
-            awaitRunningTx(10/* logTimeout */, TimeUnit.MILLISECONDS);
+//BTM - FOR_CLIENT_SERVICE awaitRunningTx(10/* logTimeout */, TimeUnit.MILLISECONDS);
+awaitRunningTx(10L*1000L/* logTimeout */, TimeUnit.MILLISECONDS);
 
         } catch (InterruptedException ex) {

Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java
===================================================================
--- branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java	2010-11-09 14:16:13 UTC (rev 3922)
+++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java	2010-11-10 00:40:29 UTC (rev 3923)
@@ -55,7 +55,7 @@
     private int initLimit = 5;
     private int syncLimit = 2;
    private int electionAlg = 3;//0=udp, 3=tcp
-    private int maxClientCnxns = 10;
+    private int maxClientCnxns = 0;//0 ==> unlimited, 10 is default
 
     private Map<Long, QuorumPeerData> peerDataMap = new TreeMap<Long, QuorumPeerData>();//order by peerId

Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java
===================================================================
--- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java	2010-11-09 14:16:13 UTC (rev 3922)
+++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java	2010-11-10 00:40:29 UTC (rev 3923)
@@ -740,12 +740,31 @@
             final ITextIndexer tmp;
             try {
                 final Class<?> vfc = determineTextIndexerClass();
-                final Method gi = vfc.getMethod("getInstance",
-                        IIndexManager.class, String.class, Long.class,
-                        Properties.class);
-                tmp = (ITextIndexer) gi.invoke(null/* object */,
-                        getIndexManager(), getNamespace(),
-                        getTimestamp(), getProperties());
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE                final Method gi = vfc.getMethod("getInstance",
+//BTM - PRE_CLIENT_SERVICE                        IIndexManager.class, String.class, Long.class,
+//BTM - PRE_CLIENT_SERVICE                        Properties.class);
+//BTM - PRE_CLIENT_SERVICE                tmp = (ITextIndexer) gi.invoke(null/* object */,
+//BTM - PRE_CLIENT_SERVICE                        getIndexManager(), getNamespace(),
+//BTM - PRE_CLIENT_SERVICE                        getTimestamp(), getProperties());
+                final Method gi =
+                    vfc.getMethod("getInstance",
+                                  IIndexManager.class,
+                                  IConcurrencyManager.class,
+                                  IBigdataDiscoveryManagement.class,
+                                  String.class,
+                                  Long.class,
+                                  Properties.class);
+                tmp =
+                    (ITextIndexer) gi.invoke
+                                       (null,//object
+                                        getIndexManager(),
+                                        getConcurrencyManager(),
+                                        getDiscoveryManager(),
+                                        getNamespace(),
+                                        getTimestamp(),
+                                        getProperties());
+//BTM - PRE_CLIENT_SERVICE - END
                 if(tmp instanceof ILocatableResource<?>) {
                     ((ILocatableResource<?>)tmp).init();
                 }

Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java
===================================================================
--- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java	2010-11-09 14:16:13 UTC (rev 3922)
+++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java	2010-11-10 00:40:29 UTC (rev 3923)
@@ -502,12 +502,7 @@
         if (indexManager == null)
             throw new IllegalArgumentException();
 //BTM - FOR_CLIENT_SERVICE - BEGIN
-        if (concurrencyManager == null) {
-            throw new IllegalArgumentException("null concurrencyManager");
-        }
-        if (discoveryManager == null) {
-            throw new IllegalArgumentException("null discoveryManager");
-        }
+        //allowed to be null
         this.concurrencyManager = concurrencyManager;
         this.discoveryManager = discoveryManager;
 //BTM - FOR_CLIENT_SERVICE - END

Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java
===================================================================
--- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java	2010-11-09 14:16:13 UTC (rev 3922)
+++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java	2010-11-10 00:40:29 UTC (rev 3923)
@@ -36,6 +36,10 @@
 import com.bigdata.rdf.spo.SPORelation;
 import com.bigdata.relation.locator.DefaultResourceLocator;
 
+//BTM - FOR_CLIENT_SERVICE
+import com.bigdata.discovery.IBigdataDiscoveryManagement;
+import com.bigdata.journal.IConcurrencyManager;
+
 /**
  * A triple store based on the <em>bigdata</em> architecture. This class
  * offers extremely low latency for index operations. All indices are local
@@ -152,7 +156,7 @@
 //BTM - PRE_CLIENT_SERVICE - BEGIN
 //BTM - PRE_CLIENT_SERVICE - NOTE: the super class (AbstractLocalTripleStore --> AbstractTripleStore) now takes an
 //BTM - PRE_CLIENT_SERVICE -       IConcurrencyManager and an IBigdataDiscoveryManagment instance for scale out.
-//BTM - PRE_CLIENT_SERVICE -       It's not clear whether passing null for those parameters will be a problem
+//BTM - PRE_CLIENT_SERVICE -       It's not clear whether passing null for those parameters will be a problem or
 //BTM - PRE_CLIENT_SERVICE -       not. Need to monitor the tests for NullPointerExceptions.
 //BTM - PRE_CLIENT_SERVICE
 //BTM - PRE_CLIENT_SERVICE super(indexManager, namespace, timestamp, properties);
@@ -166,6 +170,31 @@
 
     }
 
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE - NOTE: DefaultResourceLocator now uses reflection
+//BTM - FOR_CLIENT_SERVICE -       to invoke a constructor with the arguments
+//BTM - FOR_CLIENT_SERVICE -       specified below, rather than the original
+//BTM - FOR_CLIENT_SERVICE -       constructor shown above. This constructor
+//BTM - FOR_CLIENT_SERVICE -       was added to allow for that reflection case.
+    public LocalTripleStore
+               (final IIndexManager indexManager,
+                final IConcurrencyManager concurrencyManager,
+                final IBigdataDiscoveryManagement discoveryManager,
+                final String namespace,
+                final Long timestamp,
+                final Properties properties)
+    {
+        super(indexManager,
+              concurrencyManager,
+              discoveryManager,
+              namespace,
+              timestamp,
+              properties);
+
+        store = (Journal) indexManager;
+    }
+//BTM - FOR_CLIENT_SERVICE - END
+
     /**
      * Create or re-open a triple store using a local embedded database.
     */

Modified: branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java
===================================================================
--- branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java	2010-11-09 14:16:13 UTC (rev 3922)
+++ branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java	2010-11-10 00:40:29 UTC (rev 3923)
@@ -163,6 +163,8 @@
 import com.bigdata.journal.TransactionService;
 
 //BTM - FOR_CLIENT_SERVICE
+import com.bigdata.discovery.IBigdataDiscoveryManagement;
+import com.bigdata.journal.IConcurrencyManager;
 import com.bigdata.relation.locator.IResourceLocator;
 
 /**
  * <p>
@@ -1224,6 +1226,13 @@
                 .getTransactionManager().getTransactionService();
 
         final String namespace = database.getNamespace();
+
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+        final IConcurrencyManager concurrencyManager =
+                  database.getConcurrencyManager();
+        final IBigdataDiscoveryManagement discoveryManager =
+                  database.getDiscoveryManager();
+//BTM - FOR_CLIENT_SERVICE - END
 
         final Lock readLock = lock.readLock();
         readLock.lock();
@@ -1255,11 +1264,12 @@
 //BTM - FOR_CLIENT_SERVICE            final AbstractTripleStore txView = (AbstractTripleStore) indexManager
 //BTM - FOR_CLIENT_SERVICE                    .getResourceLocator().locate(namespace, tx);
             IResourceLocator locator = indexManager.getResourceLocator();
+log.warn("\n*** concurrencyManager = "+concurrencyManager+", discoveryManager = "+discoveryManager+"\n");
             final AbstractTripleStore txView =
                 (AbstractTripleStore) locator.locate
                                           (indexManager,
-                                           database.getConcurrencyManager(),
-                                           database.getDiscoveryManager(),
+                                           concurrencyManager,
+                                           discoveryManager,
                                            namespace,
                                            tx);
 //BTM - FOR_CLIENT_SERVICE - END
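
The LexiconRelation change above widens a reflective factory lookup:
Class.getMethod matches on the exact declared parameter list, so adding the
IConcurrencyManager and IBigdataDiscoveryManagement arguments forces the
getMethod signature and the invoke argument array to change together, exactly
as the paired +/- lines show. A self-contained sketch of that pattern, with a
hypothetical TextIndexerImpl standing in for the real indexer class:

import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical factory class with a static getInstance, mirroring the
// ITextIndexer lookup in the diff above.
class TextIndexerImpl {
    public static TextIndexerImpl getInstance(String namespace, Long timestamp,
                                              Properties properties) {
        return new TextIndexerImpl();
    }
}

final class ReflectiveFactoryDemo {
    public static void main(String[] args) throws Exception {
        final Class<?> vfc = TextIndexerImpl.class; // normally Class.forName(...)
        // getMethod matches on the exact declared parameter types, which is
        // why adding two parameters to the factory breaks old lookups.
        final Method gi = vfc.getMethod("getInstance",
                String.class, Long.class, Properties.class);
        // First argument is null because the factory method is static.
        final Object indexer = gi.invoke(null, "kb.lex", 0L, new Properties());
        System.out.println(indexer.getClass().getName());
    }
}
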
From: <tho...@us...> - 2010-11-09 14:16:20

Revision: 3922
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3922&view=rev
Author:   thompsonbry
Date:     2010-11-09 14:16:13 +0000 (Tue, 09 Nov 2010)

Log Message:
-----------
Some edits to PSOutputStream (final declarations, factoring out error
messages into constants).

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java	2010-11-09 14:05:41 UTC (rev 3921)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java	2010-11-09 14:16:13 UTC (rev 3922)
@@ -75,8 +75,20 @@
 
     private static final Logger log = Logger.getLogger(FixedAllocator.class);
 
-//  protected static java.util.logging.Logger cat = java.util.logging.Logger.getLogger(PSOutputStream.class.getName());
+    private static final transient String ERR_NO_STORE = "PSOutputStream with unitilialized store";
 
+    private static final transient String ERR_ALREADY_SAVED = "Writing to saved PSOutputStream";
+
+    /*
+     * PSOutputStream pooling.
+     *
+     * @todo I would like to see this lifted into a class. The RWStore could
+     * then use an instance of that class to have a per-store pool of a given
+     * capacity. This should simplify this class (PSOutputStream), make the use
+     * of pooling optional, and allow greater concurrency if more than one
+     * RWStore is running since they will have distinct pools. (I also do not
+     * like the notion of JVM wide pools where they can be readily avoided).
+     */
     private static PSOutputStream m_poolHead = null;
     private static PSOutputStream m_poolTail = null;
     private static int m_streamCount = 0;
@@ -123,6 +135,10 @@
         m_poolTail = stream;
         m_streamCount++;
     }
+
+    /*
+     * PSOutputStream impl.
+     */
 
     private int[] m_blobHeader = null;
     private byte[] m_buf = null;
@@ -153,7 +169,7 @@
     /****************************************************************
     * resets private state variables for reuse of stream
     **/
-    void init(IStore store, int maxAlloc, IAllocationContext context) {
+    void init(final IStore store, final int maxAlloc, final IAllocationContext context) {
         m_store = store;
         m_context = context;
         m_next = null;
@@ -191,13 +207,13 @@
     * We no longer store continuation addresses, instead we allocate
     * blob allocations via a blob header block.
     **/
-    public void write(int b) throws IOException {
+    public void write(final int b) throws IOException {
         if (m_store == null) {
-            throw new IllegalStateException("NULL store");
+            throw new IllegalStateException(ERR_NO_STORE);
         }
         if (m_isSaved) {
-            throw new IllegalStateException("Writing to saved PSOutputStream");
+            throw new IllegalStateException(ERR_ALREADY_SAVED);
         }
 
         if (m_count == m_blobThreshold && !m_writingHdr) {
@@ -206,7 +222,7 @@
                 m_blobHdrIdx = 0;
             }
 
-            int curAddr = (int) m_store.alloc(m_buf, m_count, m_context);
+            final int curAddr = (int) m_store.alloc(m_buf, m_count, m_context);
             m_blobHeader[m_blobHdrIdx++] = curAddr;
 
             m_count = 0;
@@ -220,16 +236,16 @@
     /****************************************************************
     * write a single 4 byte integer
     **/
-    public void writeInt(int b) throws IOException {
+    public void writeInt(final int b) throws IOException {
         write((b >>> 24) & 0xFF);
         write((b >>> 16) & 0xFF);
         write((b >>> 8) & 0xFF);
         write(b & 0xFF);
     }
 
-    public void writeLong(long b) throws IOException {
-        int hi = (int) (b >> 32);
-        int lo = (int) (b & 0xFFFFFFFF);
+    public void writeLong(final long b) throws IOException {
+        final int hi = (int) (b >> 32);
+        final int lo = (int) (b & 0xFFFFFFFF);
         writeInt(hi);
         writeInt(lo);
     }
@@ -240,13 +256,13 @@
     * we need to be able to efficiently handle large arrays beyond size
     * of the blobThreshold, so
     **/
-    public void write(byte b[], int off, int len) throws IOException {
+    public void write(final byte b[], final int off, final int len) throws IOException {
         if (m_store == null) {
-            throw new IllegalStateException("PSOutputStream with unitilialized store");
+            throw new IllegalStateException(ERR_NO_STORE);
         }
         if (m_isSaved) {
-            throw new IllegalStateException("PSOutputStream: already been saved");
+            throw new IllegalStateException(ERR_ALREADY_SAVED);
         }
 
         if ((m_count + len) > m_blobThreshold) {
@@ -270,12 +286,12 @@
     * This method can be used to stream external files into
     * the store.
     **/
-    public void write(InputStream instr) throws IOException {
+    public void write(final InputStream instr) throws IOException {
         if (m_isSaved) {
-            throw new IllegalStateException("PSOutputStream: already been saved");
+            throw new IllegalStateException(ERR_ALREADY_SAVED);
        }
 
-        byte b[] = new byte[512];
+        final byte b[] = new byte[512];
 
         int r = instr.read(b);
 
         while (r == 512) {
@@ -296,7 +312,7 @@
     **/
     public long save() {
         if (m_isSaved) {
-            throw new IllegalStateException("PSOutputStream: already been saved");
+            throw new IllegalStateException(ERR_ALREADY_SAVED);
         }
 
         if (m_store == null) {
@@ -309,18 +325,22 @@
         try {
             m_writingHdr = true; // ensure that header CAN be a BLOB
             m_blobHeader[m_blobHdrIdx++] = addr;
-            int precount = m_count;
+            final int precount = m_count;
             m_count = 0;
             try {
                 writeInt(m_blobHdrIdx);
                 for (int i = 0; i < m_blobHdrIdx; i++) {
-                    writeInt(m_blobHeader[i]);
+                    writeInt(m_blobHeader[i]);
                 }
                 addr = (int) m_store.alloc(m_buf, m_count, m_context);
-                if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count)/m_blobThreshold)) {
-                    throw new IllegalStateException("PSOutputStream.save at : " + addr + ", bytes: "+ m_bytesWritten + ", blocks: " + m_blobHdrIdx + ", last alloc: " + precount);
-                }
+                if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count) / m_blobThreshold)) {
+                    throw new IllegalStateException(
+                            "PSOutputStream.save at : " + addr
+                            + ", bytes: " + m_bytesWritten
+                            + ", blocks: " + m_blobHdrIdx
+                            + ", last alloc: " + precount);
+                }
                 if (log.isDebugEnabled())
                     log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations");
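
The refactor above replaces several slightly different inline messages with two
shared constants, so every guard clause in every write method reports the same
text. A stripped-down sketch of the resulting shape; the class and field names
here are illustrative, not the real PSOutputStream:

import java.io.IOException;
import java.io.OutputStream;

class GuardedOutputStream extends OutputStream {

    private static final String ERR_NO_STORE =
            "PSOutputStream with uninitialized store";
    private static final String ERR_ALREADY_SAVED =
            "Writing to saved PSOutputStream";

    private Object m_store = new Object(); // stand-in for IStore
    private boolean m_isSaved = false;

    @Override
    public void write(final int b) throws IOException {
        // Every write variant uses the same two constants, so the error
        // text cannot drift between write(int) and write(byte[],int,int).
        if (m_store == null)
            throw new IllegalStateException(ERR_NO_STORE);
        if (m_isSaved)
            throw new IllegalStateException(ERR_ALREADY_SAVED);
        // ... buffer the byte ...
    }
}
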
From: <tho...@us...> - 2010-11-09 14:05:48

Revision: 3921
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3921&view=rev
Author:   thompsonbry
Date:     2010-11-09 14:05:41 +0000 (Tue, 09 Nov 2010)

Log Message:
-----------
Added 'final' declarations. Groups all variables together at the head of the
file. Added some @todos for javadoc clarification.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java	2010-11-09 11:48:21 UTC (rev 3920)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java	2010-11-09 14:05:41 UTC (rev 3921)
@@ -13,20 +13,21 @@
 import com.bigdata.util.ChecksumUtility;
 
 /**
- * BlobAllocator
+ * BlobAllocator.
  *
- * Manages Blob allocations using a list of FixedAllocators.
+ * Manages Blob allocations using a list of {@link FixedAllocator}s.
  *
- * The main advantage of this is for re-allocation, since the FixedAllocators can be
- * efficiently re-cycled where a fixed Blob creates issues of best fit and fragmentation.
+ * The main advantage of this is for re-allocation, since the
+ * {@link FixedAllocator}s can be efficiently re-cycled where a fixed Blob
+ * creates issues of best fit and fragmentation.
  *
- * Some simple patterns would cause un-reallocatable storage, consider a Blob that always
- * re-allocated to a larger size, or a pattern where several blobs got larger together, in these
- * scenarios, smaller allocations would never be re-used, whilst the mechanism of component
- * based allocation is easily re-used.
+ * Some simple patterns would cause un-reallocatable storage, consider a Blob
+ * that always re-allocated to a larger size, or a pattern where several blobs
+ * got larger together, in these scenarios, smaller allocations would never be
+ * re-used, whilst the mechanism of component based allocation is easily
+ * re-used.
  *
  * @author mgc
- *
 */
 public class BlobAllocator implements Allocator {
 
@@ -37,6 +38,10 @@
     private int m_diskAddr;
     private int m_index;
     private int m_sortAddr;
+    private ArrayList m_freeList;
+    private long m_startAddr;
+    // @todo javadoc. why 254?
+    private int m_freeSpots = 254;
 
     public BlobAllocator(final RWStore store, final int sortAddr) {
         m_store = store;
@@ -46,47 +51,49 @@
             log.info("New BlobAllocator");
     }
 
-    public void addAddresses(ArrayList addrs) {
+    public void addAddresses(final ArrayList addrs) {
         // not relevant for BlobAllocators
     }
 
-    public boolean addressInRange(int addr) {
+    public boolean addressInRange(final int addr) {
         // not relevant for BlobAllocators
         return false;
     }
 
-    public int alloc(RWStore store, int size, IAllocationContext context) {
+    // @todo javadoc. Why is this method a NOP (other than the assert).
+    public int alloc(final RWStore store, final int size, final IAllocationContext context) {
         assert size > (m_store.m_maxFixedAlloc-4);
 
         return 0;
     }
 
-    public boolean free(int addr, int sze) {
+    // @todo why does this return false on all code paths?
+    public boolean free(final int addr, final int sze) {
         if (sze < (m_store.m_maxFixedAlloc-4))
             throw new IllegalArgumentException("Unexpected address size");
-        int alloc = m_store.m_maxFixedAlloc-4;
-        int blcks = (alloc - 1 + sze)/alloc;
+        final int alloc = m_store.m_maxFixedAlloc-4;
+        final int blcks = (alloc - 1 + sze)/alloc;
 
         int hdr_idx = (-addr) & RWStore.OFFSET_BITS_MASK;
         if (hdr_idx > m_hdrs.length)
             throw new IllegalArgumentException("free BlobAllocation problem, hdr offset: " + hdr_idx + ", avail:" + m_hdrs.length);
 
-        int hdr_addr = m_hdrs[hdr_idx];
+        final int hdr_addr = m_hdrs[hdr_idx];
 
         if (hdr_addr == 0) {
             return false;
        }
 
         // read in header block, then free each reference
-        byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum
+        final byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum
         m_store.getData(hdr_addr, hdr);
 
+        final DataInputStream instr = new DataInputStream(
+                new ByteArrayInputStream(hdr, 0, hdr.length-4) );
         try {
-            DataInputStream instr = new DataInputStream(
-                    new ByteArrayInputStream(hdr, 0, hdr.length-4) );
-            int allocs = instr.readInt();
+            final int allocs = instr.readInt();
             for (int i = 0; i < allocs; i++) {
-                int nxt = instr.readInt();
+                final int nxt = instr.readInt();
                 m_store.free(nxt, m_store.m_maxFixedAlloc);
             }
             m_store.free(hdr_addr, hdr.length);
@@ -96,38 +103,38 @@
             }
         } catch (IOException ioe) {
-
+            throw new RuntimeException(ioe);
         }
 
         return false;
     }
 
-    public int getFirstFixedForBlob(int addr, int sze) {
+    public int getFirstFixedForBlob(final int addr, final int sze) {
         if (sze < (m_store.m_maxFixedAlloc-4))
             throw new IllegalArgumentException("Unexpected address size: " + sze);
 
-        int alloc = m_store.m_maxFixedAlloc-4;
-        int blcks = (alloc - 1 + sze)/alloc;
+        final int alloc = m_store.m_maxFixedAlloc-4;
+        final int blcks = (alloc - 1 + sze)/alloc;
 
-        int hdr_idx = (-addr) & RWStore.OFFSET_BITS_MASK;
+        final int hdr_idx = (-addr) & RWStore.OFFSET_BITS_MASK;
         if (hdr_idx > m_hdrs.length)
             throw new IllegalArgumentException("free BlobAllocation problem, hdr offset: " + hdr_idx + ", avail:" + m_hdrs.length);
 
-        int hdr_addr = m_hdrs[hdr_idx];
+        final int hdr_addr = m_hdrs[hdr_idx];
 
         if (hdr_addr == 0) {
             throw new IllegalArgumentException("getFirstFixedForBlob called with unallocated address");
         }
 
         // read in header block, then free each reference
-        byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum
+        final byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum
         m_store.getData(hdr_addr, hdr);
 
+        final DataInputStream instr = new DataInputStream(
+                new ByteArrayInputStream(hdr, 0, hdr.length-4) );
         try {
-            DataInputStream instr = new DataInputStream(
-                    new ByteArrayInputStream(hdr, 0, hdr.length-4) );
-            int nallocs = instr.readInt();
-            int faddr = instr.readInt();
+            final int nallocs = instr.readInt();
+            final int faddr = instr.readInt();
 
             return faddr;
@@ -148,7 +155,7 @@
     /**
     * returns physical address of blob header if any.
     */
-    public long getPhysicalAddress(int offset) {
+    public long getPhysicalAddress(final int offset) {
         return m_store.physicalAddress(m_hdrs[offset]);
     }
@@ -156,7 +163,7 @@
     * Since the Blob Allocator simply manages access to FixedAllocation blocks it does not manage any
     * allocations directly.
     */
-    public int getPhysicalSize(int offset) {
+    public int getPhysicalSize(final int offset) {
         return 0;
     }
@@ -183,36 +190,31 @@
         // all data held by fixed allocators
     }
 
-    int m_freeSpots = 254;
-
     /**
     * FIXME: There is a symmetry problem with read/write where one takes a Stream and the other
     * return a byte[]. This is problematical with using the checksums.
     */
-    public void read(DataInputStream str) {
+    public void read(final DataInputStream str) {
         m_freeSpots = 0;
         try {
             for (int i = 0; i < 254; i++) {
                 m_hdrs[i] = str.readInt();
                 if (m_hdrs[i] == 0) m_freeSpots++;
             }
-            int chk = str.readInt();
+            final int chk = str.readInt();
             // checksum int chk = ChecksumUtility.getCHK().checksum(buf, str.size());
         } catch (IOException e) {
-            e.printStackTrace();
+            log.error(e,e);
             throw new IllegalStateException(e);
         }
     }
 
-    public void setDiskAddr(int addr) {
+    public void setDiskAddr(final int addr) {
         m_diskAddr = addr;
     }
 
-    private ArrayList m_freeList;
-    private long m_startAddr;
-
-    public void setFreeList(ArrayList list) {
+    public void setFreeList(final ArrayList list) {
         m_freeList = list;
 
         if (hasFree()) {
@@ -230,28 +232,29 @@
     * can safely be used to sort a BlobAllocator against the previous (and subsequent) allocators we
     * access the previous allocators address.
     */
-    public void setIndex(int index) {
+    public void setIndex(final int index) {
         m_index = index;
     }
 
-    public boolean verify(int addr) {
+    // @todo why is this a NOP? Javadoc.
+    public boolean verify(final int addr) {
         // TODO Auto-generated method stub
         return false;
     }
 
     public byte[] write() {
         try {
-            byte[] buf = new byte[1024];
-            DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf));
+            final byte[] buf = new byte[1024]; // @todo why this const?
+            final DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf));
 
             str.writeInt(m_sortAddr);
-            for (int i = 0; i < 254; i++) {
+            for (int i = 0; i < 254; i++) { // @todo why this const?
                 str.writeInt(m_hdrs[i]);
             }
 
             // add checksum
-            int chk = ChecksumUtility.getCHK().checksum(buf, str.size());
+            final int chk = ChecksumUtility.getCHK().checksum(buf, str.size());
             str.writeInt(chk);
 
             return buf;
@@ -260,15 +263,15 @@
         }
     }
 
-    public int compareTo(Object o) {
-        Allocator alloc = (Allocator) o;
+    public int compareTo(final Object o) {
+        final Allocator alloc = (Allocator) o;
 
         assert getStartAddr() != alloc.getStartAddr();
 
         return (getStartAddr() < alloc.getStartAddr()) ? -1 : 1;
     }
 
-    public int register(int addr) {
+    public int register(final int addr) {
         assert m_freeSpots > 0;
 
         m_store.addToCommit(this);
@@ -281,7 +284,7 @@
             m_freeList.remove(this);
         }
 
-        int ret = -((m_index << RWStore.OFFSET_BITS) + i);
+        final int ret = -((m_index << RWStore.OFFSET_BITS) + i);
         if (((-ret) & RWStore.OFFSET_BITS_MASK) > m_hdrs.length)
             throw new IllegalStateException("Invalid blob offset: " + ((-ret) & RWStore.OFFSET_BITS_MASK));
@@ -300,15 +303,15 @@
         return m_index;
     }
 
-    public int getBlobHdrAddress(int hdrIndex) {
+    public int getBlobHdrAddress(final int hdrIndex) {
         return m_hdrs[hdrIndex];
     }
 
-    public void appendShortStats(StringBuilder str, AllocationStats[] stats) {
+    public void appendShortStats(final StringBuilder str, final AllocationStats[] stats) {
         str.append("Index: " + m_index + ", address: " + getStartAddr() + ", BLOB\n");
     }
 
-    public boolean isAllocated(int offset) {
+    public boolean isAllocated(final int offset) {
         return m_hdrs[offset] != 0;
     }
@@ -316,7 +319,7 @@
     * This is okay as a NOP. The true allocation is managed by the
     * FixedAllocators.
     */
-    public void detachContext(IAllocationContext context) {
+    public void detachContext(final IAllocationContext context) {
         // NOP
     }
@@ -324,8 +327,8 @@
     * Since the real allocation is in the FixedAllocators, this should delegate
     * to the first address, in which case
     */
-    public boolean canImmediatelyFree(int addr, int size, IAllocationContext context) {
-        int faddr = this.getFirstFixedForBlob(addr, size);
+    public boolean canImmediatelyFree(final int addr, final int size, final IAllocationContext context) {
+        final int faddr = this.getFirstFixedForBlob(addr, size);
 
         return m_store.getBlockByAddress(faddr).canImmediatelyFree(faddr, 0, context);
     }
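
free() and getFirstFixedForBlob() above both decode the same on-disk blob
header: an int count, one int address per fixed allocation, and a trailing int
checksum that is excluded from the readable window (hdr.length - 4). A runnable
sketch of that decode, using a buffer built in place rather than a real store:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

final class BlobHeaderDemo {
    public static void main(String[] args) throws IOException {
        // Header for 3 allocations at addresses 100, 200, 300; the last
        // 4 bytes are reserved for the checksum and left out of the read.
        final byte[] hdr = new byte[(3 + 1) * 4 + 4];
        ByteBuffer.wrap(hdr).putInt(3).putInt(100).putInt(200).putInt(300);

        final DataInputStream instr = new DataInputStream(
                new ByteArrayInputStream(hdr, 0, hdr.length - 4));
        final int allocs = instr.readInt();
        for (int i = 0; i < allocs; i++) {
            final int nxt = instr.readInt();
            System.out.println("free fixed allocation at " + nxt);
        }
    }
}
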
From: <mar...@us...> - 2010-11-09 11:48:28

Revision: 3920
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3920&view=rev
Author:   martyncutcher
Date:     2010-11-09 11:48:21 +0000 (Tue, 09 Nov 2010)

Log Message:
-----------
Add version info to MetaBits header, correctly recycle PSOutputStreams, and
fix erroneous maxAlloc assertions

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java	2010-11-08 23:56:01 UTC (rev 3919)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java	2010-11-09 11:48:21 UTC (rev 3920)
@@ -56,13 +56,13 @@
     }
 
     public int alloc(RWStore store, int size, IAllocationContext context) {
-        assert size > m_store.m_maxFixedAlloc;
+        assert size > (m_store.m_maxFixedAlloc-4);
 
         return 0;
     }
 
     public boolean free(int addr, int sze) {
-        if (sze < m_store.m_maxFixedAlloc)
+        if (sze < (m_store.m_maxFixedAlloc-4))
             throw new IllegalArgumentException("Unexpected address size");
         int alloc = m_store.m_maxFixedAlloc-4;
         int blcks = (alloc - 1 + sze)/alloc;
@@ -103,8 +103,8 @@
     }
 
     public int getFirstFixedForBlob(int addr, int sze) {
-        if (sze < m_store.m_maxFixedAlloc)
-            throw new IllegalArgumentException("Unexpected address size");
+        if (sze < (m_store.m_maxFixedAlloc-4))
+            throw new IllegalArgumentException("Unexpected address size: " + sze);
         int alloc = m_store.m_maxFixedAlloc-4;
         int blcks = (alloc - 1 + sze)/alloc;

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java	2010-11-08 23:56:01 UTC (rev 3919)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java	2010-11-09 11:48:21 UTC (rev 3920)
@@ -79,27 +79,24 @@
 
     private static PSOutputStream m_poolHead = null;
     private static PSOutputStream m_poolTail = null;
-    private static Integer m_lock = new Integer(42);
     private static int m_streamCount = 0;
 
-    public static PSOutputStream getNew(final IStore store, final int maxAlloc, final IAllocationContext context) {
-        synchronized (m_lock) {
-            PSOutputStream ret = m_poolHead;
-            if (ret != null) {
-                m_streamCount--;
-
-                m_poolHead = ret.next();
-                if (m_poolHead == null) {
-                    m_poolTail = null;
-                }
-            } else {
-                ret = new PSOutputStream();
-            }
+    public static synchronized PSOutputStream getNew(final IStore store, final int maxAlloc, final IAllocationContext context) {
+        PSOutputStream ret = m_poolHead;
+        if (ret != null) {
+            m_streamCount--;
 
-            ret.init(store, maxAlloc, context);
-
-            return ret;
+            m_poolHead = ret.next();
+            if (m_poolHead == null) {
+                m_poolTail = null;
+            }
+        } else {
+            ret = new PSOutputStream();
         }
+
+        ret.init(store, maxAlloc, context);
+
+        return ret;
     }
 
     /*******************************************************************
@@ -110,23 +107,21 @@
     * maximum of 10 streams are maintained - adding up to 80K to the
     * garbage collect copy.
     **/
-    static void returnStream(PSOutputStream stream) {
-        synchronized (m_lock) {
-            if (m_streamCount > 10) {
-                return;
-            }
-
-            stream.m_count = 0; // avoid overflow
-
-            if (m_poolTail != null) {
-                m_poolTail.setNext(stream);
-            } else {
-                m_poolHead = stream;
-            }
-
-            m_poolTail = stream;
-            m_streamCount++;
+    static synchronized void returnStream(PSOutputStream stream) {
+        if (m_streamCount > 10) {
+            return;
         }
+
+        stream.m_count = 0; // avoid overflow
+
+        if (m_poolTail != null) {
+            m_poolTail.setNext(stream);
+        } else {
+            m_poolHead = stream;
+        }
+
+        m_poolTail = stream;
+        m_streamCount++;
     }
 
     private int[] m_blobHeader = null;
@@ -161,6 +156,7 @@
     void init(IStore store, int maxAlloc, IAllocationContext context) {
         m_store = store;
         m_context = context;
+        m_next = null;
 
         m_blobThreshold = maxAlloc-4; // allow for checksum
@@ -357,11 +353,6 @@
         return m_bytesWritten;
     }
 
-    protected void finalize() throws Throwable {
-        close();
-        super.finalize();
-    }
-
     public OutputStream getFilterWrapper(final boolean saveBeforeClose) {
 
         return new FilterOutputStream(this) {

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-08 23:56:01 UTC (rev 3919)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-09 11:48:21 UTC (rev 3920)
@@ -283,7 +283,7 @@
     static final int ALLOCATION_SCALEUP = 16; // multiplier to convert allocations based on minimum allocation of 32k
     static private final int META_ALLOCATION = 8; // 8 * 32K is size of meta Allocation
 
-    static final int BLOB_FIXED_ALLOCS = 1024;
+    static final int BLOB_FIXED_ALLOCS = 2048;
 
//  private ICommitCallback m_commitCallback;
//
//  public void setCommitCallback(final ICommitCallback callback) {
@@ -760,6 +760,10 @@
 
             final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf));
 
+            final int storeVersion = strBuf.readInt();
+            if (storeVersion != cVersion) {
+                throw new IllegalStateException("Incompatible RWStore header version");
+            }
             m_lastDeferredReleaseTime = strBuf.readLong();
             cDefaultMetaBitsSize = strBuf.readInt();
 
@@ -768,7 +772,7 @@
             for (int i = 0; i < allocBlocks; i++) {
                 m_allocSizes[i] = strBuf.readInt();
             }
-            m_metaBitsSize = metaBitsStore - allocBlocks - 4; // allow for deferred free
+            m_metaBitsSize = metaBitsStore - allocBlocks - cMetaHdrFields; // allow for header fields
             m_metaBits = new int[m_metaBitsSize];
             if (log.isInfoEnabled()) {
                 log.info("Raw MetaBitsAddr: " + rawmbaddr);
@@ -1594,6 +1598,14 @@
 
             throw new RuntimeException("Closed Store?", e);
 
+        } finally {
+            try {
+                psout.close(); // return stream
+            } catch (IOException ioe) {
+                // should not happen, since this should only be
+                // recycling
+                log.warn("Unexpected error closing PSOutputStream", ioe);
+            }
         }
 
     }
@@ -1750,11 +1762,12 @@
         // frees.
         // the cDefaultMetaBitsSize is also written since this can now be
         // parameterized.
-        final int len = 4 * (2 + 1 + 1 + m_allocSizes.length + m_metaBits.length);
+        final int len = 4 * (cMetaHdrFields + m_allocSizes.length + m_metaBits.length);
         final byte buf[] = new byte[len];
 
         final FixedOutputStream str = new FixedOutputStream(buf);
         try {
+            str.writeInt(cVersion);
             str.writeLong(m_lastDeferredReleaseTime);
             str.writeInt(cDefaultMetaBitsSize);
 
@@ -1941,6 +1954,7 @@
 
         ints += 9 * allocBlocks;
         ints += 2; // for deferredFreeListAddr and size
+        ints += 1; // for version
 
         return ints*4; // return as bytes
     }
@@ -1960,6 +1974,17 @@
     */
 
     /**
+     * MetaBits HEADER version must be changed when the header or allocator
+     * serialization changes
+     *
+     * Use BCD-style numbering so
+     * 0x0200 == 2.00
+     * 0x0320 == 3.20
+     */
+    final private int cVersion = 0x0200;
+
+    final private int cMetaHdrFields = 5; // version, deferredFree(long),
+
+    /**
     * @see Options#META_BITS_SIZE
     */
     private int cDefaultMetaBitsSize;
@@ -2393,11 +2418,12 @@
         str.append("RWStore Allocation Summary\n");
         str.append("-------------------------\n");
         str.append(padRight("Allocator", 10));
-        str.append(padLeft("Slots used", 12));
-        str.append(padLeft("available", 12));
-        str.append(padLeft("Store used", 14));
-        str.append(padLeft("available", 14));
+        str.append(padLeft("SlotsUsed", 12));
+        str.append(padLeft("reserved", 12));
+        str.append(padLeft("StoreUsed", 14));
+        str.append(padLeft("reserved", 14));
         str.append(padLeft("Usage", 8));
+        str.append(padLeft("Store", 8));
         str.append("\n");
         long treserved = 0;
         long treservedSlots = 0;
@@ -2410,11 +2436,16 @@
             final long filled = stats[i].m_filledSlots * stats[i].m_blockSize;
             tfilled += filled;
             tfilledSlots += stats[i].m_filledSlots;
+        }
+        for (int i = 0; i < stats.length; i++) {
+            final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize;
+            final long filled = stats[i].m_filledSlots * stats[i].m_blockSize;
             str.append(padRight("" + stats[i].m_blockSize, 10));
             str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12));
             str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14));
             str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8));
+            str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8));
             str.append("\n");
         }
         str.append(padRight("Totals", 10));
@@ -2704,16 +2735,15 @@
     }
 
     /**
-     * Note that the representation of the
-     *
+     * The
     * @return long representation of metaBitsAddr PLUS the size
     */
     public long getMetaBitsAddr() {
         long ret = physicalAddress((int) m_metaBitsAddr);
         ret <<= 16;
 
-        // include space for allocSizes and deferred free info AND cDefaultMetaBitsSize
-        final int metaBitsSize = 2 + 1 + m_metaBits.length + m_allocSizes.length + 1;
+        // include space for version, allocSizes and deferred free info AND cDefaultMetaBitsSize
+        final int metaBitsSize = cMetaHdrFields + m_metaBits.length + m_allocSizes.length;
         ret += metaBitsSize;
 
         if (log.isTraceEnabled())
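
The RWStore change writes a version int ahead of the MetaBits payload and
fails fast on a mismatch; the BCD-style constant 0x0200 reads as version 2.00.
A compact sketch of that write-then-verify handshake follows; the two header
fields shown are illustrative, not the full MetaBits layout:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class MetaBitsVersionDemo {
    private static final int cVersion = 0x0200; // BCD-style: 2.00

    static byte[] writeHeader(long lastDeferredReleaseTime) throws IOException {
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        final DataOutputStream str = new DataOutputStream(bos);
        str.writeInt(cVersion); // version goes first so readers can bail early
        str.writeLong(lastDeferredReleaseTime);
        return bos.toByteArray();
    }

    static void readHeader(byte[] buf) throws IOException {
        final DataInputStream strBuf =
                new DataInputStream(new ByteArrayInputStream(buf));
        final int storeVersion = strBuf.readInt();
        if (storeVersion != cVersion)
            throw new IllegalStateException("Incompatible RWStore header version");
        System.out.println("deferred release time: " + strBuf.readLong());
    }

    public static void main(String[] args) throws IOException {
        readHeader(writeHeader(42L));
    }
}
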
From: <btm...@us...> - 2010-11-08 23:56:10

Revision: 3919
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3919&view=rev
Author:   btmurphy
Date:     2010-11-08 23:56:01 +0000 (Mon, 08 Nov 2010)

Log Message:
-----------
[branch dev-btm]: CHECKPOINT FOR SAFETY - phase 1 of callable executor (client
service) smart proxy work. Includes fixes to issues identified by the tests;
in particular the com.bigdata.service and com.bigdata.jini tests

Modified Paths:
--------------
    branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java
    branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java
    branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java
    branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java
    branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java
    branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java
    branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java
    branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java
    branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java
    branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java
    branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/IndexCache.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/MetadataService.java
    branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/ClientIndexView.java
    branches/dev-btm/bigdata/src/test/com/bigdata/relation/locator/TestDefaultResourceLocator.java
    branches/dev-btm/bigdata/src/test/com/bigdata/service/AbstractEmbeddedFederationTestCase.java
    branches/dev-btm/bigdata/src/test/com/bigdata/service/TestEDS.java
    branches/dev-btm/bigdata/src/test/com/bigdata/service/TestEmbeddedClient.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/Constants.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/EmbeddedCallableExecutor.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ZookeeperServerConfiguration.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/process/ZookeeperProcessHelper.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/Constants.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/ServiceImpl.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/AbstractServer.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/DataServer.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/JiniClient.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/ServiceCache.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/AbstractClientTask.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferTask.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/TaskMaster.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/Constants.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/ConfigDeployUtil.java
    branches/dev-btm/bigdata-jini/src/java/com/bigdata/zookeeper/ZooHelper.java
    branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config
    branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java
    branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestAll.java
    branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java
    branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestAll.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/IRISUtils.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/MagicAccessPath.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/TempMagicStore.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/AsynchronousStatementBufferFactory.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/AbstractRuleFastClosure_3_5_6_7_9.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOAccessPath.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPORelation.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java
    branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/TripleStoreUtility.java
    branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/EDSAsyncLoader.java
    branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/StressTestCentos.java
    branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreTransactionSemantics.java
    branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/TestRelationLocator.java
    branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java
    branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailHelper.java
    branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java
    branches/dev-btm/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -107,14 +107,30 @@
 
     /**
     * {@link ITx#READ_COMMITTED} view.
     */
-    public BigdataFileSystem getReadCommitted() {
-
-        if (INFO)
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE    public BigdataFileSystem getReadCommitted() {
+//BTM - PRE_CLIENT_SERVICE
+//BTM - PRE_CLIENT_SERVICE        if (INFO)
+//BTM - PRE_CLIENT_SERVICE            log.info("");
+//BTM - PRE_CLIENT_SERVICE
+//BTM - PRE_CLIENT_SERVICE        return (BigdataFileSystem) indexManager.getResourceLocator().locate(
+//BTM - PRE_CLIENT_SERVICE                GLOBAL_FILE_SYSTEM_NAMESPACE, ITx.READ_COMMITTED);
+//BTM - PRE_CLIENT_SERVICE
+//BTM - PRE_CLIENT_SERVICE    }
+    public BigdataFileSystem getReadCommitted
+                                 (IConcurrencyManager concurrencyManager,
+                                  IBigdataDiscoveryManagement discoveryManager)
+    {
+        if (INFO) {
             log.info("");
-
-        return (BigdataFileSystem) indexManager.getResourceLocator().locate(
-                GLOBAL_FILE_SYSTEM_NAMESPACE, ITx.READ_COMMITTED);
-
+        }
+        return (BigdataFileSystem) indexManager.getResourceLocator()
+                   .locate( indexManager,
+                            concurrencyManager,
+                            discoveryManager,
+                            GLOBAL_FILE_SYSTEM_NAMESPACE,
+                            ITx.READ_COMMITTED );
     }
+//BTM - PRE_CLIENT_SERVICE - END
 }

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -75,7 +75,12 @@
         try {
             EventReceivingService serviceRef =
                 (EventReceivingService)(discoveryMgr.getLoadBalancerService());
-
+            if (serviceRef == null) {
+                logger.log(Level.WARN, "cannot send events to load "
+                                       +"balancer from "+serviceName
+                                       +" - load balancer unavailable");
+                return;
+            }
             final long begin = System.currentTimeMillis();//for logging
             final LinkedList<Event> queuedEvents = new LinkedList<Event>();

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -2325,15 +2325,22 @@
 
                 // unisolated view - will create if it does not exist.
 //BTM - PRE_CLIENT_SERVICE - BEGIN
 //BTM - PRE_CLIENT_SERVICE                return new GlobalFileSystemHelper(this).getGlobalFileSystem();
+//BTM - PRE_CLIENT_SERVICE            }
+//BTM - PRE_CLIENT_SERVICE
+//BTM - PRE_CLIENT_SERVICE            // read committed view IFF it exists otherwise [null]
+//BTM - PRE_CLIENT_SERVICE            return new GlobalFileSystemHelper(this).getReadCommitted();
                 return new GlobalFileSystemHelper(this)
                                .getGlobalFileSystem
                                    (concurrencyManager,
                                     resourceManager.getDiscoveryManager());
-//BTM - PRE_CLIENT_SERVICE - END
             }
 
             // read committed view IFF it exists otherwise [null]
-            return new GlobalFileSystemHelper(this).getReadCommitted();
+            return new GlobalFileSystemHelper(this)
+                           .getReadCommitted
+                               (concurrencyManager,
+                                resourceManager.getDiscoveryManager());
+//BTM - PRE_CLIENT_SERVICE - END
 
         }
@@ -2711,7 +2718,13 @@
 
         };
 
-        return new GlobalFileSystemHelper(tmp).getReadCommitted();
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE        return new GlobalFileSystemHelper(tmp).getReadCommitted();
+        return new GlobalFileSystemHelper(tmp)
+                       .getReadCommitted
+                           (concurrencyManager,
+                            resourceManager.getDiscoveryManager());
+//BTM - PRE_CLIENT_SERVICE - END
 
     }

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -276,7 +276,7 @@
         tempStoreFactory.closeAll();
     }
 
-    // Required IIndexManager
+    // Required by IIndexManager
 
     public void registerIndex(IndexMetadata metadata) {
         registerIndex(metadata, null);

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -542,9 +542,18 @@
 
             }
 
-            container = getIndexManager()
-                            .getResourceLocator()
-                            .locate(getContainerNamespace(), getTimestamp());
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE            container = getIndexManager()
+//BTM - PRE_CLIENT_SERVICE                            .getResourceLocator()
+//BTM - PRE_CLIENT_SERVICE                            .locate(getContainerNamespace(), getTimestamp());
+            container =
+                getIndexManager().getResourceLocator()
+                    .locate( getIndexManager(),
+                             getConcurrencyManager(),
+                             getDiscoveryManager(),
+                             getContainerNamespace(),
+                             getTimestamp() );
+//BTM - PRE_CLIENT_SERVICE - END
 
         }

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -52,6 +52,10 @@
 import com.bigdata.service.IBigdataFederation;
 import com.bigdata.sparse.SparseRowStore;
 
+//BTM - FOR_CLIENT_SERVICE
+import com.bigdata.discovery.IBigdataDiscoveryManagement;
+import com.bigdata.journal.IConcurrencyManager;
+
 /**
 * Generic implementation relies on a ctor for the resource with the following
 * method signature:
@@ -181,7 +185,15 @@
     }
 
     // @todo hotspot 2% total query time.
-    public T locate(final String namespace, final long timestamp) {
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE    public T locate(final String namespace, final long timestamp) {
+    public T locate(IIndexManager indexManager,
+                    IConcurrencyManager concurrencyManager,
+                    IBigdataDiscoveryManagement discoveryManager,
+                    final String namespace,
+                    final long timestamp)
+    {
+//BTM - FOR_CLIENT_SERVICE - END
 
         if (namespace == null)
             throw new IllegalArgumentException();
@@ -258,7 +270,14 @@
                 }
 
                 // pass request to delegate.
-                resource = delegate.locate(namespace, timestamp);
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE                resource = delegate.locate(namespace, timestamp);
+                resource = delegate.locate(indexManager,
+                                           concurrencyManager,
+                                           discoveryManager,
+                                           namespace,
+                                           timestamp);
+//BTM - FOR_CLIENT_SERVICE - END
 
                 if (resource != null) {
 
@@ -317,8 +336,17 @@
             }
 
             // create a new instance of the relation.
-            resource = newInstance(cls, foundOn.get(), namespace, timestamp,
-                    properties);
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE            resource = newInstance(cls, foundOn.get(), namespace, timestamp,
+//BTM - FOR_CLIENT_SERVICE                    properties);
+            resource = newInstance(cls,
+                                   foundOn.get(),//indexManager
+                                   concurrencyManager,
+                                   discoveryManager,
+                                   namespace,
+                                   timestamp,
+                                   properties);
+//BTM - FOR_CLIENT_SERVICE - END
 
             // Add to the cache.
             put(resource);
@@ -544,9 +572,19 @@
     *
     * @return A new instance of the identifed resource.
     */
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE    protected T newInstance(final Class<? extends T> cls,
+//BTM - FOR_CLIENT_SERVICE            final IIndexManager indexManager, final String namespace,
+//BTM - FOR_CLIENT_SERVICE            final long timestamp, final Properties properties) {
     protected T newInstance(final Class<? extends T> cls,
-            final IIndexManager indexManager, final String namespace,
-            final long timestamp, final Properties properties) {
+                            final IIndexManager indexManager,
+                            final IConcurrencyManager concurrencyManager,
+                            final IBigdataDiscoveryManagement discoveryManager,
+                            final String namespace,
+                            final long timestamp,
+                            final Properties properties)
+    {
+//BTM - FOR_CLIENT_SERVICE - END
 
         if (cls == null)
             throw new IllegalArgumentException();
@@ -563,12 +601,24 @@
         final Constructor<? extends T> ctor;
         try {
 
-            ctor = cls.getConstructor(new Class[] {//
-                    IIndexManager.class,//
-                    String.class,// relation namespace
-                    Long.class, // timestamp of the view
-                    Properties.class // configuration properties.
-                    });
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE            ctor = cls.getConstructor(new Class[] {//
+//BTM - FOR_CLIENT_SERVICE                    IIndexManager.class,//
+//BTM - FOR_CLIENT_SERVICE                    String.class,// relation namespace
+//BTM - FOR_CLIENT_SERVICE                    Long.class, // timestamp of the view
+//BTM - FOR_CLIENT_SERVICE                    Properties.class // configuration properties.
+//BTM - FOR_CLIENT_SERVICE                    });
+            ctor = cls.getConstructor
+                       (new Class[]
+                            { IIndexManager.class,
+                              IConcurrencyManager.class,
+                              IBigdataDiscoveryManagement.class,
+                              String.class,     // relation namespace
+                              Long.class,       // timestamp of the view
+                              Properties.class  // configuration properties.
+                            }
+                       );
+//BTM - FOR_CLIENT_SERVICE - END
 
         } catch (Exception e) {
 
@@ -580,12 +630,24 @@
         final T r;
         try {
 
-            r = ctor.newInstance(new Object[] {//
-                    indexManager,//
-                    namespace, //
-                    timestamp, //
-                    properties //
-                    });
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE            r = ctor.newInstance(new Object[] {//
+//BTM - FOR_CLIENT_SERVICE                    indexManager,//
+//BTM - FOR_CLIENT_SERVICE                    namespace, //
+//BTM - FOR_CLIENT_SERVICE                    timestamp, //
+//BTM - FOR_CLIENT_SERVICE                    properties //
+//BTM - FOR_CLIENT_SERVICE                    });
+            r = ctor.newInstance
+                    (new Object[]
+                         { indexManager,
+                           concurrencyManager,
+                           discoveryManager,
+                           namespace,
+                           timestamp,
+                           properties
+                         }
+                    );
+//BTM - FOR_CLIENT_SERVICE - END
 
             r.init();

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -32,6 +32,11 @@
 import com.bigdata.relation.IRelation;
 import com.bigdata.service.IBigdataFederation;
 
+//BTM - FOR_CLIENT_SERVICE
+import com.bigdata.discovery.IBigdataDiscoveryManagement;
+import com.bigdata.journal.IConcurrencyManager;
+import com.bigdata.journal.IIndexManager;
+
 /**
 * An object that knows how to resolve a resource identifier (aka namespace) to
 * an {@link ILocatableResource} instance. "Locating" a relation means (a)
@@ -68,6 +73,13 @@
     *         <code>null</code> if the resource declaration could not be
     *         resolved.
     */
-    public T locate(String namespace, long timestamp);
+//BTM - FOR_CLIENT_SERVICE - BEGIN
+//BTM - FOR_CLIENT_SERVICE    public T locate(String namespace, long timestamp);
+    public T locate(IIndexManager indexManager,
+                    IConcurrencyManager concurrencyManager,
+                    IBigdataDiscoveryManagement discoveryManager,
+                    String namespace,
+                    long timestamp);
+//BTM - FOR_CLIENT_SERVICE - END
 
 }

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -33,6 +33,11 @@
 
 import com.bigdata.relation.IRelation;
 
+//BTM - FOR_CLIENT_SERVICE
+import com.bigdata.discovery.IBigdataDiscoveryManagement;
+import com.bigdata.journal.IConcurrencyManager;
+import com.bigdata.journal.IIndexManager;
+
 /**
 * A mapping between {@link String}s and {@link IResourceLocator}s.
 * This can be used to locate local, temporary or virtual relations.
@@ -77,7 +82,15 @@
 
     }
 
-    public T locate(String relationName, long timestamp) {
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE    public T locate(String relationName, long timestamp) {
+    public T locate(IIndexManager indexManager,
+                    IConcurrencyManager concurrencyManager,
+                    IBigdataDiscoveryManagement discoveryManager,
+                    String relationName,
+                    long timestamp)
+    {
+//BTM - PRE_CLIENT_SERVICE - END
 
         if (relationName == null)
             throw new IllegalArgumentException();
@@ -90,7 +103,14 @@
 
         }
 
-        return relationLocator.locate(relationName, timestamp);
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE        return relationLocator.locate(relationName, timestamp);
+        return relationLocator.locate(indexManager,
+                                      concurrencyManager,
+                                      discoveryManager,
+                                      relationName,
+                                      timestamp);
+//BTM - PRE_CLIENT_SERVICE - END
 
     }

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -766,9 +766,18 @@
 
                 if (!c.containsKey(relationIdentifier)) {
 
-                    final IRelation relation = (IRelation) indexManager
-                            .getResourceLocator().locate(relationIdentifier,
-                                    timestamp);
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE                    final IRelation relation = (IRelation) indexManager
+//BTM - PRE_CLIENT_SERVICE                            .getResourceLocator().locate(relationIdentifier,
+//BTM - PRE_CLIENT_SERVICE                                    timestamp);
+                    final IRelation relation =
+                        (IRelation) indexManager.getResourceLocator()
+                                        .locate(indexManager,
+                                                concurrencyManager,
+                                                discoveryManager,
+                                                relationIdentifier,
+                                                timestamp);
+//BTM - PRE_CLIENT_SERVICE - BEGIN
 
                     c.put(relationIdentifier, relation);
 
@@ -839,9 +848,18 @@
 
             if (!c.containsKey(relationName)) {
 
-                final IRelation relation = (IRelation) indexManager
-                        .getResourceLocator().locate(relationName,
-                                timestamp);
+//BTM - PRE_CLIENT_SERVICE - BEGIN
+//BTM - PRE_CLIENT_SERVICE                final IRelation relation = (IRelation) indexManager
+//BTM - PRE_CLIENT_SERVICE                        .getResourceLocator().locate(relationName,
+//BTM - PRE_CLIENT_SERVICE                                timestamp);
+                final IRelation relation =
+                    (IRelation) indexManager.getResourceLocator()
+                                    .locate(indexManager,
+                                            concurrencyManager,
+                                            discoveryManager,
+                                            relationName,
+                                            timestamp);
+//BTM - PRE_CLIENT_SERVICE - END
 
                c.put(relationName, relation);

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -419,13 +419,17 @@
         }
 
         //false ==> allow in-progress tasks to complete
-        queueStatsTaskFuture.cancel(false);
+        if (queueStatsTaskFuture != null) {
+            queueStatsTaskFuture.cancel(false);
+        }
 
         Util.shutdownExecutorService
             (scheduledExecutor, timeout, serviceName+".scheduledExecutor", logger);
 
-        threadPool.shutdownNow();
+        if (threadPool != null) {
+            threadPool.shutdownNow();
+        }
 
         //send one last event report (same logic as in AbstractFederation)
         new EventQueueSenderTask

Modified: branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java
===================================================================
--- branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java	2010-11-08 21:31:17 UTC (rev 3918)
+++ branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java	2010-11-08 23:56:01 UTC (rev 3919)
@@ -746,6 +746,7 @@
     */
     public boolean awaitRunning() {
 
+//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","*** StoreManager.awaitRunning >>> isOpen="+isOpen()+", isStarting="+isStarting());
         while (isOpen() && isStarting()) {
 
             try {
@@ -762,8 +763,10 @@
 
             }
 
+//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","*** StoreManager.awaitRunning >>> END LOOP: isOpen="+isOpen()+", isStarting="+isStarting());
         }
 
+//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","*** StoreManager.awaitRunning >>> RETURN isRunning="+isRunning()+"\n\n");
         return isRunning();
 
     }
@@ -1358,21 +1361,21 @@
 
             try {
 
-System.out.println("\nStoreManager#Startup >>> start()");
+//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager#Startup >>> start()");
                 start();
 
                 // successful startup
-System.out.println("StoreManager#Startup >>> set isStarting to FALSE");
+//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager#Startup >>> set isStarting to FALSE");
                 starting.set(false);
 
                 // Purge any resources that we no longer require.
-System.out.println("StoreManager#Startup >>> PURGE old resources during startup");
                 if(purgeOldResourcesDuringStartup)
                     purgeOldResources();
 
             } catch (Throwable ex) {
 
-System.out.println("StoreManager#Startup >>> EXCEPTION >>> "+ex);
+//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager#Startup >>> EXCEPTION >>> "+ex+"\n"+com.bigdata.util.Util.getThrowableStackTrace(ex)+"\n");
+
                 // avoid possibility that isRunning() could become true.
                 open.set(false);
 
@@ -1392,7 +1395,7 @@
                 * flag is turned off.
                 */
 
-System.out.println("StoreManager#Startup >>> FINALLY >>> set isStarting to FALSE");
+//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager#Startup >>> FINALLY >>> set isStarting to FALSE");
                 starting.set(false);
 
                 if (log.isInfoEnabled())
@@ -1430,30 +1433,29 @@
         final private void start() throws InterruptedException {
 
             if (!isStarting()) {
-                throw new IllegalStateException();
-            }
 
             /*
             * Verify that the concurrency manager has been set and wait a while
            * it if is not available yet.
*/ - { - int nwaits = 0; - while (true) { - try { - getConcurrencyManager(); - break; - } catch (IllegalStateException ex) { - Thread.sleep(100/* ms */); - if (++nwaits % 50 == 0) - log.warn("Waiting for concurrency manager"); - } - } - } + {//begin block + int nwaits = 0; + while (true) { + try { + getConcurrencyManager(); + break; + } catch (IllegalStateException ex) { + Thread.sleep(100/* ms */); + if (++nwaits % 50 == 0) + log.warn("Waiting for concurrency manager"); + } + }//end loop + }//end block - try { +//BTM - PRE_CLIENT_SERVICE try { +//BTM - PRE_CLIENT_SERVICE //BTM - BEGIN - PRE_CLIENT_SERVICE //BTM - PRE_CLIENT_SERVICE final IBigdataFederation<?> fed = getFederation(); //BTM - PRE_CLIENT_SERVICE if (fed == null) { @@ -1496,65 +1498,66 @@ //BTM - PRE_CLIENT_SERVICE log.warn("\n"+stackTrace+"\n"); //BTM - PRE_CLIENT_SERVICE} //BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE } catch (UnsupportedOperationException ex) { +//BTM - PRE_CLIENT_SERVICE log.warn("Federation not available - running in test case?"); +//BTM - PRE_CLIENT_SERVICE } //BTM - maintain original logic for now - final IBigdataDiscoveryManagement discoveryMgr = + try { + final IBigdataDiscoveryManagement discoveryMgr = getDiscoveryManager(); - if (discoveryMgr == null) { - // Some of the unit tests do not start - // the txs until after the shard service. - // For those tests getDiscoveryManager() - // will return null during startup() of - // the shard service. To have a common - // code path, an exception is thrown - // here, but caught below. - throw new UnsupportedOperationException(); - } - // Wait no more than N seconds for discovery - int nWait = 30; - boolean discoveredTxnSrvc = false; - for(int i=0; i<nWait; i++) { - if (discoveryMgr.getTransactionService() - != null) - { - discoveredTxnSrvc = true; - break; - } - try { - Thread.sleep(1000L); - } catch(InterruptedException ie) { } - if (log.isDebugEnabled()) { - log.debug - ("waiting for transaction " - +"service discovery"); - } - } - if(discoveredTxnSrvc) { - if (log.isDebugEnabled()) { - log.debug - ("discovered transaction " - +"service"); - } - } else { - log.warn("transaction service " - +"unreachable"); -StackTraceElement[] e = (Thread.currentThread()).getStackTrace(); -StringBuffer buf = new StringBuffer(" "+(e[0]).toString()+"\n"); -for(int i=1;i<e.length;i++) { - buf.append(" "+(e[i]).toString()+"\n"); -} -String stackTrace = buf.toString(); -log.warn("\n"+stackTrace+"\n"); - }//endif(discoveredTxnSrvc) +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> DISCOVERY MGR = "+discoveryMgr+"\n"); + if (discoveryMgr == null) { + // Some of the unit tests do not start + // the txs until after the shard service. + // For those tests getDiscoveryManager() + // will return null during startup() of + // the shard service. To have a common + // code path, an exception is thrown + // here, but caught below. 
+ + throw new UnsupportedOperationException + ("null discoveryMgr"); + } + + // Wait no more than N seconds for discovery + int nWait = 120; + boolean discoveredTxnSrvc = false; + for(int i=0; i<nWait; i++) { + if (discoveryMgr.getTransactionService() != null) { + discoveredTxnSrvc = true; + break; + } + try { + Thread.sleep(1000L); + } catch(InterruptedException ie) { } + + if (log.isDebugEnabled()) { + log.debug("waiting for transaction " + +"service discovery"); + } + if(discoveredTxnSrvc) { + if (log.isDebugEnabled()) { + log.debug("discovered transaction service"); + } + } else { + log.warn("transaction service unreachable"); + }//endif(discoveredTxnSrvc) + }//endloop(nWait) +//BTM if(discoveredTxnSrvc) { +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> TRANSACTION SERVICE DISCOVERED"); +//BTM }else{ +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> TRANSACTION SERVICE UNREACHABLE\n"); +//BTM } + } catch (UnsupportedOperationException ex) { +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> FEDERATION UNAVAILABLE - test case?\n"); + log.warn("Federation not available - running in test case?"); + } //BTM - END - PRE_CLIENT_SERVICE - } catch (UnsupportedOperationException ex) { - log.warn("Federation not available - running in test case?"); - } - - /* - * Look for pre-existing data files. - */ + /* + * Look for pre-existing data files. + */ if (!isTransient) { if (log.isInfoEnabled()) @@ -1562,7 +1565,9 @@ final Stats stats = new Stats(); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> scanDataDirectory [dataDir="+dataDir+"]"); scanDataDirectory(dataDir, stats); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> SCAN DONE [dataDir="+dataDir+", stats="+stats+"]"); final int nbad = stats.badFiles.size(); @@ -2122,7 +2127,7 @@ private void scanDataDirectory(File dir, Stats stats) throws InterruptedException { -System.out.println("\nStoreManager.scanDataDirectory >>> dir="+dir); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.scanDataDirectory >>> [dataDir="+dir+"]"); if (dir == null) throw new IllegalArgumentException(); @@ -2138,11 +2143,12 @@ if (file.isDirectory()) { -System.out.println("\nStoreManager.scanDataDirectory >>> dir="+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanDataDirectory >>> PROCESSING DIRECTORY [dir="+file+"]"); scanDataDirectory(file, stats); } else { -System.out.println("\nStoreManager.scanDataDirectory >>> scanFile: "+file); + +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanDataDirectory >>> scanFile [file="+file+"]"); scanFile(file, stats); } @@ -2152,7 +2158,6 @@ } private void scanFile(File file, Stats stats) throws InterruptedException { -System.out.println("\nStoreManager.scanFile >>> "+file); if (Thread.interrupted()) throw new InterruptedException(); @@ -2167,6 +2172,7 @@ // #of bytes in the file as reported by the OS. 
final long len = file.length(); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.scanFile >>> [file="+file+", length="+len+"]"); if (len > 0 && name.endsWith(Options.JNL)) { @@ -2182,7 +2188,7 @@ try { -System.out.println("\nStoreManager.scanFile >>> NEW ManagedJournal: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> JNL >>> NEW ManagedJournal"); tmp = new ManagedJournal(properties); } catch (Exception ex) { @@ -2194,6 +2200,7 @@ stats.badFiles.add(file.getAbsolutePath()); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> JNL >>> EXCEPTION - "+ex+"\n"+com.bigdata.util.Util.getThrowableStackTrace(ex)+"\n"); return; } @@ -2222,7 +2229,7 @@ final IndexSegmentStore segStore; try { -System.out.println("\nStoreManager.scanFile >>> NEW IndexSegmentStore: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> SEG >>> NEW IndexSegmentStore("+file+")"); segStore = new IndexSegmentStore(file); } catch (Exception ex) { @@ -2234,6 +2241,7 @@ stats.badFiles.add(file.getAbsolutePath()); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> SEG >>> EXCEPTION - "+ex+"\n"+com.bigdata.util.Util.getThrowableStackTrace(ex)+"\n"); return; } @@ -2270,7 +2278,7 @@ && (name.endsWith(Options.JNL) || name .endsWith(Options.SEG))) { -System.out.println("\nStoreManager.scanFile >>> Ignoring empty file: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> Ignoring empty file: "+file); log.warn("Ignoring empty file: " + file); } else { @@ -2279,7 +2287,7 @@ * This file is not relevant to the resource manager. */ -System.out.println("\nStoreManager.scanFile >>> Ignoring irrelevant file: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> Ignoring irrelevant file: "+file); log.warn("Ignoring file: " + file); } @@ -2305,7 +2313,7 @@ // } // addResource(resource, file.getAbsoluteFile()); -System.out.println("\nStoreManager.scanFile >>> addResource: file="+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> addResource: file="+file); addResource(resource, file); } @@ -3349,7 +3357,7 @@ this.releaseTime = txService.getReleaseTime(); //BTM -log.warn("\n*** StoreManager.purgeOldResources: this.releaseTime="+this.releaseTime+"\n"); +log.warn("*** StoreManager.purgeOldResources: this.releaseTime="+this.releaseTime); } else { Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -89,6 +89,7 @@ import com.bigdata.journal.ConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.LocalTransactionManager; +import java.io.File; /** * Abstract base class for {@link IBigdataFederation} implementations. 
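The StoreManager.start() hunk above bounds the wait for transaction service discovery, raising nWait from 30 to 120 seconds. Note that in the committed hunk the discoveredTxnSrvc report sits inside the for loop (before }//endloop(nWait)), so the "transaction service unreachable" warning fires on every one-second probe and the success branch is skipped by the break. The presumable intent is the hoisted form sketched below; this is a hedged sketch under that assumption, the helper name is hypothetical, and getTransactionService() is the call used above:

    import com.bigdata.discovery.IBigdataDiscoveryManagement;

    // Hypothetical helper: probe once per second for up to nWait seconds,
    // reporting once after the loop instead of on every iteration.
    static boolean awaitTransactionService(
            final IBigdataDiscoveryManagement discoveryMgr, final int nWait)
            throws InterruptedException {
        for (int i = 0; i < nWait; i++) {
            if (discoveryMgr.getTransactionService() != null) {
                return true; // discovered within the timeout.
            }
            Thread.sleep(1000L); // one-second probe interval, as above.
        }
        return false; // caller logs "transaction service unreachable" once.
    }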
@@ -113,6 +114,7 @@ //BTM - FOR_CLIENT_SERVICE - BEGIN protected ResourceManager fedResourceMgr; protected IConcurrencyManager fedConcurrencyMgr; +private static int dataDirCounter = 0; //BTM - FOR_CLIENT_SERVICE - END /** @@ -674,16 +676,50 @@ // tempStoreFactory = new TemporaryStoreFactory(this.client // .getTempStoreMaxExtent()); -//BTM - FOR_CLIENT_SERVICE - BEGIN - for passing concurrencyMgr to getGlobalFileSystem and TemporaryStoreFactory.getTempStore +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE - NOTE: getGlobalFileSystem and TemporaryStoreFactory.getTempStore +//BTM - FOR_CLIENT_SERVICE - now expect a ConcurrencyManager to be passed into them. +//BTM - FOR_CLIENT_SERVICE - Thus, the code below was added to provide a ConcurrencyManager +//BTM - FOR_CLIENT_SERVICE - that this AbstractFederation can input when those methods +//BTM - FOR_CLIENT_SERVICE - are invoked. It is not expected that the ConcurrencyManager +//BTM - FOR_CLIENT_SERVICE - created here will be employed in other places in the +//BTM - FOR_CLIENT_SERVICE - code. Note also that in order to create a ConcurrencyManager, +//BTM - FOR_CLIENT_SERVICE - a ResourceManager must first be created and, that ResourceManager +//BTM - FOR_CLIENT_SERVICE - expects the properties that are input to its constructor +//BTM - FOR_CLIENT_SERVICE - will include a valid value for the property named, +//BTM - FOR_CLIENT_SERVICE - "com.bigdata.resources.StoreManager.dataDir". Without this +//BTM - FOR_CLIENT_SERVICE - property, the StoreManager on which the ResourceManager +//BTM - FOR_CLIENT_SERVICE - is based will fail to start. Additionally, it is very +//BTM - FOR_CLIENT_SERVICE - important that the value to which that property is set +//BTM - FOR_CLIENT_SERVICE - be unique for each instance of the ResourceManager/StoreManager +//BTM - FOR_CLIENT_SERVICE - that is created; otherwise, the StoreManager will again +//BTM - FOR_CLIENT_SERVICE - fail to start, and will ultimately throw an +//BTM - FOR_CLIENT_SERVICE - OverlappingFileLockException. Thus, to satisfy this +//BTM - FOR_CLIENT_SERVICE - requirement, the static variable named, 'dataDirCounter' +//BTM - FOR_CLIENT_SERVICE - is declared and incremented for each instantiation of +//BTM - FOR_CLIENT_SERVICE - this class; which provides a unique token that is used +//BTM - FOR_CLIENT_SERVICE - to modify the value of the system property relative to +//BTM - FOR_CLIENT_SERVICE - the other instances of the ResourceManager that are +//BTM - FOR_CLIENT_SERVICE - are created. 
+ Properties resourceMgrProps = (Properties) (client.getProperties()).clone(); + resourceMgrProps.setProperty + ("com.bigdata.resources.StoreManager.dataDir", + System.getProperty("java.io.tmpdir") + +File.separator + +(FedResourceManager.class).getName() + +File.separator + +"StoreManager" + +File.separator + +"dataDir_"+(dataDirCounter++)); this.fedResourceMgr = new FedResourceManager ( (IBigdataDiscoveryManagement)this, (ILocalResourceManagement)this, (IIndexManager)this, - client.getProperties() ); + resourceMgrProps ); this.fedConcurrencyMgr = new ConcurrencyManager - (client.getProperties(), + (resourceMgrProps, new LocalTransactionManager ( (IBigdataDiscoveryManagement)this ), this.fedResourceMgr); @@ -798,7 +834,6 @@ assertOpen(); try { - UUID indexUUID = getMetadataService().registerScaleOutIndex( metadata, separatorKeys, dataServiceUUIDs); @@ -839,22 +874,18 @@ public void dropIndex(String name) { -String dbgFlnm = "TestEmbeddedClient.txt"; if (log.isInfoEnabled()) log.info("name=" + name); assertOpen(); -com.bigdata.util.Util.printStr(dbgFlnm, " AbstractFederation.dropIndex[name="+name+"] - assertOpen = OK"); try { -com.bigdata.util.Util.printStr(dbgFlnm, " AbstractFederation.dropIndex - metadataService = "+getMetadataService()); getMetadataService().dropScaleOutIndex(name); if (log.isInfoEnabled()) log.info("dropped scale-out index."); -com.bigdata.util.Util.printStr(dbgFlnm, " AbstractFederation.dropIndex - getIndexCache = "+getIndexCache()); getIndexCache().dropIndexFromCache(name); } catch (Exception e) { @@ -1752,7 +1783,7 @@ this.shutdown(); } - // For tests + // For tasks started by ClientService and for tests public IConcurrencyManager getConcurrencyManager() { return fedConcurrencyMgr; Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -105,14 +105,16 @@ final NT nt = new NT(name, timestamp); // test cache before synchronization. +log.warn("\nAbstractIndexCache.getIndex >>> 1a. ndx = indexCache.get("+nt+")\n"); T ndx = indexCache.get(nt); if (ndx != null) { +log.warn("\nAbstractIndexCache.getIndex >>> 1b. NOT NULL - ndx = "+ndx+")\n"); return ndx; } -log.warn("\nAbstractIndexCache.constructor >>> 1. ndx == NULL\n"); +log.warn("\nAbstractIndexCache.getIndex >>> 1b. ndx == NULL\n"); /* * Acquire a lock for the index name and timestamp. This allows @@ -123,25 +125,29 @@ try { +log.warn("\nAbstractIndexCache.getIndex >>> 2a. ndx == indexCache.get("+nt+")\n"); ndx = indexCache.get(nt); if (ndx == null) { -log.warn("\nAbstractIndexCache.constructor >>> 2. ndx == NULL\n"); +log.warn("\nAbstractIndexCache.getIndex >>> 2b. ndx == NULL\n"); +log.warn("\nAbstractIndexCache.getIndex >>> 3a. ndx = newView("+nt+")\n"); if ((ndx = newView(name, timestamp)) == null) { -log.warn("\nAbstractIndexCache.constructor >>> 3. newView -- indx == NULL\n"); if (INFO) log.info("name=" + name + " @ " + timestamp + " : no such index."); +log.warn("\nAbstractIndexCache.getIndex >>> 3b. newView("+nt+") = NULL >>> RETURN\n"); return null; - } + }//(newView == null) +log.warn("\nAbstractIndexCache.getIndex >>> 3b. NOT NULL - newView("+nt+") = "+ndx+"\n"); // add to the cache. 
// indexCache.put(nt, ndx, false/* dirty */); indexCache.put(nt, ndx); +log.warn("\nAbstractIndexCache.getIndex >>> 3c. indexCache.put("+nt+", "+ndx+")\n"); if (INFO) log.info("name=" + name + " @ " @@ -153,8 +159,9 @@ log.info("name=" + name + " @ " + timestamp + " : cache hit."); - } + }//endif(ndx == null) +log.warn("\nAbstractIndexCache.getIndex >>> 4. FINAL RETURN - ndx = "+ndx+"\n"); return ndx; } finally { @@ -191,8 +198,11 @@ final Map.Entry<NT, WeakReference<T>> entry = itr.next(); final T ndx = entry.getValue().get(); +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 1a. entry.getValue().get() = "+ndx+"\n"); + if(ndx == null) { +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 1b. ndx = NULL >>> NEXT ndx\n"); /* * The entry under the key has been cleared so we just skip @@ -220,16 +230,21 @@ + name + " @ " + timestamp); // remove from the cache. - indexCache.remove(entry.getKey()); +//BTM indexCache.remove(entry.getKey()); +Object retVal = indexCache.remove(entry.getKey()); +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 2. indexCache.remove("+entry.getKey()+") >>> DROPPED [KEY="+entry.getKey()+", VAL="+retVal+"]\n"); - } + }//endif(timestamp == ITx.UNISOLATED || ITx.READ_COMMITTED) - } + }//endif(name.equals(nt.getName) - } + }//end loop - } +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 3a. VERIFY - getIndex("+name+", ITx.READ_COMMITTED) = "+getIndex(name, ITx.READ_COMMITTED)+"]"); +log.warn("AbstractIndexCache.dropIndexFromCache >>> 3b. VERIFY - getIndex("+name+", ITx.UNISOLATED) = "+getIndex(name, ITx.UNISOLATED)+"]\n"); + }//end sync(indexCache) + } protected void shutdown() { Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -269,14 +269,14 @@ assertOpen(); -//BTM return getMetadataIndexCache().getIndex(name, timestamp); + return getMetadataIndexCache().getIndex(name, timestamp); -MetadataIndexCache cache = getMetadataIndexCache(); -IMetadataIndex index = cache.getIndex(name, timestamp); -log.warn("\n>>>>AbstractScaleOutFederation.getMetadataIndex: name="+name+", timestamp="+timestamp+", metadataIndexCache="+cache+", metadataIndex="+index+"\n"); -return index; +//BTM - PRE_CLIENT_SERVICE MetadataIndexCache cache = getMetadataIndexCache(); +//BTM - PRE_CLIENT_SERVICE IMetadataIndex index = cache.getIndex(name, timestamp); +//BTM - PRE_CLIENT_SERVICE log.warn("\n>>>>AbstractScaleOutFederation.getMetadataIndex: name="+name+", timestamp="+timestamp+", metadataIndexCache="+cache+", metadataIndex="+index+"\n"); +//BTM - PRE_CLIENT_SERVICE return index; } - + /** * Returns an iterator that will visit the {@link PartitionLocator}s for * the specified scale-out index key range. @@ -477,8 +477,7 @@ int ntries = 0; // updated each time through the loop. -//BTM IMetadataService metadataService = null; -ShardLocator metadataService = null; + ShardLocator metadataService = null; // updated each time through the loop. 
UUID[] dataServiceUUIDs = null; @@ -646,6 +645,20 @@ } +//BTM - FOR_CLIENT_SERVICE - BEGIN ----------------------------------------------------------- +//BTM - FOR_CLIENT_SERVICE - NOTE: this method was added to address trac issue #190 + public void dropIndex(String name) { + super.dropIndex(name); + try { + getMetadataIndexCache().dropIndexFromCache(name); + getIndexCache().dropIndexFromCache(name); + } catch (Exception e) {//maintain same logic as super.dropIndex? + throw new RuntimeException( e ); + } + } +//BTM - FOR_CLIENT_SERVICE - END ------------------------------------------------------------- + + //BTM - PRE_CLIENT_SERVICE - BEGIN - moved to standalone classes ------------------------------------------------------------------- //BTM - PRE_CLIENT_SERVICE /** //BTM - PRE_CLIENT_SERVICE //BTM - PRE_CLIENT_SERVICE * Task directs a {@link ShardService} to purge any unused resources and to Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -37,6 +37,7 @@ import com.bigdata.Banner; //BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; import com.bigdata.journal.IIndexManager; import com.bigdata.service.jini.JiniFederation; import com.bigdata.resources.ILocalResourceManagement; @@ -184,7 +185,9 @@ String zkRoot = fed.getZooConfig().zroot; return getFederation().getExecutorService().submit ( new ClientTaskWrapper( (IIndexManager)fed, + fed.getConcurrencyManager(), (ILocalResourceManagement)fed, + (IBigdataDiscoveryManagement)fed, this,//embeddedCallableExecutor task, zkClient, zkAcl, zkRoot) ); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -24,6 +24,8 @@ package com.bigdata.service; +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.resources.ILocalResourceManagement; @@ -37,7 +39,9 @@ public class ClientTaskWrapper<T> implements Callable<T> { private IIndexManager indexMgr; + private IConcurrencyManager concurrencyMgr; private ILocalResourceManagement localResourceMgr; + private IBigdataDiscoveryManagement discoveryMgr; private CallableExecutor embeddedCallableExecutor; private IClientServiceCallable<T> task; private ZooKeeper zkClient; @@ -45,7 +49,9 @@ private String zkRoot; public ClientTaskWrapper(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, ILocalResourceManagement localResourceManager, + IBigdataDiscoveryManagement discoveryManager, CallableExecutor embeddedCallableExecutor, IClientServiceCallable<T> task, ZooKeeper zookeeperClient, @@ -53,7 +59,9 @@ String zookeeperRoot) { this.indexMgr = indexManager; + this.concurrencyMgr = concurrencyManager; this.localResourceMgr = localResourceManager; + this.discoveryMgr = discoveryManager; this.embeddedCallableExecutor = embeddedCallableExecutor; this.task = task; this.zkClient = zookeeperClient; @@ -63,7 +71,8 
@@ public T call() throws Exception { return task.startClientTask - (indexMgr, localResourceMgr, + (indexMgr, concurrencyMgr, + localResourceMgr, discoveryMgr, embeddedCallableExecutor, zkClient, zkAcl, zkRoot); } Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -17,6 +17,8 @@ package com.bigdata.service; +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.resources.ILocalResourceManagement; @@ -50,7 +52,9 @@ * @throws Exception if unable to compute a result */ V startClientTask(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, ILocalResourceManagement localResourceManager, + IBigdataDiscoveryManagement discoveryManager, CallableExecutor embeddedCallableExecutor, ZooKeeper zookeeperClient, List<ACL> zookeeperAcl, Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/IndexCache.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/big... [truncated message content] |
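Taken together, the hunks in this message widen the callable-executor path end to end: ClientService now builds the ClientTaskWrapper with the federation's concurrency and discovery managers, the wrapper forwards them to IClientServiceCallable.startClientTask, and a task can thread them on to the five-argument locate(...). A hedged sketch of a task written against the widened interface follows; the class name, namespace, and return value are hypothetical, while the signature and the locate(...) call shape come from the hunks above:

    import java.util.List;

    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;

    import com.bigdata.discovery.IBigdataDiscoveryManagement;
    import com.bigdata.journal.IConcurrencyManager;
    import com.bigdata.journal.IIndexManager;
    import com.bigdata.journal.ITx;
    import com.bigdata.resources.ILocalResourceManagement;
    import com.bigdata.service.CallableExecutor;
    import com.bigdata.service.IClientServiceCallable;

    public class LocateRelationTask implements IClientServiceCallable<Boolean> {

        public Boolean startClientTask(final IIndexManager indexManager,
                final IConcurrencyManager concurrencyManager,
                final ILocalResourceManagement localResourceManager,
                final IBigdataDiscoveryManagement discoveryManager,
                final CallableExecutor embeddedCallableExecutor,
                final ZooKeeper zookeeperClient,
                final List<ACL> zookeeperAcl,
                final String zookeeperRoot) throws Exception {

            // The three managers are passed to the locator explicitly rather
            // than being recovered from surrounding context.
            final Object relation = indexManager.getResourceLocator().locate(
                    indexManager, concurrencyManager, discoveryManager,
                    "ns.spo"/* hypothetical namespace */, ITx.READ_COMMITTED);

            return relation != null;
        }
    }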
From: <tho...@us...> - 2010-11-08 21:31:24
Revision: 3918 http://bigdata.svn.sourceforge.net/bigdata/?rev=3918&view=rev Author: thompsonbry Date: 2010-11-08 21:31:17 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Added a utility class for exploring adaptive query optimization and wrote the initialization logic for the JGraph (in JoinGraph). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-08 21:30:29 UTC (rev 3917) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-08 21:31:17 UTC (rev 3918) @@ -28,24 +28,51 @@ package com.bigdata.bop.controller; import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpBase; import com.bigdata.bop.BOpContext; -import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.BOpContextBase; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IElement; import com.bigdata.bop.IPredicate; import com.bigdata.bop.IVariable; import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.ap.SampleIndex; +import com.bigdata.bop.bindingSet.HashBindingSet; +import com.bigdata.bop.engine.LocalChunkMessage; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.relation.rule.Rule; +import com.bigdata.striterator.Dechunkerator; /** * A join graph with annotations for estimated cardinality and other details in * support of runtime query optimization. A join graph is a collection of - * relations and joins which connect those relations. + * relations and joins which connect those relations. This boils down to a + * collection of {@link IPredicate}s (selects on relations) and shared variables + * (which identify joins). * <p> * * @see http://arxiv.org/PS_cache/arxiv/pdf/0810/0810.4809v1.pdf, XQuery Join @@ -86,163 +113,548 @@ */ public class JoinGraph extends PipelineOp { + private static final transient Logger log = Logger.getLogger(JoinGraph.class); + private static final long serialVersionUID = 1L; /** * Known annotations. */ public interface Annotations extends PipelineOp.Annotations { - /** - * The default sample size (100 is a good value). - */ - String SAMPLE_SIZE = "sampleSize"; + /** + * The vertices of the join graph expressed an an {@link IPredicate}[]. 
+ */ + String VERTICES = JoinGraph.class.getName() + ".vertices"; + + /** + * The initial sample size (default {@value #DEFAULT_SAMPLE_SIZE}). + */ + String SAMPLE_SIZE = JoinGraph.class.getName() + ".sampleSize"; + + int DEFAULT_SAMPLE_SIZE = 100; } - /** - * Vertices of the join graph. + /** + * @see Annotations#VERTICES */ - private final Vertex[] V; + public IPredicate[] getVertices() { + + return (IPredicate[]) getRequiredProperty(Annotations.VERTICES); + + } /** - * Edges of the join graph. + * @see Annotations#SAMPLE_SIZE */ - private final Edge[] E; - - /** - * A vertex of the join graph is an annotated relation (this corresponds to - * an {@link IPredicate} with additional annotations to support the adaptive - * query optimization algorithm). - */ - private static class Vertex implements Serializable { + public int getSampleSize() { + + return getProperty(Annotations.SAMPLE_SIZE, Annotations.DEFAULT_SAMPLE_SIZE); + + } + + public JoinGraph(final NV ...anns) { - /** - * - */ - private static final long serialVersionUID = 1L; + this(BOpBase.NOARGS, NV.asMap(anns)); + + } - final IPredicate<?> pred; + /** + * + * @todo We can derive the vertices from the join operators or the join + * operators from the vertices. However, if a specific kind of join + * operator is required then the question is whether we have better + * information to make that choice when the join graph is evaluated or + * before it is constructed. + * + * @todo How we will handle optional joins? Presumably they are outside of + * the code join graph as part of the tail attached to that join + * graph. + * + * @todo How can join constraints be moved around? Just attach them where + * ever a variable becomes bound? And when do we filter out variables + * which are not required downstream? Once we decide on a join path + * and execute it fully (rather than sampling that join path). + */ + public JoinGraph(final BOp[] args, final Map<String,Object> anns) { - Vertex(final IPredicate<?> pred) { - if (pred == null) - throw new IllegalArgumentException(); - this.pred = pred; + super(args,anns); + + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); } + } - /** - * An edge of the join graph is an annotated join operator. The edges of the - * join graph are undirected. Edges exist when the vertices share at least - * one variable. - */ - private static class Edge implements Serializable { - - /** + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new JoinGraphTask(context)); + + } + + /** + * A vertex of the join graph is an annotated relation (this corresponds to + * an {@link IPredicate} with additional annotations to support the adaptive + * query optimization algorithm). + */ + public static class Vertex implements Serializable { + + /** * */ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - /** - * The vertices connected by that edge. - */ - final Vertex v1, v2; + final IPredicate<?> pred; - /** - * A weight representing the estimated cardinality of the join. + /** + * The limit used to produce the {@link #sample}. + */ + int limit; + + /** + * Fast range count and <code>null</code> until initialized. + */ + Long rangeCount; + + /** + * Sample (when not-null). 
+ */ + Object[] sample; + + Vertex(final IPredicate<?> pred) { + + if (pred == null) + throw new IllegalArgumentException(); + + this.pred = pred; + + } + + public String toString() { + + return "\nVertex{pred=" + pred + ",rangeCount=" + rangeCount + + ",sampleSize=" + (sample == null ? "N/A" : sample.length) + + "}"; + + } + + public void sample(final BOpContextBase context,final int limit) { + + final IRelation r = context.getRelation(pred); + + final IAccessPath ap = context.getAccessPath(r, pred); + + if (rangeCount == null) { + + rangeCount = ap.rangeCount(false/* exact */); + + } + + if (sample == null) { // @todo new sample each time? + + final SampleIndex sampleOp = new SampleIndex(new BOp[] {}, // + NV.asMap(// + new NV(SampleIndex.Annotations.PREDICATE, pred),// + new NV(SampleIndex.Annotations.LIMIT, limit))); + + sample = sampleOp.eval(context); + + this.limit = limit; + + } + + } + + } + + /** + * An edge of the join graph is an annotated join operator. The edges of the + * join graph are undirected. Edges exist when the vertices share at least + * one variable. + */ + public static class Edge implements Serializable { + + /** + * */ - double w; + private static final long serialVersionUID = 1L; - public Edge(final Vertex v1, final Vertex v2) { - if (v1 == null) + /** + * The vertices connected by that edge. + */ + final Vertex v1, v2; + + /** + * The set of shared variables. + */ + final Set<IVariable<?>> shared; + + class EdgeSample { + + /** + * The fast range count (aka cardinality) for the source vertex of + * the edge (whichever vertex has the lower cardinality). + */ + final long inputRangeCount; + /** + * The limit used to sample the edge (this is the limit on the #of + * solutions generated by the cutoff join used when this sample was + * taken). + */ + final int limit; + /** + * The #of binding sets out of the source sample vertex sample which + * were consumed. + */ + final int inputCount; + /** + * The #of binding sets generated before the join was cutoff. + */ + final int outputCount; + /** + * The ratio of the #of input samples consumed to the #of output + * samples generated. + */ + final double f; + /** + * The estimated cardinality of the join. + */ + final long estimatedCardinality; + + /** + * @param limit + * The limit used to sample the edge (this is the limit + * on the #of solutions generated by the cutoff join used + * when this sample was taken). + * @param inputRangeCount + * The fast range count (aka cardinality) for the source + * vertex of the edge (whichever vertex has the lower + * cardinality). + * @param inputCount + * The #of binding sets out of the source sample vertex + * sample which were consumed. + * @param outputCount + * The #of binding sets generated before the join was + * cutoff. + * + * @todo If the outputCount is zero then this is a good indicator + * that there is an error in the query such that the join will + * not select anything. This is not 100%, merely indicative. + */ + EdgeSample(final long inputRangeCount, final int limit, final int inputCount, + final int outputCount) { + + this.inputRangeCount = inputRangeCount; + + this.limit = limit; + + this.inputCount = inputCount; + + this.outputCount = outputCount; + + f = outputCount == 0 ? 
0 : (outputCount / (double) inputCount); + + estimatedCardinality = (long) (inputRangeCount * f); + + } + + public String toString() { + return "EdgeSample" + "{inputRangeCount=" + inputRangeCount + + ", limit=" + limit + ", inputCount=" + inputCount + + ", outputCount=" + outputCount + ", f=" + f + + ", estimatedCardinality=" + estimatedCardinality + + "}"; + } + + }; + + /** + * The last sample for this edge and <code>null</code> if the edge has + * not been sampled. + */ + EdgeSample sample = null; + + public Edge(final Vertex v1, final Vertex v2, final Set<IVariable<?>> shared) { + if (v1 == null) + throw new IllegalArgumentException(); + if (v2 == null) + throw new IllegalArgumentException(); + if (shared==null) + throw new IllegalArgumentException(); + if (shared.isEmpty()) + throw new IllegalArgumentException(); + this.v1 = v1; + this.v2 = v2; + this.shared = shared; + } + + public String toString() { + + return "\nEdge{v1=" + v1.pred.getId() + ",v2=" + v2.pred.getId() + + ",shared=" + shared.toString() + ", sample=" + sample + "}"; + + } + + /** + * Estimate the cardinality of the edge. + * + * @param context + * @throws Exception + */ + public void estimateCardinality(final QueryEngine queryEngine, + final int limit) throws Exception { + + if (limit <= 0) + throw new IllegalArgumentException(); + + /* + * Figure out which vertex has the smaller cardinality. The sample + * of that vertex is used since it is more representative than the + * sample of the other vertex. + */ + // vertex v, vprime + final Vertex v, vp; + if (v1.rangeCount < v2.rangeCount) { + v = v1; + vp = v2; + } else { + v = v2; + vp = v1; + } + + /* + * @todo This is difficult to setup because we do not have a concept + * (or class) corresponding to a fly weight relation and we do not + * have a general purpose relation, just arrays or sequences of + * IBindingSets. Also, all relations are persistent. Temporary + * relations are on a temporary store and are locatable by their + * namespace rather than being Objects. + * + * The algorithm presupposes fly weight / temporary relations this + * both to wrap the sample and to store the computed intermediate + * results. + * + * Note: The PipelineJoin does not have a means to halt after a + * limit is satisfied. In order to achieve this, we have to wrap it + * with a SliceOp. + * + * Together, this means that we are dealing with IBindingSet[]s for + * both the input and the output of the cutoff evaluation of the + * edge rather than rows of the materialized relation. + * + * @todo On subsequent iterations we would probably re-sample [v] + * and we would run against the materialized intermediate result for + * [v']. + */ + + /* + * Convert the source sample into an IBindingSet[], injecting a + * rowid column. + */ + final IVariable<Integer> ROWID = Var.var("__rowid"); + final IBindingSet[] sample = new IBindingSet[v.sample.length]; + { + for (int i = 0; i < sample.length; i++) { + final IBindingSet bset = new HashBindingSet(); + BOpContext.copyValues((IElement) v.sample[i], v.pred, bset); + bset.set(ROWID, new Constant<Integer>(Integer.valueOf(i))); + sample[i] = bset; + } + } + + /* + * @todo Any constraints on the edge (other than those implied by + * shared variables) need to be annotated on the join. Constraints + * (other than range constraints which are directly coded by the + * predicate) will not reduce the effort to compute the join, but + * they can reduce the cardinality of the join and that is what we + * are trying to estimate here. 
+ */ + final PipelineJoin joinOp = new PipelineJoin(new BOp[] {}, // + new NV(BOp.Annotations.BOP_ID, 1),// + new NV(PipelineJoin.Annotations.PREDICATE,vp.pred.setBOpId(3)) + ); + + final SliceOp sliceOp = new SliceOp(new BOp[] { joinOp },// + NV.asMap(// + new NV(BOp.Annotations.BOP_ID, 2), // + new NV(SliceOp.Annotations.LIMIT, (long)limit), // + new NV( + BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER))); + + // run the cutoff sampling of the edge. + final UUID queryId = UUID.randomUUID(); + final RunningQuery runningQuery = queryEngine.eval(queryId, + sliceOp, new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, joinOp.getId()/* startId */, + -1 /* partitionId */, + new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { sample }))); + + // #of source samples consumed. + int inputCount = 0; + // #of output samples generated. + int outputCount = 0; + try { + try { + IBindingSet bset = null; + // Figure out the #of source samples consumed. + final Iterator<IBindingSet> itr = new Dechunkerator<IBindingSet>( + runningQuery.iterator()); + while (itr.hasNext()) { + bset = itr.next(); + outputCount++; + } + // #of input rows consumed. Note: +1 since origin ZERO. + inputCount = bset == null ? 0 : ((Integer) bset.get(ROWID) + .get()) + 1; + } finally { + // verify no problems. FIXME Restore test of the query. +// runningQuery.get(); + } + } finally { + runningQuery.cancel(true/* mayInterruptIfRunning */); + } + + this.sample = new EdgeSample(v.rangeCount, limit, inputCount, + outputCount); + + if (log.isInfoEnabled()) + log.info("edge=" + this + sample); + + } + + } + + /** + * A join graph (data structure and methods only). + */ + public static class JGraph { + + /** + * Vertices of the join graph. + */ + private final Vertex[] V; + + /** + * Edges of the join graph. + */ + private final Edge[] E; + + public List<Vertex> getVertices() { + return Collections.unmodifiableList(Arrays.asList(V)); + } + + public List<Edge> getEdges() { + return Collections.unmodifiableList(Arrays.asList(E)); + } + + public String toString() { + return super.toString() + "{V=" + Arrays.toString(V) + ",E=" + + Arrays.toString(E) + "}"; + } + + public JGraph(final IPredicate[] v) { + + if (v == null) throw new IllegalArgumentException(); - if (v2 == null) - throw new IllegalArgumentException(); - this.v1 = v1; - this.v2 = v2; - } - } - /** - * - * @param joinNexus - * @param v - * @param sampleSize - * The default sample size to use when sampling a vertex of the - * join graph (100). - * - * @todo We can derive the vertices from the join operators or the join - * operators from the vertices. However, if a specific kind of join - * operator is required then the question is whether we have better - * information to make that choice when the join graph is evaluated or - * before it is constructed. 
- */ - public JoinGraph(final IPredicate<?>[] v, final int sampleSize) { + if (v.length < 2) + throw new IllegalArgumentException(); - super(v/* args */, NV.asMap(new NV[] {// - new NV(Annotations.SAMPLE_SIZE, Integer.valueOf(sampleSize))// - })); + V = new Vertex[v.length]; - if (v == null) - throw new IllegalArgumentException(); + for (int i = 0; i < v.length; i++) { - if (sampleSize <= 0) - throw new IllegalArgumentException(); + V[i] = new Vertex(v[i]); - switch (getEvaluationContext()) { - case CONTROLLER: - break; - default: - throw new UnsupportedOperationException( - Annotations.EVALUATION_CONTEXT + "=" - + getEvaluationContext()); - } + } - V = new Vertex[v.length]; + /* + * Identify the edges by looking for shared variables among the + * predicates. + */ + { - for (int i = 0; i < v.length; i++) { + final List<Edge> tmp = new LinkedList<Edge>(); - V[i] = new Vertex(v[i]); - - } + for (int i = 0; i < v.length; i++) { - /* - * Identify the edges by looking for shared variables among the - * predicates. - */ - { + final IPredicate<?> p1 = v[i]; - final List<Edge> tmp = new LinkedList<Edge>(); + for (int j = i + 1; j < v.length; j++) { - for (int i = 0; i < v.length; i++) { + final IPredicate<?> p2 = v[j]; - final IPredicate<?> p1 = v[i]; + final Set<IVariable<?>> shared = Rule.getSharedVars(p1, + p2); - for (int j = i + 1; j < v.length; j++) { + if (shared != null && !shared.isEmpty()) { - final IPredicate<?> p2 = v[j]; + tmp.add(new Edge(V[i], V[j], shared)); - final Set<IVariable<?>> shared = Rule.getSharedVars(p1, p2); + } - if (shared != null) { + } - tmp.add(new Edge(V[i], V[j])); + } - } + E = tmp.toArray(new Edge[0]); - } + } - } - - E = tmp.toArray(new Edge[0]); - - } + } - } + /** + * Obtain a sample and estimated cardinality (fast range count) for each vertex. + * + * @param context + * @param limit + * The sample size. + */ + public void sampleVertices(final BOpContextBase context, final int limit) { - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + for (Vertex v : V) { - return new FutureTask<Void>(new JoinGraphTask(context)); + v.sample(context, limit); + + } + + } - } + /** + * Estimate the cardinality of each edge. + * + * @param context + * + * @throws Exception + */ + public void estimateEdgeWeights(final QueryEngine queryEngine, final int limit) throws Exception { + + for(Edge e : E) { + + if (e.v1.sample == null || e.v2.sample == null) { + + /* + * We can only estimate the cardinality of edges connecting + * vertices for which samples were obtained. + */ + continue; + + } + + e.estimateCardinality(queryEngine, limit); + + } + + } + + } // class JGraph /** * Evaluation of a {@link JoinGraph}. 
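The EdgeSample above reduces to two formulas: f = outputCount / inputCount (defined as zero when the cutoff join produces nothing) and estimatedCardinality = inputRangeCount * f, where inputCount is recovered from the injected __rowid column and outputCount from the SliceOp-limited join. A worked example with made-up numbers:

    // Made-up numbers, for illustration only.
    final long inputRangeCount = 50000L; // fast range count of the source vertex
    final int limit = 100;               // cutoff for the sampled join
    final int inputCount = 32;           // source bindings consumed at the cutoff
    final int outputCount = 100;         // solutions produced (the limit was hit)

    final double f = outputCount / (double) inputCount;             // 3.125
    final long estimatedCardinality = (long) (inputRangeCount * f); // 156250

Because the join is cut off at the limit, f is extrapolated from a partial scan of the source sample; and, as the @todo above notes, an outputCount of zero is only indicative, not proof, that the join selects nothing.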
@@ -254,6 +666,8 @@ private final BOpContext<IBindingSet> context; + private final JGraph g; + JoinGraphTask(final BOpContext<IBindingSet> context) { if (context == null) @@ -261,6 +675,15 @@ this.context = context; + final IPredicate[] v = getVertices(); + + final int sampleSize = getSampleSize(); + + if (sampleSize <= 0) + throw new IllegalArgumentException(); + + g = new JGraph(v); + } public Void call() throws Exception { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java 2010-11-08 21:30:29 UTC (rev 3917) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java 2010-11-08 21:31:17 UTC (rev 3918) @@ -27,8 +27,6 @@ package com.bigdata.bop.controller; -import com.bigdata.bop.controller.JoinGraph; - import junit.framework.TestCase2; /** @@ -52,8 +50,142 @@ super(name); } +// @Override +// public Properties getProperties() { +// +// final Properties p = new Properties(super.getProperties()); +// +// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient +// .toString()); +// +// return p; +// +// } +// +// static private final String namespace = "ns"; +// +// Journal jnl; +// +// R rel; +// +// public void setUp() throws Exception { +// +// jnl = new Journal(getProperties()); +// +// } +// +// /** +// * Create and populate relation in the {@link #namespace}. +// * +// * @return The #of distinct entries. +// */ +// private int loadData(final int scale) { +// +// final String[] names = new String[] { "John", "Mary", "Saul", "Paul", +// "Leon", "Jane", "Mike", "Mark", "Jill", "Jake", "Alex", "Lucy" }; +// +// final Random rnd = new Random(); +// +// // #of distinct instances of each name. +// final int populationSize = Math.max(10, (int) Math.ceil(scale / 10.)); +// +// // #of trailing zeros for each name. +// final int nzeros = 1 + (int) Math.ceil(Math.log10(populationSize)); +// +//// System.out.println("scale=" + scale + ", populationSize=" +//// + populationSize + ", nzeros=" + nzeros); +// +// final NumberFormat fmt = NumberFormat.getIntegerInstance(); +// fmt.setMinimumIntegerDigits(nzeros); +// fmt.setMaximumIntegerDigits(nzeros); +// fmt.setGroupingUsed(false); +// +// // create the relation. +// final R rel = new R(jnl, namespace, ITx.UNISOLATED, new Properties()); +// rel.create(); +// +// // data to insert. +// final E[] a = new E[scale]; +// +// for (int i = 0; i < scale; i++) { +// +// final String n1 = names[rnd.nextInt(names.length)] +// + fmt.format(rnd.nextInt(populationSize)); +// +// final String n2 = names[rnd.nextInt(names.length)] +// + fmt.format(rnd.nextInt(populationSize)); +// +//// System.err.println("i=" + i + ", n1=" + n1 + ", n2=" + n2); +// +// a[i] = new E(n1, n2); +// +// } +// +// // sort before insert for efficiency. +// Arrays.sort(a,R.primaryKeyOrder.getComparator()); +// +// // insert data (the records are not pre-sorted). +// final long ninserts = rel.insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */)); +// +// // Do commit since not scale-out. +// jnl.commit(); +// +// // should exist as of the last commit point. 
+// this.rel = (R) jnl.getResourceLocator().locate(namespace, +// ITx.READ_COMMITTED); +// +// assertNotNull(rel); +// +// return (int) ninserts; +// +// } +// +// public void tearDown() throws Exception { +// +// if (jnl != null) { +// jnl.destroy(); +// jnl = null; +// } +// +// // clear reference. +// rel = null; +// +// } + public void test_something() { - + +//// final int scale = 10000; +//// +//// final int nrecords = loadData(scale); +// +// final IVariable<?> x = Var.var("x"); +// +// final IVariable<?> y = Var.var("y"); +// +// final IPredicate<E> p1 = new Predicate<E>(new BOp[] { x, y }, +// new NV(IPredicate.Annotations.RELATION_NAME, +// new String[] { namespace }),// +// new NV(IPredicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED)// +// ); +// +// final IPredicate<E> p2 = new Predicate<E>(new BOp[] { x, y }, +// new NV(IPredicate.Annotations.RELATION_NAME, +// new String[] { namespace }),// +// new NV(IPredicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED)// +// ); +// +// final IPredicate<E> p3 = new Predicate<E>(new BOp[] { x, y }, +// new NV(IPredicate.Annotations.RELATION_NAME, +// new String[] { namespace }),// +// new NV(IPredicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED)// +// ); +// +// new JoinGraph(// +// new NV(BOp.Annotations.BOP_ID, 1),// +// new NV(JoinGraph.Annotations.VERTICES,new IPredicate[]{}),// +// new NV(JoinGraph.Annotations.SAMPLE_SIZE, 100)// +// ); + fail("write tests"); } Added: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java 2010-11-08 21:31:17 UTC (rev 3918) @@ -0,0 +1,335 @@ +package com.bigdata.rdf.sail.bench; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContextBase; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.Var; +import com.bigdata.bop.controller.JoinGraph.JGraph; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.fed.QueryEngineFactory; +import com.bigdata.journal.Journal; +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; +import com.bigdata.rdf.model.BigdataValueFactory; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.spo.SPOPredicate; +import com.bigdata.rdf.store.AbstractTripleStore; + +/** + * Hard codes LUBM UQ. + * + * <pre> + * [query2] + * PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + * PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> + * SELECT ?x ?y ?z + * WHERE{ + * ?x a ub:GraduateStudent . + * ?y a ub:University . + * ?z a ub:Department . + * ?x ub:memberOf ?z . + * ?z ub:subOrganizationOf ?y . + * ?x ub:undergraduateDegreeFrom ?y + * } + * </pre> + * + * Re-ordered joins to cluster by shared variables. This makes a nicer graph if + * you draw it. + * + * <pre> + * v2 ?z a ub:Department . + * v3 ?x ub:memberOf ?z . + * v4 ?z ub:subOrganizationOf ?y . + * v1 ?y a ub:University . + * v5 ?x ub:undergraduateDegreeFrom ?y + * v0 ?x a ub:GraduateStudent . 
+ * </pre> + * + * <pre> + * http://www.w3.org/1999/02/22-rdf-syntax-ns#type (TermId(8U)) + * + * http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#UndergraduateStudent (TermId(324U)) + * </pre> + */ +public class AdaptiveQueryOptimization { + + public static void main(String[] args) throws Exception { + + final String namespace = "LUBM_U50"; + final String propertyFile = "/root/workspace/bigdata-quads-query-branch/bigdata-perf/lubm/ant-build/bin/WORMStore.properties"; + final String journalFile = "/data/lubm/U50/bigdata-lubm.WORM.jnl"; + + final Properties properties = new Properties(); + { + // Read the properties from the file. + final InputStream is = new BufferedInputStream(new FileInputStream( + propertyFile)); + try { + properties.load(is); + } finally { + is.close(); + } + if (System.getProperty(BigdataSail.Options.FILE) != null) { + // Override/set from the environment. + properties.setProperty(BigdataSail.Options.FILE, System + .getProperty(BigdataSail.Options.FILE)); + } + if (properties.getProperty(BigdataSail.Options.FILE) == null) { + properties.setProperty(BigdataSail.Options.FILE, journalFile); + } + } + + final Journal jnl = new Journal(properties); + try { + + final AbstractTripleStore database = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + jnl.getLastCommitTime()); + + if (database == null) + throw new RuntimeException("Not found: " + namespace); + + /* + * Resolve terms against the lexicon. + */ + final BigdataValueFactory f = database.getLexiconRelation() + .getValueFactory(); + + final BigdataURI rdfType = f + .createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"); + + final BigdataURI graduateStudent = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#GraduateStudent"); + + final BigdataURI university = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#University"); + + final BigdataURI department = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#Department"); + + final BigdataURI memberOf = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#memberOf"); + + final BigdataURI subOrganizationOf = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#subOrganizationOf"); + + final BigdataURI undergraduateDegreeFrom = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#undergraduateDegreeFrom"); + + final BigdataValue[] terms = new BigdataValue[] { rdfType, + graduateStudent, university, department, memberOf, + subOrganizationOf, undergraduateDegreeFrom }; + + // resolve terms. + database.getLexiconRelation() + .addTerms(terms, terms.length, true/* readOnly */); + + { + for (BigdataValue tmp : terms) { + System.out.println(tmp + " : " + tmp.getIV()); + if (tmp.getIV() == null) + throw new RuntimeException("Not defined: " + tmp); + } + } + + final IVariable<?> x = Var.var("x"); + final IVariable<?> y = Var.var("y"); + final IVariable<?> z = Var.var("z"); + + // The name space for the SPO relation. + final String[] relation = new String[] {namespace + ".spo"}; + + final long timestamp = jnl.getLastCommitTime(); + + int nextId = 0; + + // ?x a ub:GraduateStudent . + final IPredicate p0 = new SPOPredicate(new BOp[] { x, + new Constant(rdfType.getIV()), new Constant(graduateStudent.getIV()) },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?y a ub:University . 
+ final IPredicate p1 = new SPOPredicate(new BOp[] { y, + new Constant(rdfType.getIV()), new Constant(university.getIV()) },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?z a ub:Department . + final IPredicate p2 = new SPOPredicate(new BOp[] { z, + new Constant(rdfType.getIV()), new Constant(department.getIV()) },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?x ub:memberOf ?z . + final IPredicate p3 = new SPOPredicate(new BOp[] { x, + new Constant(memberOf.getIV()), z },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?z ub:subOrganizationOf ?y . + final IPredicate p4 = new SPOPredicate(new BOp[] { z, + new Constant(subOrganizationOf.getIV()), y },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?x ub:undergraduateDegreeFrom ?y + final IPredicate p5 = new SPOPredicate(new BOp[] { x, + new Constant(undergraduateDegreeFrom.getIV()), y },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // the vertices of the join graph (the predicates). + final IPredicate[] preds = new IPredicate[] { p0, p1, p2, p3, p4, + p5 }; + +// final JoinGraph op = new JoinGraph(// +// new NV(JoinGraph.Annotations.VERTICES, preds),// +// new NV(JoinGraph.Annotations.SAMPLE_SIZE, 100) // +// ); + + final JGraph g = new JGraph(preds); + + final int limit = 100; + + final QueryEngine queryEngine = QueryEngineFactory + .getQueryController(jnl/* indexManager */); + + final BOpContextBase context = new BOpContextBase(queryEngine); + + System.err.println("joinGraph=" + g.toString()); + + /* + * Sample the vertices. + * + * @todo Sampling for scale-out not yet finished. + * + * @todo Re-sampling might always produce the same sample depending + * on the sample operator impl (it should be random, but it is not). + */ + g.sampleVertices(context, limit); + + System.err.println("joinGraph=" + g.toString()); + + /* + * Estimate the cardinality and weights for each edge. + * + * @todo It would be very interesting to see the variety and/or + * distribution of the values bound when the edge is sampled. This + * can be easily done using a hash map with a counter. That could + * tell us a lot about the cardinality of the next join path + * (sampling the join path also tells us a lot, but it does not + * explain it as much as seeing the histogram of the bound values). + * I believe that there are some interesting online algorithms for + * computing the N most frequent observations and the like which + * could be used here. + */ + g.estimateEdgeWeights(queryEngine, limit); + + System.err.println("joinGraph=" + g.toString()); + + /* + * @todo choose starting vertex (most selective). see if there are + * any paths which are fully determined based on static optimization + * (doubtful). + */ + + /* + * @todo iteratively chain sample to choose best path, then execute + * that path. this is where most of the complex bits are. 
+ * constraints must be applied to appropriate joins, variables must + * be filtered when no longer required, edges which are must be + * dropped from paths in which they have become redundant, etc., + * etc. + * + * @todo a simpler starting place is just to explore the cost of the + * query under different join orderings. e.g., Choose(N), where N is + * the #of predicates (full search). Or dynamic programming (also + * full search, just a little smarter). + */ +// g.run(); + + +// /* +// * Run the index scan without materializing anything from the +// * lexicon. +// */ +// if (true) { +// System.out.println("Running SPO only access path."); +// final long begin = System.currentTimeMillis(); +// final IAccessPath<ISPO> accessPath = database.getAccessPath( +// null/* s */, rdfType, undergraduateStudent); +// final IChunkedOrderedIterator<ISPO> itr = accessPath.iterator(); +// try { +// while (itr.hasNext()) { +// itr.next(); +// } +// } finally { +// itr.close(); +// } +// final long elapsed = System.currentTimeMillis() - begin; +// System.err.println("Materialize SPOs : elapsed=" + elapsed +// + "ms"); +// } + +// /* +// * Open the sail and run Q14. +// * +// * @todo It would be interesting to run this using a lexicon join. +// * Also, given the changes in the various defaults which were +// * recently made, it is worth while to again explore the parameter +// * space for this query. +// */ +// if (true) { +// final BigdataSail sail = new BigdataSail(database); +// sail.initialize(); +// final BigdataSailConnection conn = sail.getReadOnlyConnection(); +// try { +// System.out.println("Materializing statements."); +// final long begin = System.currentTimeMillis(); +// final CloseableIteration<? extends Statement, SailException> itr = conn +// .getStatements(null/* s */, rdfType, +// undergraduateStudent, true/* includeInferred */); +// try { +// while (itr.hasNext()) { +// itr.next(); +// } +// } finally { +// itr.close(); +// } +// final long elapsed = System.currentTimeMillis() - begin; +// System.err.println("Materialize statements: elapsed=" +// + elapsed + "ms"); +// } finally { +// conn.close(); +// } +// sail.shutDown(); +// } + + } finally { + jnl.close(); + } + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
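The distribution of bound values observed while sampling an edge, as suggested by the @todo in the changeset above, can be tracked with a hash map keyed on the value. A minimal sketch of that histogram idea; the class and method names here are hypothetical and not part of the bigdata API:

    import java.util.HashMap;
    import java.util.Map;

    /** Hypothetical helper: frequency of values bound during edge sampling. */
    public class SampleHistogram<V> {

        private final Map<V, Integer> counts = new HashMap<V, Integer>();

        /** Count one observation of a bound value from a sampled solution. */
        public void add(final V boundValue) {
            final Integer n = counts.get(boundValue);
            counts.put(boundValue, Integer.valueOf(n == null ? 1 : n.intValue() + 1));
        }

        /** The #of distinct values seen in the sample. */
        public int distinctCount() {
            return counts.size();
        }
    }

The ratio of distinctCount() to the sample size is the kind of signal the comment is after: it helps explain, rather than just measure, the cardinality of the next join path.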
From: <tho...@us...> - 2010-11-08 21:30:35
|
Revision: 3917 http://bigdata.svn.sourceforge.net/bigdata/?rev=3917&view=rev Author: thompsonbry Date: 2010-11-08 21:30:29 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Modified to stop propagating chunks (and not throw new errors) when the query is already halted (not all cases in which this occurs have been addressed so far). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -44,32 +44,32 @@ */ public interface IChunkHandler { - /** - * Take an {@link IBindingSet}[] chunk generated by some pass over an - * operator and make it available to the target operator. How this is done - * depends on whether the query is running against a standalone database or - * the scale-out database. - * <p> - * Note: The return value is used as part of the termination criteria for - * the query which depends on (a) the #of running operator tasks and (b) the - * #of {@link IChunkMessage}s generated (available) and consumed. The return - * value of this method increases the #of {@link IChunkMessage} available to - * the query. - * - * @param query - * The query. - * @param bopId - * The operator which wrote on the sink. - * @param sinkId - * The identifier of the target operator. - * @param chunk - * The intermediate results to be passed to that target operator. - * - * @return The #of {@link IChunkMessage} sent. This will always be ONE (1) - * for scale-up. For scale-out, there will be at least one - * {@link IChunkMessage} per index partition over which the - * intermediate results were mapped. - */ + /** + * Take an {@link IBindingSet}[] chunk generated by some pass over an + * operator and make it available to the target operator. How this is done + * depends on whether the query is running against a standalone database or + * the scale-out database. + * <p> + * Note: The return value is used as part of the termination criteria for + * the query which depends on (a) the #of running operator tasks and (b) the + * #of {@link IChunkMessage}s generated (available) and consumed. The return + * value of this method increases the #of {@link IChunkMessage}s available + * to the query. + * + * @param query + * The query. + * @param bopId + * The operator which wrote on the sink. + * @param sinkId + * The identifier of the target operator. + * @param chunk + * The intermediate results to be passed to that target operator. + * + * @return The #of {@link IChunkMessage} sent. This will always be ONE (1) + * for scale-up. For scale-out, there will be at least one + * {@link IChunkMessage} per index partition over which the + * intermediate results were mapped. 
+ */ int handleChunk(RunningQuery query, int bopId, int sinkId, IBindingSet[] chunk); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -412,6 +412,12 @@ } + protected boolean isRunning() { + + return engineFuture.get() != null && !shutdown; + + } + protected void execute(final Runnable r) { localIndexManager.getExecutorService().execute(r); @@ -492,20 +498,24 @@ } } // QueryEngineTask - /** - * Add a chunk of intermediate results for consumption by some query. The - * chunk will be attached to the query and the query will be scheduled for - * execution. - * - * @param msg - * A chunk of intermediate results. - * - * @throws IllegalArgumentException - * if the chunk is <code>null</code>. - * @throws IllegalStateException - * if the chunk is not materialized. - */ - protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { + /** + * Add a chunk of intermediate results for consumption by some query. The + * chunk will be attached to the query and the query will be scheduled for + * execution. + * + * @param msg + * A chunk of intermediate results. + * + * @return <code>true</code> if the chunk was accepted. This will return + * <code>false</code> if the query is done (including cancelled) or + * the query engine is shutdown. + * + * @throws IllegalArgumentException + * if the chunk is <code>null</code>. + * @throws IllegalStateException + * if the chunk is not materialized. + */ + protected boolean acceptChunk(final IChunkMessage<IBindingSet> msg) { if (msg == null) throw new IllegalArgumentException(); @@ -515,17 +525,37 @@ final RunningQuery q = runningQueries.get(msg.getQueryId()); - if(q == null) + if(q == null) { + /* + * The query is not registered on this node. + * + * FIXME We should recognize the difference between a query which + * was never registered (and throw an error here) and a query which + * is done and has been removed from runningQueries. One way to do + * this is with an LRU of recently completed queries. + */ +// return false; throw new IllegalStateException(); + } - // add chunk to the query's input queue on this node. - q.acceptChunk(msg); + // add chunk to the query's input queue on this node. + if (!q.acceptChunk(msg)) { + // query is no longer running. + return false; + + } - assertRunning(); - - // add query to the engine's task queue. - priorityQueue.add(q); + if(!isRunning()) { + // query engine is no longer running. + return false; + + } + // add query to the engine's task queue. + priorityQueue.add(q); + + return true; + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -1076,15 +1076,17 @@ // } // } - /** - * Make a chunk of binding sets available for consumption by the query. - * <p> - * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} - * - * @param msg - * The chunk. 
- */ - protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { + /** + * Make a chunk of binding sets available for consumption by the query. + * <p> + * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} + * + * @param msg + * The chunk. + * + * @return <code>true</code> if the message was accepted. + */ + protected boolean acceptChunk(final IChunkMessage<IBindingSet> msg) { if (msg == null) throw new IllegalArgumentException(); @@ -1099,9 +1101,11 @@ try { - // verify still running. - if (future.isDone()) - throw new RuntimeException(ERR_QUERY_DONE, future.getCause()); + if (future.isDone()) { + // The query is no longer running. + return false; + //throw new RuntimeException(ERR_QUERY_DONE, future.getCause()); + } BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues .get(bundle); @@ -1116,6 +1120,8 @@ queue.add(msg); + return true; + } finally { lock.unlock(); @@ -2092,8 +2098,9 @@ try { - log.error(toString(), t); - + if (!InnerCause.isInnerCause(t, InterruptedException.class)) + log.error(toString(), t); + try { // signal error condition. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -288,9 +288,9 @@ * {@inheritDoc} */ @Override - protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { + protected boolean acceptChunk(final IChunkMessage<IBindingSet> msg) { - super.acceptChunk(msg); + return super.acceptChunk(msg); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
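With acceptChunk() now returning false rather than throwing once the query has halted, a producer can discard its chunk quietly. A hedged sketch of the caller pattern this enables (shown as a free-standing method, ignoring the protected visibility of acceptChunk() for the sake of the sketch):

    /*
     * Illustrative only: honoring the new boolean contract when handing a
     * chunk of intermediate results to the query engine.
     */
    void sendChunk(final QueryEngine queryEngine,
            final IChunkMessage<IBindingSet> msg) {
        if (!queryEngine.acceptChunk(msg)) {
            /*
             * The query is done (cancelled or complete) or the engine is
             * shutdown: drop the chunk rather than raising a new error
             * against an already-halted query.
             */
        }
    }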
From: <tho...@us...> - 2010-11-08 21:29:26
|
Revision: 3916 http://bigdata.svn.sourceforge.net/bigdata/?rev=3916&view=rev Author: thompsonbry Date: 2010-11-08 21:29:20 +0000 (Mon, 08 Nov 2010) Log Message: ----------- BOpBase : more information in exception BOpContext : made method public, static. it should be moved to BOpUtility. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-08 21:28:39 UTC (rev 3915) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-08 21:29:20 UTC (rev 3916) @@ -394,7 +394,8 @@ final Object tmp = annotations.get(name); if (tmp == null) - throw new IllegalStateException("Required property: " + name); + throw new IllegalStateException("Required property: " + name + + " : " + this); return tmp; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-11-08 21:28:39 UTC (rev 3915) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-11-08 21:29:20 UTC (rev 3916) @@ -261,9 +261,11 @@ * The predicate. * @param bindingSet * The binding set, which is modified as a side-effect. + * + * @todo move to {@link BOpUtility}? */ @SuppressWarnings("unchecked") - final private void copyValues(final IElement e, final IPredicate<?> pred, + static public void copyValues(final IElement e, final IPredicate<?> pred, final IBindingSet bindingSet) { for (int i = 0; i < pred.arity(); i++) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
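Since copyValues(...) is now public and static, it can be called without a BOpContext instance. A minimal sketch, assuming HashBindingSet (package location assumed) as the IBindingSet implementation:

    /*
     * Illustrative: bind the values of a visited element against the
     * predicate which selected it, without going through a BOpContext.
     */
    static IBindingSet bindElement(final IElement e, final IPredicate<?> pred) {
        final IBindingSet bset = new HashBindingSet(); // assumed implementation
        // For each argument position of pred which is a variable, copy the
        // corresponding value from e into the binding set.
        BOpContext.copyValues(e, pred, bset);
        return bset;
    }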
From: <tho...@us...> - 2010-11-08 21:28:45
|
Revision: 3915 http://bigdata.svn.sourceforge.net/bigdata/?rev=3915&view=rev Author: thompsonbry Date: 2010-11-08 21:28:39 +0000 (Mon, 08 Nov 2010) Log Message: ----------- javadoc Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/SampleIndex.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/SampleIndex.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/SampleIndex.java 2010-11-08 21:28:21 UTC (rev 3914) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/SampleIndex.java 2010-11-08 21:28:39 UTC (rev 3915) @@ -308,6 +308,12 @@ * Taking a clustered sample really requires knowing where the * leaf boundaries are in the index, e.g., using * {@link ILeafCursor}. + * + * @todo Rather than evenly spaced samples, we should be taking a random + * sample. This could be achieved using a random initial offset + * and random increment as long as the initial offset was in the + * range of a single increment and we compute the increment such + * that N+1 intervals exist. */ @Override protected void advance(final ITuple<E> tuple) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
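One possible reading of the @todo's random sampling scheme, sketched outside of SampleIndex: compute the increment such that N+1 intervals exist, pick a single random initial offset within one increment, then advance by the increment. The samples stay spread across the key range but are no longer deterministic. Illustrative only:

    import java.util.Random;

    /**
     * Compute n sample offsets over [0, rangeCount). Assumes rangeCount > n,
     * so the increment is at least one tuple wide.
     */
    static long[] randomSampleOffsets(final long rangeCount, final int n,
            final Random rnd) {
        // n+1 intervals exist, so the last sample still lands in range.
        final long increment = rangeCount / (n + 1);
        // Random initial offset within the range of a single increment.
        final long start = (long) (rnd.nextDouble() * increment);
        final long[] offsets = new long[n];
        for (int i = 0; i < n; i++) {
            offsets[i] = start + i * increment;
        }
        return offsets;
    }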
From: <tho...@us...> - 2010-11-08 21:28:27
|
Revision: 3914 http://bigdata.svn.sourceforge.net/bigdata/?rev=3914&view=rev Author: thompsonbry Date: 2010-11-08 21:28:21 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Allowing joins with no operands (so you do not need to feed them with a StartOp). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-08 21:27:47 UTC (rev 3913) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-08 21:28:21 UTC (rev 3914) @@ -271,23 +271,23 @@ super(args, annotations); - if (arity() != 1) - throw new IllegalArgumentException(); +// if (arity() != 1) +// throw new IllegalArgumentException(); - if (left() == null) - throw new IllegalArgumentException(); +// if (left() == null) +// throw new IllegalArgumentException(); } - /** - * The sole operand, which is the previous join in the pipeline join path. - */ - public PipelineOp left() { +// /** +// * The sole operand, which is the previous join in the pipeline join path. +// */ +// public PipelineOp left() { +// +// return (PipelineOp) get(0); +// +// } - return (PipelineOp) get(0); - - } - /** * {@inheritDoc} * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
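With the arity and left() checks relaxed, a join can now sit at the head of a query plan and be fed directly with the initial binding sets. A hedged construction sketch; PipelineJoin.Annotations.PREDICATE is assumed here from the annotation conventions visible elsewhere in this branch, and pred stands for an IPredicate such as the SPOPredicates built in the changesets above:

    // Illustrative: a pipeline join with no operands. Nothing upstream
    // (no StartOp, no previous join) is required to feed it.
    final PipelineJoin<ISPO> join = new PipelineJoin<ISPO>(
            new BOp[] {}, // zero operands are now permitted
            new NV(BOp.Annotations.BOP_ID, 1),
            new NV(PipelineJoin.Annotations.PREDICATE, pred));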
From: <tho...@us...> - 2010-11-08 21:27:53
|
Revision: 3913 http://bigdata.svn.sourceforge.net/bigdata/?rev=3913&view=rev Author: thompsonbry Date: 2010-11-08 21:27:47 +0000 (Mon, 08 Nov 2010) Log Message: ----------- javadoc Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOAccessPath.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOAccessPath.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOAccessPath.java 2010-11-08 15:46:21 UTC (rev 3912) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOAccessPath.java 2010-11-08 21:27:47 UTC (rev 3913) @@ -132,6 +132,8 @@ * Strengthened return type. * <p> * {@inheritDoc} + * + * @todo Remove reliance on the {@link SPOPredicate}. It only adds the s(), p(), o(), and c() methods. */ @Override public SPOPredicate getPredicate() { Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java 2010-11-08 15:46:21 UTC (rev 3912) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java 2010-11-08 21:27:47 UTC (rev 3913) @@ -41,6 +41,10 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * + * @todo Remove reliance on the {@link SPOPredicate}. It only adds the s(), p(), + * o(), and c() methods. */ public class SPOPredicate extends Predicate<ISPO> { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
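For reference, the s(), p(), o() and c() accessors which SPOPredicate layers over the generic Predicate are just positional gets, which is what removing the reliance would amount to at the call sites. A sketch:

    // Generic equivalents of the SPOPredicate convenience accessors,
    // where pred is any IPredicate over the SPO(C) relation:
    final IVariableOrConstant<?> s = pred.get(0); // pred.s()
    final IVariableOrConstant<?> p = pred.get(1); // pred.p()
    final IVariableOrConstant<?> o = pred.get(2); // pred.o()
    // c() is only meaningful in quads mode, where the predicate has arity 4.
    final IVariableOrConstant<?> c = pred.arity() == 4 ? pred.get(3) : null;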
From: <tho...@us...> - 2010-11-08 15:46:27
|
Revision: 3912 http://bigdata.svn.sourceforge.net/bigdata/?rev=3912&view=rev Author: thompsonbry Date: 2010-11-08 15:46:21 +0000 (Mon, 08 Nov 2010) Log Message: ----------- javadoc Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-11-08 15:12:25 UTC (rev 3911) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-11-08 15:46:21 UTC (rev 3912) @@ -358,7 +358,7 @@ /* * Note: getIndex() sets the listener on the BTree. That listener is - * reponsible for putting dirty indices onto the commit list. + * responsible for putting dirty indices onto the commit list. */ commitList = new ConcurrentHashMap<String,DirtyListener>(resource.length); @@ -1256,7 +1256,7 @@ * which the operation isolated by that transaction has * requested. Transaction commits are placed into a partial * order to avoid deadlocks where that ordered is determined - * by sorting the resources declared by the tx througout its + * by sorting the resources declared by the tx throughout its * life cycle and the obtaining locks on all of those * resources (in the distributed transaction service) before * the commit may start. This is very similar to how we Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java 2010-11-08 15:12:25 UTC (rev 3911) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java 2010-11-08 15:46:21 UTC (rev 3912) @@ -1373,7 +1373,7 @@ )); /* - * Create the BTree to aborb writes for the target index + * Create the BTree to absorb writes for the target index * partition. The metadata for this BTree was configured above * and is associated with a view that captures all data received * from the source index partition. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-08 15:12:35
|
Revision: 3911 http://bigdata.svn.sourceforge.net/bigdata/?rev=3911&view=rev Author: thompsonbry Date: 2010-11-08 15:12:25 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Made various things private. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-08 13:10:14 UTC (rev 3910) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-08 15:12:25 UTC (rev 3911) @@ -24,12 +24,10 @@ package com.bigdata.rwstore; -import java.io.OutputStream; -import java.io.ObjectOutputStream; import java.io.FilterOutputStream; +import java.io.IOException; import java.io.InputStream; -import java.io.IOException; -import java.util.ArrayList; +import java.io.OutputStream; import org.apache.log4j.Logger; @@ -70,20 +68,21 @@ * To this end, the output stream has a fixed buffer size, and they are recycled * from a pool of output streams. * - * It is ** important ** that output streams are bound to the IStore they + * It is <em>important</em> that output streams are bound to the IStore they * are requested for. **/ public class PSOutputStream extends OutputStream { - protected static final Logger log = Logger.getLogger(FixedAllocator.class); + + private static final Logger log = Logger.getLogger(FixedAllocator.class); - protected static java.util.logging.Logger cat = java.util.logging.Logger.getLogger(PSOutputStream.class.getName()); +// protected static java.util.logging.Logger cat = java.util.logging.Logger.getLogger(PSOutputStream.class.getName()); - static PSOutputStream m_poolHead = null; - static PSOutputStream m_poolTail = null; - static Integer m_lock = new Integer(42); - static int m_streamCount = 0; + private static PSOutputStream m_poolHead = null; + private static PSOutputStream m_poolTail = null; + private static Integer m_lock = new Integer(42); + private static int m_streamCount = 0; - public static PSOutputStream getNew(IStore store, int maxAlloc, IAllocationContext context) { + public static PSOutputStream getNew(final IStore store, final int maxAlloc, final IAllocationContext context) { synchronized (m_lock) { PSOutputStream ret = m_poolHead; if (ret != null) { @@ -107,8 +106,8 @@ * maintains pool of streams - in a normal situation there will only * me a single stream continually re-used, but with some patterns * there could be many streams. For this reason it is worth checking - * that the pool is not maintained at an unnecessaily large value, so - * maximum of 10 streams are maintained - adding upto 80K to the + * that the pool is not maintained at an unnecessarily large value, so + * maximum of 10 streams are maintained - adding up to 80K to the * garbage collect copy. 
**/ static void returnStream(PSOutputStream stream) { @@ -130,17 +129,17 @@ } } - int[] m_blobHeader = null; - byte[] m_buf = null; - boolean m_isSaved = false; - long m_headAddr = 0; - long m_prevAddr = 0; - int m_count = 0; - int m_bytesWritten = 0; - int m_blobThreshold = 0; - IStore m_store; + private int[] m_blobHeader = null; + private byte[] m_buf = null; + private boolean m_isSaved = false; +// private long m_headAddr = 0; +// private long m_prevAddr = 0; + private int m_count = 0; + private int m_bytesWritten = 0; + private int m_blobThreshold = 0; + private IStore m_store; - IAllocationContext m_context = null; + private IAllocationContext m_context; private PSOutputStream m_next = null; @@ -177,8 +176,8 @@ public void reset() { m_isSaved = false; - m_headAddr = 0; - m_prevAddr = 0; +// m_headAddr = 0; +// m_prevAddr = 0; m_count = 0; m_bytesWritten = 0; m_isSaved = false; @@ -190,7 +189,7 @@ /**************************************************************** * write a single byte * - * this is the one place where the blobthreshold is handled + * this is the one place where the blob threshold is handled * and its done one byte at a time so should be easy enough, * * We no longer store continuation addresses, instead we allocate @@ -360,6 +359,7 @@ protected void finalize() throws Throwable { close(); + super.finalize(); } public OutputStream getFilterWrapper(final boolean saveBeforeClose) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
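The recycling contract described in the PSOutputStream javadoc (fixed-size buffers, streams recycled from a pool, at most 10 retained, adding up to about 80K of buffers) boils down to a capped free list. A generic sketch of the pattern, separate from the class itself:

    import java.util.ArrayDeque;
    import java.util.Deque;

    /**
     * Illustrative only: a capped object pool. Instances returned above
     * the cap are dropped, so the pool never pins more than MAX_POOLED
     * buffers against the garbage collector.
     */
    public class StreamPool<T> {

        private static final int MAX_POOLED = 10;

        private final Deque<T> pool = new ArrayDeque<T>();

        /** A pooled instance, or null if the caller must allocate one. */
        public synchronized T take() {
            return pool.poll();
        }

        /** Return an instance to the pool; silently dropped once at the cap. */
        public synchronized void recycle(final T stream) {
            if (pool.size() < MAX_POOLED) {
                pool.push(stream);
            }
        }
    }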
From: <mar...@us...> - 2010-11-08 13:10:20
|
Revision: 3910 http://bigdata.svn.sourceforge.net/bigdata/?rev=3910&view=rev Author: martyncutcher Date: 2010-11-08 13:10:14 +0000 (Mon, 08 Nov 2010) Log Message: ----------- add formatting to showAllocators Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-08 11:09:09 UTC (rev 3909) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-08 13:10:14 UTC (rev 3910) @@ -2392,6 +2392,13 @@ str.append("\n-------------------------\n"); str.append("RWStore Allocation Summary\n"); str.append("-------------------------\n"); + str.append(padRight("Allocator", 10)); + str.append(padLeft("Slots used", 12)); + str.append(padLeft("available", 12)); + str.append(padLeft("Store used", 14)); + str.append(padLeft("available", 14)); + str.append(padLeft("Usage", 8)); + str.append("\n"); long treserved = 0; long treservedSlots = 0; long tfilled = 0; @@ -2404,17 +2411,47 @@ tfilled += filled; tfilledSlots += stats[i].m_filledSlots; - str.append("Allocation: " + stats[i].m_blockSize); - str.append(", slots: " + stats[i].m_filledSlots + "/" + stats[i].m_reservedSlots); - str.append(", storage: " + filled + "/" + reserved); - str.append(", usage: " + (reserved==0?0:(filled * 100 / reserved)) + "%"); + str.append(padRight("" + stats[i].m_blockSize, 10)); + str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); + str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); + str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8)); str.append("\n"); } - str.append("Total - file: " + convertAddr(m_fileSize) + // - ", slots: " + tfilledSlots + "/" + treservedSlots + // - ", storage: " + tfilled + "/" + treserved + // - "\n"); + str.append(padRight("Totals", 10)); + str.append(padLeft("" + tfilledSlots, 12)); + str.append(padLeft("" + treservedSlots, 12)); + str.append(padLeft("" + tfilled, 14)); + str.append(padLeft("" + treserved, 14)); + str.append(padLeft("" + (treserved==0?0:(tfilled * 100 / treserved)) + "%", 8)); + str.append("\nFile size: " + convertAddr(m_fileSize) + "bytes\n"); } + + private String padLeft(String str, int minlen) { + if (str.length() >= minlen) + return str; + + StringBuffer out = new StringBuffer(); + int pad = minlen - str.length(); + while (pad-- > 0) { + out.append(' '); + } + out.append(str); + + return out.toString(); + } + private String padRight(String str, int minlen) { + if (str.length() >= minlen) + return str; + + StringBuffer out = new StringBuffer(); + out.append(str); + int pad = minlen - str.length(); + while (pad-- > 0) { + out.append(' '); + } + + return out.toString(); + } // public ArrayList<Allocator> getStorageBlockAddresses() { // final ArrayList<Allocator> addrs = new ArrayList<Allocator>(m_allocs.size()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
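As an aside, java.util.Formatter width specifiers give the same alignment as the hand-rolled padLeft/padRight above, so the header row could equivalently be written as follows (str being the StringBuffer used by showAllocators):

    // "%-10s" left-justifies (pads right) to 10 chars; "%12s" right-justifies
    // (pads left) to 12.
    str.append(String.format("%-10s%12s%12s%14s%14s%8s%n",
            "Allocator", "Slots used", "available",
            "Store used", "available", "Usage"));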
From: <mar...@us...> - 2010-11-08 11:09:15
|
Revision: 3909 http://bigdata.svn.sourceforge.net/bigdata/?rev=3909&view=rev Author: martyncutcher Date: 2010-11-08 11:09:09 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Handle BlobHeaders larger than maximum fixed allocation - where the header must itself be a BLOB. This is possible with lower allocation settings, eg with 1K max fixed allocators, a 500K BLOB would require a 2K header to hold the 500 fixed allocation references. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -414,7 +414,7 @@ if (m_freeBits++ == 0 && false) { m_freeWaiting = false; m_freeList.add(this); - } else if (m_freeWaiting && m_freeBits == 3000) { + } else if (m_freeWaiting && m_freeBits == m_store.cDefaultFreeBitsThreshold) { m_freeWaiting = false; m_freeList.add(this); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -145,6 +145,8 @@ private PSOutputStream m_next = null; private int m_blobHdrIdx; + + private boolean m_writingHdr = false; private PSOutputStream next() { return m_next; @@ -162,8 +164,12 @@ m_context = context; m_blobThreshold = maxAlloc-4; // allow for checksum - if (m_buf == null || m_buf.length != m_blobThreshold) - m_buf = new byte[m_blobThreshold]; + + final int maxHdrSize = RWStore.BLOB_FIXED_ALLOCS * 4; + final int bufSize = m_blobThreshold > maxHdrSize ? 
m_blobThreshold : maxHdrSize; + + if (m_buf == null || m_buf.length != bufSize) + m_buf = new byte[bufSize]; reset(); } @@ -199,9 +205,9 @@ throw new IllegalStateException("Writing to saved PSOutputStream"); } - if (m_count == m_blobThreshold) { + if (m_count == m_blobThreshold && !m_writingHdr) { if (m_blobHeader == null) { - m_blobHeader = new int[RWStore.BLOB_FIXED_ALLOCS]; + m_blobHeader = new int[RWStore.BLOB_FIXED_ALLOCS]; // max 16K m_blobHdrIdx = 0; } @@ -305,27 +311,32 @@ int addr = (int) m_store.alloc(m_buf, m_count, m_context); if (m_blobHeader != null) { - m_blobHeader[m_blobHdrIdx++] = addr; - int precount = m_count; - m_count = 0; - try { - writeInt(m_blobHdrIdx); - for (int i = 0; i < m_blobHdrIdx; i++) { - writeInt(m_blobHeader[i]); - } - addr = (int) m_store.alloc(m_buf, m_count, m_context); - - if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count)/m_blobThreshold)) { - throw new IllegalStateException("PSOutputStream.save at : " + addr + ", bytes: "+ m_bytesWritten + ", blocks: " + m_blobHdrIdx + ", last alloc: " + precount); - } - - if (log.isDebugEnabled()) - log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); - - addr = m_store.registerBlob(addr); // returns handle - } catch (IOException e) { - e.printStackTrace(); - } + try { + m_writingHdr = true; // ensure that header CAN be a BLOB + m_blobHeader[m_blobHdrIdx++] = addr; + int precount = m_count; + m_count = 0; + try { + writeInt(m_blobHdrIdx); + for (int i = 0; i < m_blobHdrIdx; i++) { + writeInt(m_blobHeader[i]); + } + addr = (int) m_store.alloc(m_buf, m_count, m_context); + + if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count)/m_blobThreshold)) { + throw new IllegalStateException("PSOutputStream.save at : " + addr + ", bytes: "+ m_bytesWritten + ", blocks: " + m_blobHdrIdx + ", last alloc: " + precount); + } + + if (log.isDebugEnabled()) + log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); + + addr = m_store.registerBlob(addr); // returns handle + } catch (IOException e) { + e.printStackTrace(); + } + } finally { + m_writingHdr = false; + } } m_isSaved = true; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -245,6 +245,18 @@ String DEFAULT_META_BITS_SIZE = "9"; + /** + * Defines the number of bits that must be free in a FixedAllocator for + * it to be added to the free list. This is used to ensure a level + * of locality when making large numbers of allocations within a single + * commit. + * <p> + * The value should be >= 1 and <= 5000 + */ + String FREE_BITS_THRESHOLD = RWStore.class.getName() + ".freeBitsThreshold"; + + String DEFAULT_FREE_BITS_THRESHOLD = "300"; + } /* @@ -341,6 +353,12 @@ final int m_minFixedAlloc; /** + * Currently we do not support a Blob header to be a Blob, so the + * maximum possible Blob is ((maxFixed-4) * maxFixed) - 4. + */ + final int m_maxBlobAllocSize; + + /** * This lock is used to exclude readers when the extent of the backing file * is about to be changed. 
* <p> @@ -477,7 +495,16 @@ m_metaBitsSize = cDefaultMetaBitsSize; - m_metaBits = new int[m_metaBitsSize]; + cDefaultFreeBitsThreshold = Integer.valueOf(fileMetadata.getProperty( + Options.FREE_BITS_THRESHOLD, + Options.DEFAULT_FREE_BITS_THRESHOLD)); + + if (cDefaultFreeBitsThreshold < 1 || cDefaultFreeBitsThreshold > 5000) { + throw new IllegalArgumentException(Options.FREE_BITS_THRESHOLD + + " : Must be between 1 and 5000"); + } + + m_metaBits = new int[m_metaBitsSize]; m_metaTransientBits = new int[m_metaBitsSize]; @@ -556,6 +583,12 @@ m_minFixedAlloc = m_allocSizes[0]*64; } + final int maxBlockLessChk = m_maxFixedAlloc-4; + // set this at blob header references max 4096 fixed allocs + // meaning that header may itself be a blob if max fixed is + // less than 16K + m_maxBlobAllocSize = (BLOB_FIXED_ALLOCS * maxBlockLessChk); + assert m_maxFixedAlloc > 0; m_deferredFreeOut = PSOutputStream.getNew(this, m_maxFixedAlloc, null); @@ -668,7 +701,7 @@ + metaBitsAddr + ", m_commitCounter: " + commitCounter); } - + /** * Should be called where previously initFileSpec was used. * @@ -728,13 +761,14 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); m_lastDeferredReleaseTime = strBuf.readLong(); + cDefaultMetaBitsSize = strBuf.readInt(); final int allocBlocks = strBuf.readInt(); m_allocSizes = new int[allocBlocks]; for (int i = 0; i < allocBlocks; i++) { m_allocSizes[i] = strBuf.readInt(); } - m_metaBitsSize = metaBitsStore - allocBlocks - 3; // allow for deferred free + m_metaBitsSize = metaBitsStore - allocBlocks - 4; // allow for deferred free m_metaBits = new int[m_metaBitsSize]; if (log.isInfoEnabled()) { log.info("Raw MetaBitsAddr: " + rawmbaddr); @@ -854,10 +888,10 @@ * Meta-Allocations stored as {int address; int[8] bits}, so each block * holds 8*32=256 allocation slots of 1K totaling 256K. */ - for (int b = 0; b < m_metaBits.length; b += 9) { + for (int b = 0; b < m_metaBits.length; b += cDefaultMetaBitsSize) { final long blockStart = convertAddr(m_metaBits[b]); final int startBit = (b * 32) + 32; - final int endBit = startBit + (8*32); + final int endBit = startBit + ((cDefaultMetaBitsSize-1)*32); for (int i = startBit; i < endBit; i++) { if (tstBit(m_metaBits, i)) { final long addr = blockStart + ((i-startBit) * ALLOC_BLOCK_SIZE); @@ -1061,8 +1095,19 @@ + m_maxFixedAlloc); final byte[] hdrbuf = new byte[4 * (nblocks + 1) + 4]; // plus 4 bytes for checksum - final BlobAllocator ba = (BlobAllocator) getBlock((int) addr); - getData(ba.getBlobHdrAddress(getOffset((int) addr)), hdrbuf); // read in header + if (hdrbuf.length > m_maxFixedAlloc) { + if (log.isInfoEnabled()) { + log.info("LARGE BLOB - header is BLOB"); + } + } + + final Allocator na = getBlock((int) addr); + if (! (na instanceof BlobAllocator)) { + throw new IllegalStateException("Invalid Allocator index"); + } + final BlobAllocator ba = (BlobAllocator) na; + final int hdraddr = ba.getBlobHdrAddress(getOffset((int) addr)); + getData(hdraddr, hdrbuf); // read in header - could itself be a blob! final DataInputStream hdrstr = new DataInputStream(new ByteArrayInputStream(hdrbuf)); final int rhdrs = hdrstr.readInt(); if (rhdrs != nblocks) { @@ -1703,13 +1748,16 @@ // used to free the deferedFree allocations. This is used to determine // which commitRecord to access to process the nextbatch of deferred // frees. - final int len = 4 * (2 + 1 + m_allocSizes.length + m_metaBits.length); + // the cDefaultMetaBitsSize is also written since this can now be + // parameterized. 
+ final int len = 4 * (2 + 1 + 1 + m_allocSizes.length + m_metaBits.length); final byte buf[] = new byte[len]; final FixedOutputStream str = new FixedOutputStream(buf); try { str.writeLong(m_lastDeferredReleaseTime); - + str.writeInt(cDefaultMetaBitsSize); + str.writeInt(m_allocSizes.length); for (int i = 0; i < m_allocSizes.length; i++) { str.writeInt(m_allocSizes[i]); @@ -1914,11 +1962,18 @@ /** * @see Options#META_BITS_SIZE */ - private final int cDefaultMetaBitsSize; + private int cDefaultMetaBitsSize; /** * @see Options#META_BITS_SIZE */ volatile private int m_metaBitsSize; + /** + * Package private since is uded by FixedAllocators + * + * @see Options#META_BITS_SIZE + */ + final int cDefaultFreeBitsThreshold; + private int m_metaBits[]; private int m_metaTransientBits[]; // volatile private int m_metaStartAddr; @@ -2075,7 +2130,7 @@ // final int bitsPerBlock = 9 * 32; final int intIndex = bit / 32; // divide 32; - final int addrIndex = (intIndex/9)*9; + final int addrIndex = (intIndex/cDefaultMetaBitsSize)*cDefaultMetaBitsSize; final long addr = convertAddr(m_metaBits[addrIndex]); final int intOffset = bit - ((addrIndex+1) * 32); @@ -2620,8 +2675,8 @@ long ret = physicalAddress((int) m_metaBitsAddr); ret <<= 16; - // include space for allocSizes and deferred free info - final int metaBitsSize = 2 + m_metaBits.length + m_allocSizes.length + 1; + // include space for allocSizes and deferred free info AND cDefaultMetaBitsSize + final int metaBitsSize = 2 + 1 + m_metaBits.length + m_allocSizes.length + 1; ret += metaBitsSize; if (log.isTraceEnabled()) @@ -3225,8 +3280,8 @@ } -// log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" -// + context); + log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" + + context); } @@ -3893,4 +3948,8 @@ } + public int getMaxBlobSize() { + return this.m_maxBlobAllocSize-4; + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
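A worked check of the 1K figure in the log message, using the header layout visible in getData() above (one int per fixed allocation reference, plus the block count and a trailing checksum):

    // With 1K fixed allocators, a 500K blob forces the header itself to be a blob:
    final int maxFixedAlloc = 1024;
    final int dataPerSlot = maxFixedAlloc - 4;                      // 1020 (4-byte checksum)
    final int blobSize = 500 * 1024;                                // 512000 bytes
    final int nblocks = (blobSize + dataPerSlot - 1) / dataPerSlot; // 502 fixed allocations
    final int hdrSize = 4 * (nblocks + 1) + 4;                      // 2016 bytes
    // hdrSize > maxFixedAlloc, so the header exceeds the largest fixed
    // allocation and must be written as a BLOB, the case m_writingHdr now allows.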
From: <tho...@us...> - 2010-11-07 12:49:31
|
Revision: 3908 http://bigdata.svn.sourceforge.net/bigdata/?rev=3908&view=rev Author: thompsonbry Date: 2010-11-07 12:49:25 +0000 (Sun, 07 Nov 2010) Log Message: ----------- javadoc edit Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java Modified: branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java 2010-11-07 12:48:10 UTC (rev 3907) +++ branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java 2010-11-07 12:49:25 UTC (rev 3908) @@ -116,6 +116,11 @@ /* * Open the sail and run Q14. + * + * @todo It would be interesting to run this using a lexicon join. + * Also, given the changes in the various defaults which were + * recently made, it is worth while to again explore the parameter + * space for this query. */ if(true){ final BigdataSail sail = new BigdataSail(database); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-07 12:48:16
|
Revision: 3907 http://bigdata.svn.sourceforge.net/bigdata/?rev=3907&view=rev Author: thompsonbry Date: 2010-11-07 12:48:10 +0000 (Sun, 07 Nov 2010) Log Message: ----------- Added a test utility which can be used to run LUBM Q14 either as a pure POS scan or as the POS scan with materialization of BigdataStatements using the BigdataStatementIterImpl. Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java Added: branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/Q14Test.java 2010-11-07 12:48:10 UTC (rev 3907) @@ -0,0 +1,152 @@ +package com.bigdata.rdf.sail.bench; + +import info.aduna.iteration.CloseableIteration; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.sail.SailException; + +import com.bigdata.journal.Journal; +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; +import com.bigdata.rdf.model.BigdataValueFactory; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection; +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.striterator.IChunkedOrderedIterator; + +/** + * Hard codes LUBM U14, which is a statement index scan. + * + * <pre> + * PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + * PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> + * SELECT ?x + * WHERE{ ?x a ub:UndergraduateStudent . } + * </pre> + * + * <pre> + * http://www.w3.org/1999/02/22-rdf-syntax-ns#type (TermId(8U)) + * + * http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#UndergraduateStudent (TermId(324U)) + * </pre> + */ +public class Q14Test { + + public static void main(String[] args) throws IOException, SailException { + + final String namespace = "LUBM_U1000"; + final String propertyFile = "/root/workspace/bigdata-journal-HA/bigdata-perf/lubm/ant-build/bin/RWStore.properties"; + final String journalFile = "/data/lubm/U1000/bigdata-lubm.RW.jnl"; + + final Properties properties = new Properties(); + { + // Read the properties from the file. + final InputStream is = new BufferedInputStream(new FileInputStream( + propertyFile)); + try { + properties.load(is); + } finally { + is.close(); + } + if (System.getProperty(BigdataSail.Options.FILE) != null) { + // Override/set from the environment. + properties.setProperty(BigdataSail.Options.FILE, System + .getProperty(BigdataSail.Options.FILE)); + } + if(properties.getProperty(BigdataSail.Options.FILE)==null) { + properties.setProperty(BigdataSail.Options.FILE, journalFile); + } + } + + final Journal jnl = new Journal(properties); + try { + + final AbstractTripleStore database = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + jnl.getLastCommitTime()); + + if (database == null) + throw new RuntimeException("Not found: " + namespace); + + /* + * Resolve terms against the lexicon. 
+ */ + final BigdataValueFactory f = database.getLexiconRelation() + .getValueFactory(); + + final BigdataURI rdfType = f + .createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"); + + final BigdataURI undergraduateStudent = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#UndergraduateStudent"); + + database.getLexiconRelation().addTerms( + new BigdataValue[] { rdfType, undergraduateStudent }, 2, + true/* readOnly */); + + /* + * Run the index scan without materializing anything from the lexicon. + */ + if(true){ + System.out.println("Running SPO only access path."); + final long begin = System.currentTimeMillis(); + final IAccessPath<ISPO> accessPath = database.getAccessPath( + null/* s */, rdfType, undergraduateStudent); + final IChunkedOrderedIterator<ISPO> itr = accessPath.iterator(); + try { + while (itr.hasNext()) { + itr.next(); + } + } finally { + itr.close(); + } + final long elapsed = System.currentTimeMillis() - begin; + System.err.println("Materialize SPOs : elapsed=" + elapsed + + "ms"); + } + + /* + * Open the sail and run Q14. + */ + if(true){ + final BigdataSail sail = new BigdataSail(database); + sail.initialize(); + final BigdataSailConnection conn = sail.getReadOnlyConnection(); + try { + System.out.println("Materializing statements."); + final long begin = System.currentTimeMillis(); + final CloseableIteration<? extends Statement, SailException> itr = conn + .getStatements(null/* s */, rdfType, + undergraduateStudent, + true/* includeInferred */); + try { + while (itr.hasNext()) { + itr.next(); + } + } finally { + itr.close(); + } + final long elapsed = System.currentTimeMillis() - begin; + System.err.println("Materialize statements: elapsed=" + + elapsed + "ms"); + } finally { + conn.close(); + } + sail.shutDown(); + } + } finally { + jnl.close(); + } + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-07 12:46:43
|
Revision: 3906 http://bigdata.svn.sourceforge.net/bigdata/?rev=3906&view=rev Author: thompsonbry Date: 2010-11-07 12:46:37 +0000 (Sun, 07 Nov 2010) Log Message: ----------- Reduced a variety of defaults in order to reduce the heap demand associated with join processing on larger data sets. IChunkedIterator.DEFAULT_CHUNK_SIZE = 100;//was 10000; BlockingBuffer.DEFAULT_PRODUCER_QUEUE_CAPACITY = 10; // was 5000 BlockingBuffer.DEFAULT_MINIMUM_CHUNK_SIZE = 100; // was 10000 AbstractResource.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = "10"; // was 1000 AbstractTipleStore.DEFAULT_TERM_CACHE_CAPACITY = "5000"; // was 50000 AbstractAccessPath#1030 modified to pass in the chunkCapacity. final BlockingBuffer<R[]> buffer = new BlockingBuffer<R[]>( chunkOfChunksCapacity,chunkCapacity,10,TimeUnit.MILLISECONDS); and AbstractResource.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = "200";//""+20*Bytes.kilobyte32; Load of U50 is unchanged when compared with the baseline. [java] Load: 6890949 stmts added in 123.422 secs, rate= 55831, commitLatency=0ms However, closure is significantly slower (compare with 30707). Closure performance can not be related to the lexicon, so this must be either the queue capacity or the chunk capacity. [java] Closure: ClosureStats{mutationCount=1699274, elapsed=71662ms, rate=23712} Total time: 3 minutes 17 seconds There is very little impact on query: (compare with 10569 for ~ 4k pages from above). [java] ### Finished testing BIGDATA_SPARQL_ENDPOINT ### [java] BIGDATA_SPARQL_ENDPOINT #trials=10 #parallel=1 [java] query Time Result# [java] query1 46 4 [java] query3 25 6 [java] query4 63 34 [java] query5 59 719 [java] query7 24 61 [java] query8 189 6463 [java] query10 26 0 [java] query11 26 0 [java] query12 34 0 [java] query13 28 0 [java] query14 2952 393730 [java] query6 3218 430114 [java] query9 2958 8627 [java] query2 740 130 [java] Total 10388 However, when looking at U1000 there is a significant benefit for query: [java] Load: 138318723 stmts added in 7559.498 secs, rate= 18297, commitLatency=0ms [java] Closure: ClosureStats{mutationCount=34082911, elapsed=2909594ms, rate=11713} [java] ### Finished testing BIGDATA_SPARQL_ENDPOINT ### [java] BIGDATA_SPARQL_ENDPOINT #trials=10 #parallel=1 [java] query Time Result# [java] query1 69 4 [java] query3 33 6 [java] query4 67 34 [java] query5 66 719 [java] query7 34 61 [java] query8 231 6463 [java] query10 26 0 [java] query11 27 0 [java] query12 28 0 [java] query13 23 0 [java] query14 69907 7924765 (versus 124545) [java] query6 74343 8653646 (versus 130354) [java] query9 76161 172632 (versus 125518) [java] query2 368962 2528 (versus inconsistent due to backed out change to AbstractBTree.touch()) [java] Total 589977 This commit therefore improves query performance on larger LUBM data sets, but has a known negative impact on U50 closure and an unknown impact on LUBM U1000 closure. Closure warrants additional investigation. BSBM 100M performance with these changes and the following settings is as follows (this is the reduced query mix without query 3): com.bigdata.btree.writeRetentionQueue.capacity=4000 com.bigdata.btree.BTree.branchingFactor=128 # Reduce the branching factor for the lexicon since BSBM uses a lot of long # literals. Note that you have to edit this override to specify the namespace # into which the BSBM data will be loaded. com.bigdata.namespace.BSBM_284826.lex.TERM2ID.com.bigdata.btree.BTree.branchingFactor=32 com.bigdata.namespace.BSBM_284826.lex.ID2TERM.com.bigdata.btree.BTree.branchingFactor=32 # 4k pages. 
com.bigdata.namespace.BSBM_284826.spo.POS.com.bigdata.btree.BTree.branchingFactor=970
com.bigdata.namespace.BSBM_284826.spo.SPO.com.bigdata.btree.BTree.branchingFactor=512
com.bigdata.namespace.BSBM_284826.spo.OSP.com.bigdata.btree.BTree.branchingFactor=470

# Override the #of write cache buffers.
com.bigdata.journal.AbstractJournal.writeCacheBufferCount=12

Cold JVM run immediately after data load: 98-99% disk utilization.
     [java] QMpH: 7515.78 query mixes per hour

Hot JVM, cold disk: 98-99% disk utilization.
     [java] QMpH: 6459.97 query mixes per hour

Hot JVM, hot disk: ~4% utilization.
     [java] QMpH: 40213.81 query mixes per hour

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AbstractAccessPath.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/striterator/IChunkedIterator.java
    branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java
    branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java	2010-11-07 12:39:08 UTC (rev 3905)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java	2010-11-07 12:46:37 UTC (rev 3906)
@@ -222,7 +222,7 @@
     /**
      * Default for {@link #CHUNK_OF_CHUNKS_CAPACITY}
      */
-    String DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = "1000";
+    String DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = "10"; // was 1000
 
     /**
      * <p>
      *
@@ -275,7 +275,7 @@
      *
      * @todo figure out how good this value is.
      */
-    String DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = ""+20*Bytes.kilobyte32;
+    String DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = "200";//""+20*Bytes.kilobyte32;
 
     /**
      * When <code>true</code> ({@value #DEFAULT_FORCE_SERIAL_EXECUTION}),

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AbstractAccessPath.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AbstractAccessPath.java	2010-11-07 12:39:08 UTC (rev 3905)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AbstractAccessPath.java	2010-11-07 12:46:37 UTC (rev 3906)
@@ -33,6 +33,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
@@ -1027,7 +1028,7 @@
          * once the elements were materialized on the client.
          */
        final BlockingBuffer<R[]> buffer = new BlockingBuffer<R[]>(
-                chunkOfChunksCapacity);
+                chunkOfChunksCapacity,chunkCapacity,10,TimeUnit.MILLISECONDS);
 
        final ExecutorService executorService = indexManager
                .getExecutorService();

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java	2010-11-07 12:39:08 UTC (rev 3905)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java	2010-11-07 12:46:37 UTC (rev 3906)
@@ -167,12 +167,14 @@
      * The default capacity for the internal {@link Queue} on which elements (or
      * chunks of elements) are buffered.
      */
-    public static transient final int DEFAULT_PRODUCER_QUEUE_CAPACITY = 5000;
+//    public static transient final int DEFAULT_PRODUCER_QUEUE_CAPACITY = 5000;
+    public static transient final int DEFAULT_PRODUCER_QUEUE_CAPACITY = 10; // was 5000
 
     /**
      * The default minimum chunk size for the chunk combiner.
      */
-    public static transient final int DEFAULT_MINIMUM_CHUNK_SIZE = 10000;
+//    public static transient final int DEFAULT_MINIMUM_CHUNK_SIZE = 10000;
+    public static transient final int DEFAULT_MINIMUM_CHUNK_SIZE = 100; // was 10000
 
     /**
      * The default timeout in milliseconds during which chunks of elements may
@@ -381,7 +383,12 @@
             final int minimumChunkSize, final long chunkTimeout,
             final TimeUnit chunkTimeoutUnit, final boolean ordered) {
 
-        if (queue == null)
+        if (minimumChunkSize >= 1000 || queue.remainingCapacity() >= 1000)
+            log.fatal(new RuntimeException("queueCapacity="
+                    + queue.remainingCapacity() + ", minimumChunkSize="
+                    + minimumChunkSize));
+
+        if (queue == null)
             throw new IllegalArgumentException();
 
         if (minimumChunkSize < 0) {

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/striterator/IChunkedIterator.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/striterator/IChunkedIterator.java	2010-11-07 12:39:08 UTC (rev 3905)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/striterator/IChunkedIterator.java	2010-11-07 12:46:37 UTC (rev 3906)
@@ -59,7 +59,7 @@
     /**
      * The default chunk size.
      */
-    int DEFAULT_CHUNK_SIZE = 10000;
+    int DEFAULT_CHUNK_SIZE = 100;//was 10000;
 
     /**
      * The next element available from the iterator.

Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java	2010-11-07 12:39:08 UTC (rev 3905)
+++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java	2010-11-07 12:46:37 UTC (rev 3906)
@@ -90,7 +90,9 @@
             final long nodesWritten = btreeCounters.getNodesWritten();
             final long leavesWritten = btreeCounters.getLeavesWritten();
             final long bytesWritten = btreeCounters.getBytesWritten();
-            final long bytesPerRecord = bytesWritten/(nodesWritten+leavesWritten);
+            final long totalWritten = (nodesWritten + leavesWritten);
+            final long bytesPerRecord = totalWritten == 0 ? 0 : bytesWritten
+                    / (nodesWritten + leavesWritten);
 
             sb.append((first ? "" : ", ") + fqn + "{nodes=" + nodesWritten
                     + ",leaves=" + leavesWritten + ", bytes=" + bytesWritten

Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java	2010-11-07 12:39:08 UTC (rev 3905)
+++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java	2010-11-07 12:46:37 UTC (rev 3906)
@@ -572,7 +572,7 @@
         String TERM_CACHE_CAPACITY = AbstractTripleStore.class.getName()
                 + ".termCache.capacity";
 
-        String DEFAULT_TERM_CACHE_CAPACITY = "50000";
+        String DEFAULT_TERM_CACHE_CAPACITY = "5000"; // was 50000
 
         /**
          * The name of the class that will establish the pre-defined
From: <tho...@us...> - 2010-11-07 12:39:15
Revision: 3905
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3905&view=rev
Author:   thompsonbry
Date:     2010-11-07 12:39:08 +0000 (Sun, 07 Nov 2010)

Log Message:
-----------
Added branching factor overrides for BSBM 100M and LUBM U50 and U1000 which yield something close to an average of a 4k page.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata-perf/bsbm/RWStore.properties
    branches/JOURNAL_HA_BRANCH/bigdata-perf/lubm/RWStore.properties

Modified: branches/JOURNAL_HA_BRANCH/bigdata-perf/bsbm/RWStore.properties
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata-perf/bsbm/RWStore.properties	2010-11-05 19:32:10 UTC (rev 3904)
+++ branches/JOURNAL_HA_BRANCH/bigdata-perf/bsbm/RWStore.properties	2010-11-07 12:39:08 UTC (rev 3905)
@@ -19,6 +19,11 @@
 com.bigdata.namespace.BSBM_284826.lex.TERM2ID.com.bigdata.btree.BTree.branchingFactor=32
 com.bigdata.namespace.BSBM_284826.lex.ID2TERM.com.bigdata.btree.BTree.branchingFactor=32
 
+# 4k pages.
+com.bigdata.namespace.BSBM_284826.spo.POS.com.bigdata.btree.BTree.branchingFactor=970
+com.bigdata.namespace.BSBM_284826.spo.SPO.com.bigdata.btree.BTree.branchingFactor=512
+com.bigdata.namespace.BSBM_284826.spo.OSP.com.bigdata.btree.BTree.branchingFactor=470
+
 # Override the #of write cache buffers.
 com.bigdata.journal.AbstractJournal.writeCacheBufferCount=12
 
Modified: branches/JOURNAL_HA_BRANCH/bigdata-perf/lubm/RWStore.properties
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata-perf/lubm/RWStore.properties	2010-11-05 19:32:10 UTC (rev 3904)
+++ branches/JOURNAL_HA_BRANCH/bigdata-perf/lubm/RWStore.properties	2010-11-07 12:39:08 UTC (rev 3905)
@@ -13,8 +13,38 @@
 com.bigdata.btree.writeRetentionQueue.capacity=8000
 com.bigdata.btree.BTree.branchingFactor=128
 
+# RWStore options.
+#com.bigdata.rwstore.RWStore.allocationSizes=1, 2, 3, 5, 8, 12, 16, 32, 48, 64, 128
+#com.bigdata.rwstore.RWStore.allocationSizes=1, 2, 3, 5, 8, 12, 16, 32, 48, 64, 128, 3520
+com.bigdata.rwstore.RWStore.allocationSizes=1, 2, 3, 5, 8, 12, 16, 32, 48, 64, 128, 192, 320, 512, 832, 1344, 2176, 3520
+
+#
+# Overrides for various indices.
+#
+
+# U50 4k pages.
+com.bigdata.namespace.LUBM_U50.lex.TERM2ID.com.bigdata.btree.BTree.branchingFactor=270
+com.bigdata.namespace.LUBM_U50.lex.ID2TERM.com.bigdata.btree.BTree.branchingFactor=120
+com.bigdata.namespace.LUBM_U50.spo.POS.com.bigdata.btree.BTree.branchingFactor=970
+com.bigdata.namespace.LUBM_U50.spo.SPO.com.bigdata.btree.BTree.branchingFactor=512
+com.bigdata.namespace.LUBM_U50.spo.OSP.com.bigdata.btree.BTree.branchingFactor=470
+
+# U1000 4k
+com.bigdata.namespace.LUBM_U1000.lex.TERM2ID.com.bigdata.btree.BTree.branchingFactor=270
+com.bigdata.namespace.LUBM_U1000.lex.ID2TERM.com.bigdata.btree.BTree.branchingFactor=120
+com.bigdata.namespace.LUBM_U1000.spo.POS.com.bigdata.btree.BTree.branchingFactor=970
+com.bigdata.namespace.LUBM_U1000.spo.SPO.com.bigdata.btree.BTree.branchingFactor=512
+com.bigdata.namespace.LUBM_U1000.spo.OSP.com.bigdata.btree.BTree.branchingFactor=470
+
+# U50 8k pages.
+#com.bigdata.namespace.LUBM_U50.lex.TERM2ID.com.bigdata.btree.BTree.branchingFactor=540
+#com.bigdata.namespace.LUBM_U50.lex.ID2TERM.com.bigdata.btree.BTree.branchingFactor=240
+#com.bigdata.namespace.LUBM_U50.spo.POS.com.bigdata.btree.BTree.branchingFactor=1940
+#com.bigdata.namespace.LUBM_U50.spo.SPO.com.bigdata.btree.BTree.branchingFactor=1024
+#com.bigdata.namespace.LUBM_U50.spo.OSP.com.bigdata.btree.BTree.branchingFactor=940
+
 # Override the #of write cache buffers.
-com.bigdata.journal.AbstractJournal.writeCacheBufferCount=12
+#com.bigdata.journal.AbstractJournal.writeCacheBufferCount=12
 
 # 200M initial extent.
 com.bigdata.journal.AbstractJournal.initialExtent=209715200
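The override values follow directly from a target page size: the branching factor is roughly targetPageBytes divided by the average serialized bytes per tuple, which is why the commented-out 8k figures are about double the 4k ones (970 vs 1940, 512 vs 1024, 470 vs 940). A self-contained sketch of that arithmetic; the bytes-per-tuple inputs are illustrative assumptions, not measurements from these runs:

// Illustrative estimate of a branching factor for a target page size.
public class BranchingFactorEstimate {

    static long estimate(final long targetPageBytes, final double avgBytesPerTuple) {
        return Math.round(targetPageBytes / avgBytesPerTuple);
    }

    public static void main(final String[] args) {
        // Doubling the page target doubles the branching factor, matching
        // the 4k overrides vs. the commented-out 8k overrides above.
        System.out.println(estimate(4096, 8.0)); // 512  (e.g. SPO at 4k)
        System.out.println(estimate(8192, 8.0)); // 1024 (e.g. SPO at 8k)
    }
}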
From: <tho...@us...> - 2010-11-05 19:32:16
Revision: 3904
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3904&view=rev
Author:   thompsonbry
Date:     2010-11-05 19:32:10 +0000 (Fri, 05 Nov 2010)

Log Message:
-----------
javadoc

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-05 17:14:05 UTC (rev 3903)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-05 19:32:10 UTC (rev 3904)
@@ -202,42 +202,33 @@
      */
     public interface Options {
 
-        /**
-         * Option defines the Allocation block sizes for the RWStore. The values
-         * defined are multiplied by 64 to provide the actual allocations. The
-         * list of allocations should be ',' delimited and in increasing order.
-         * For example,
-         * 
-         * <pre>
-         * "1,2,4,8,116,32,64"
-         * </pre>
-         * 
-         * defines allocations from 64 to 4K in size. The default allocations
-         * are:
-         * 
-         * <pre>
-         * "1,2,3,5,8,12,16,32,48,64,128,192,320,512,832,1344,2176,3520"
-         * </pre>
-         * 
-         * providing blocks up to 220K aligned on 4K boundaries as soon as
-         * possible to optimize IO - particularly relevant for SSDs.
-         * 
-         * @see #DEFAULT_ALLOCATION_SIZES
-         */
+        /**
+         * Option defines the Allocation block sizes for the RWStore. The values
+         * defined are multiplied by 64 to provide the actual allocations. The
+         * list of allocations should be ',' delimited and in increasing order.
+         * This array is written into the store so changing the values does not
+         * break older stores. For example,
+         * 
+         * <pre>
+         * "1,2,4,8,116,32,64"
+         * </pre>
+         * 
+         * defines allocations from 64 to 4K in size. It is a good to define
+         * block sizes on 4K boundaries as soon as possible to optimize IO. This
+         * is particularly relevant for SSDs. A 1K boundary is expressed as
+         * <code>16</code> in the allocation sizes, so a 4K boundary is
+         * expressed as <code>64</code> and an 8k boundary as <code>128</code>.
+         * <p>
+         * The default allocations are {@value #DEFAULT_ALLOCATION_SIZES}.
+         * 
+         * @see #DEFAULT_ALLOCATION_SIZES
+         */
         String ALLOCATION_SIZES = RWStore.class.getName() + ".allocationSizes";
 
         /**
-         * The sizes of the slots managed by a {@link FixedAllocator} are 64 times
-         * the values in this array. This array is written into the store so
-         * changing the values does not break older stores. This array is
-         * configurable using {@link com.bigdata.journal.Options#ALLOCATION_SIZES}.
-         * <p>
-         * Note: It is good to have 4k and 8k boundaries for better efficiency on
-         * SSD. A 1K boundary is expressed as <code>16</code> in the allocation
-         * sizes, so a 4K boundary is expressed as <code>64</code>. The default
-         * series of allocation sizes is based on the Fibonacci sequence, but is
-         * pegged to the closest 4K boundary for values larger than 4k.
+         * @see #ALLOCATION_SIZES
          */
+        //String DEFAULT_ALLOCATION_SIZES = "1, 2, 3, 5, 8, 12, 16, 32, 48, 64, 128";
         String DEFAULT_ALLOCATION_SIZES = "1, 2, 3, 5, 8, 12, 16, 32, 48, 64, 128, 192, 320, 512, 832, 1344, 2176, 3520";
 
 //      private static final int[] DEFAULT_ALLOC_SIZES = { 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181 };
 //      private static final int[] ALLOC_SIZES = { 1, 2, 4, 8, 16, 32, 64, 128 };
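To make the "multiplied by 64" rule in the javadoc concrete: each entry in allocationSizes names a slot size in units of 64 bytes, so the documented boundaries work out as below. A standalone sketch (plain arithmetic, no bigdata classes involved):

// Expand RWStore allocationSizes entries (units of 64 bytes) into slot sizes.
public class AllocationSizeTable {

    public static void main(final String[] args) {
        final int[] sizes = { 1, 2, 3, 5, 8, 12, 16, 32, 48, 64,
                128, 192, 320, 512, 832, 1344, 2176, 3520 };
        for (final int s : sizes) {
            System.out.println(s + " x 64 = " + (s * 64) + " bytes");
        }
        // 16 -> 1024 (the 1K boundary), 64 -> 4096 (4K), 128 -> 8192 (8K),
        // and 3520 -> 225280 bytes = 220K, the documented maximum block size.
    }
}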
From: <tho...@us...> - 2010-11-05 17:14:11
Revision: 3903
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3903&view=rev
Author:   thompsonbry
Date:     2010-11-05 17:14:05 +0000 (Fri, 05 Nov 2010)

Log Message:
-----------
Added logger and removed System.out msg.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java	2010-11-05 15:51:22 UTC (rev 3902)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java	2010-11-05 17:14:05 UTC (rev 3903)
@@ -7,6 +7,8 @@
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.log4j.Logger;
+
 import com.bigdata.rwstore.RWStore.AllocationStats;
 import com.bigdata.util.ChecksumUtility;
 
@@ -27,17 +29,21 @@
  *
  */
 public class BlobAllocator implements Allocator {
-    int[] m_hdrs = new int[254];
-    RWStore m_store;
+
+    private static final transient Logger log = Logger.getLogger(BlobAllocator.class);
+
+    final private int[] m_hdrs = new int[254];
+    final private RWStore m_store;
     private int m_diskAddr;
     private int m_index;
     private int m_sortAddr;
 
-    public BlobAllocator(RWStore store, int sortAddr) {
+    public BlobAllocator(final RWStore store, final int sortAddr) {
         m_store = store;
         m_sortAddr = sortAddr;
-        System.out.println("New BlobAllocator");
+        if (log.isInfoEnabled())
+            log.info("New BlobAllocator");
     }
 
     public void addAddresses(ArrayList addrs) {
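The change swaps an unconditional System.out.println for the standard guarded log4j idiom. The guard matters because the argument expression (string concatenation, toString() calls) is otherwise evaluated on every call even when the level is disabled. A minimal sketch of the idiom with hypothetical class and method names:

import org.apache.log4j.Logger;

// Hypothetical class illustrating the guarded-logging idiom used above.
public class GuardedLoggingExample {

    private static final Logger log = Logger.getLogger(GuardedLoggingExample.class);

    void onNewAllocator(final int sortAddr) {
        // Without the guard, the concatenation below would run on every
        // call even when INFO is disabled for this logger.
        if (log.isInfoEnabled())
            log.info("New allocator: sortAddr=" + sortAddr);
    }
}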
From: <tho...@us...> - 2010-11-05 15:51:28
Revision: 3902
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3902&view=rev
Author:   thompsonbry
Date:     2010-11-05 15:51:22 +0000 (Fri, 05 Nov 2010)

Log Message:
-----------
Removed import for deleted class.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java	2010-11-05 15:48:51 UTC (rev 3901)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java	2010-11-05 15:51:22 UTC (rev 3902)
@@ -48,7 +48,6 @@
 import com.bigdata.btree.IIndex;
 import com.bigdata.btree.IndexMetadata;
 import com.bigdata.rawstore.Bytes;
-import com.bigdata.rwstore.JournalShadow;
 import com.bigdata.test.ExperimentDriver;
 import com.bigdata.test.ExperimentDriver.IComparisonTest;
 import com.bigdata.test.ExperimentDriver.Result;
From: <tho...@us...> - 2010-11-05 15:48:57
Revision: 3901
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3901&view=rev
Author:   thompsonbry
Date:     2010-11-05 15:48:51 +0000 (Fri, 05 Nov 2010)

Log Message:
-----------
Partial integration with journal shadow allocations. See comments in the code starting at line 606 for problems which still require resolution.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java	2010-11-05 15:43:58 UTC (rev 3900)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java	2010-11-05 15:48:51 UTC (rev 3901)
@@ -27,8 +27,6 @@
 
 package com.bigdata.journal;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -604,14 +602,29 @@
 
         synchronized (name2Addr) {
 
+            /*
+             * FIXME In order to use shadow allocations, the unisolated
+             * index MUST be loaded using the IsolatedActionJournal. There
+             * are two places immediate below where it tests the cache and
+             * where it loads using the AbstractJournal, both of which are
+             * not appropriate as they fail to impose the
+             * IsolatedActionJournal with the consequence that the
+             * allocation contexts are not isolated.
+             */
+
             // recover from unisolated index cache.
             btree = name2Addr.getIndexCache(name);
+//            btree = null; // do not use the name2Addr cache.
 
             if (btree == null) {
 
+                final IJournal tmp;
+                tmp = resourceManager.getLiveJournal();
+//                tmp = getJournal();// wrap with the IsolatedActionJournal.
+
                 // re-load btree from the store.
                 btree = BTree.load(//
-                        resourceManager.getLiveJournal(),//
+                        tmp, // backing store.
                         entry.checkpointAddr,//
                         false// readOnly
                         );
@@ -951,6 +964,8 @@
 
                 l.btree.writeCheckpoint();
 
+                ((IsolatedActionJournal) getJournal()).detachContext();
+
             }
 
             if(INFO) {
@@ -2135,7 +2150,7 @@
      * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
      * @version $Id$
      */
-    class IsolatedActionJournal implements IJournal {
+    class IsolatedActionJournal implements IJournal, IAllocationContext {
 
         private final AbstractJournal delegate;
         private final IResourceLocator resourceLocator;
@@ -2472,46 +2487,50 @@
             return delegate.toString(addr);
         }
 
-//        public long unpackAddr(DataInput in) throws IOException {
-//            return delegate.unpackAddr(in);
-//        }
+        public IRootBlockView getRootBlock(long commitTime) {
+            return delegate.getRootBlock(commitTime);
+        }
 
+        public Iterator<IRootBlockView> getRootBlocks(long startTime) {
+            return delegate.getRootBlocks(startTime);
+        }
+
+        /*
+         * IAllocationContext
+         * 
+         * The journal write() and delete() methods are overridden here to use
+         * the IsolatedActionJournal as the IAllocationContext. This causes the
+         * allocations to be scoped to the AbstractTask.
+         */
+
         public long write(ByteBuffer data) {
-            return delegate.write(data);
+            return delegate.write(data, this);
         }
 
         public long write(ByteBuffer data, long oldAddr) {
-            return write(data);
+            return delegate.write(data, oldAddr, this);
        }
 
-        public void delete(long addr) {
-            delegate.delete(addr);
-        }
+        public void delete(long addr) {
+            delegate.delete(addr, this);
+        }
 
-        public IRootBlockView getRootBlock(long commitTime) {
-            return delegate.getRootBlock(commitTime);
-        }
+//        public void delete(long addr, IAllocationContext context) {
+//            delegate.delete(addr, context);
+//        }
+//
+//        public long write(ByteBuffer data, IAllocationContext context) {
+//            return delegate.write(data, context);
+//        }
+//
+//        public long write(ByteBuffer data, long oldAddr, IAllocationContext context) {
+//            return delegate.write(data, oldAddr, context);
+//        }
 
-        public Iterator<IRootBlockView> getRootBlocks(long startTime) {
-            return delegate.getRootBlocks(startTime);
+        public void detachContext() {
+            delegate.detachContext(this);
         }
 
-        public void delete(long addr, IAllocationContext context) {
-            delegate.delete(addr, context);
-        }
-
-        public long write(ByteBuffer data, IAllocationContext context) {
-            return delegate.write(data, context);
-        }
-
-        public long write(ByteBuffer data, long oldAddr, IAllocationContext context) {
-            return delegate.write(data, oldAddr, context);
-        }
-
-        public void detachContext(IAllocationContext context) {
-            delegate.detachContext(context);
-        }
-
     }
 
     /**
@@ -2789,14 +2808,14 @@
             throw new UnsupportedOperationException();
         }
 
+        public void delete(long addr) {
+            throw new UnsupportedOperationException();
+        }
+
         /*
         * Methods that delegate directly to the backing journal.
         */
 
-//        public IKeyBuilder getKeyBuilder() {
-//            return delegate.getKeyBuilder();
-//        }
-
         public int getByteCount(long addr) {
             return delegate.getByteCount(addr);
        }
@@ -2857,10 +2876,6 @@
             return delegate.isStable();
         }
 
-//        public void packAddr(DataOutput out, long addr) throws IOException {
-//            delegate.packAddr(out, addr);
-//        }
-
         public ByteBuffer read(long addr) {
             return delegate.read(addr);
         }
@@ -2877,15 +2892,6 @@
             return delegate.toString(addr);
         }
 
-//        public long unpackAddr(DataInput in) throws IOException {
-//            return delegate.unpackAddr(in);
-//        }
-
-        public void delete(long addr) {
-            // TODO Auto-generated method stub
-
-        }
-
         public IRootBlockView getRootBlock(long commitTime) {
             return delegate.getRootBlock(commitTime);
         }
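The structural move in this revision is worth spelling out: instead of wrapping the journal in a separate shadow object, the task's IsolatedActionJournal now implements IAllocationContext itself and passes this on every write and delete, so all allocations made during the task share one context that can be detached as a unit at checkpoint. A reduced sketch of that pattern with simplified stand-in types (these are not the actual bigdata interfaces):

// Stand-in types illustrating the "wrapper is its own allocation context"
// pattern from the diff above.
interface AllocationContext { }

interface Store {
    long write(byte[] data, AllocationContext ctx);
    void delete(long addr, AllocationContext ctx);
    void detachContext(AllocationContext ctx);
}

final class TaskScopedStore implements AllocationContext {

    private final Store delegate;

    TaskScopedStore(final Store delegate) {
        this.delegate = delegate;
    }

    long write(final byte[] data) {
        // Every write is tagged with *this* wrapper as the context, so the
        // backing store can group the task's allocations together.
        return delegate.write(data, this);
    }

    void delete(final long addr) {
        delegate.delete(addr, this);
    }

    void detach() {
        // Releases the task's allocation context in one step, which is what
        // the checkpoint hook in the diff does via detachContext().
        delegate.detachContext(this);
    }
}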
From: <tho...@us...> - 2010-11-05 15:44:05
Revision: 3900
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3900&view=rev
Author:   thompsonbry
Date:     2010-11-05 15:43:58 +0000 (Fri, 05 Nov 2010)

Log Message:
-----------
Conditional invocation of the IAllocationContext specific methods which delegate to the RWStrategy.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java

Removed Paths:
-------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2010-11-04 23:20:34 UTC (rev 3899)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2010-11-05 15:43:58 UTC (rev 3900)
@@ -2612,14 +2612,39 @@
 
     }
 
-    public long write(ByteBuffer data, final long oldAddr, IAllocationContext context) {
-        return ((RWStrategy)_bufferStrategy).write(data, oldAddr, context);
-    }
+    public long write(final ByteBuffer data, final long oldAddr,
+            final IAllocationContext context) {
 
-    public long write(ByteBuffer data, IAllocationContext context) {
-        return ((RWStrategy)_bufferStrategy).write(data, context);
-    }
+        assertCanWrite();
 
+        if (_bufferStrategy instanceof RWStrategy) {
+
+            return ((RWStrategy) _bufferStrategy).write(data, oldAddr, context);
+
+        } else {
+
+            return _bufferStrategy.write(data, oldAddr);
+
+        }
+
+    }
+
+    public long write(final ByteBuffer data, final IAllocationContext context) {
+
+        assertCanWrite();
+
+        if (_bufferStrategy instanceof RWStrategy) {
+
+            return ((RWStrategy) _bufferStrategy).write(data, context);
+
+        } else {
+
+            return _bufferStrategy.write(data);
+
+        }
+
+    }
+
+    // Note: NOP for WORM. Used by RW for eventual recycle protocol.
     public void delete(final long addr) {
@@ -2629,17 +2654,31 @@
 
     }
 
-    public void delete(final long addr, IAllocationContext context) {
+    public void delete(final long addr, final IAllocationContext context) {
 
         assertCanWrite();
 
-        ((RWStrategy) _bufferStrategy).delete(addr, context);
+        if(_bufferStrategy instanceof RWStrategy) {
+
+            ((RWStrategy) _bufferStrategy).delete(addr, context);
+
+        } else {
+
+            _bufferStrategy.delete(addr);
+
+        }
 
     }
 
     public void detachContext(final IAllocationContext context) {
 
-        ((RWStrategy) _bufferStrategy).detachContext(context);
+        assertCanWrite();
+
+        if(_bufferStrategy instanceof RWStrategy) {
+
+            ((RWStrategy) _bufferStrategy).detachContext(context);
+
+        }
 
     }

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java	2010-11-04 23:20:34 UTC (rev 3899)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java	2010-11-05 15:43:58 UTC (rev 3900)
@@ -38,7 +38,6 @@
 import com.bigdata.counters.CounterSet;
 import com.bigdata.mdi.IResourceMetadata;
 import com.bigdata.relation.locator.IResourceLocator;
-import com.bigdata.rwstore.IAllocationContext;
 import com.bigdata.sparse.SparseRowStore;
 
 public class JournalDelegate implements IJournal {

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java	2010-11-04 23:20:34 UTC (rev 3899)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java	2010-11-05 15:43:58 UTC (rev 3900)
@@ -40,7 +40,6 @@
 import com.bigdata.rawstore.AbstractRawStore;
 import com.bigdata.rawstore.IAddressManager;
 import com.bigdata.rwstore.IAllocationContext;
-import com.bigdata.rwstore.JournalShadow;
 import com.bigdata.rwstore.RWStore;
 import com.bigdata.rwstore.RWStore.StoreCounters;
 import com.bigdata.util.ChecksumError;
@@ -183,8 +182,6 @@
      * {@link RWStore}. Shadow allocators may be used to isolate allocation
      * changes (both allocating slots and releasing slots) across different
      * processes.
-     * 
-     * @see JournalShadow
      */
     @Override
     public long write(final ByteBuffer data, final IAllocationContext context) {

Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java	2010-11-04 23:20:34 UTC (rev 3899)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java	2010-11-05 15:43:58 UTC (rev 3900)
@@ -1,126 +0,0 @@
-/**
-
-Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
-
-Contact:
-    SYSTAP, LLC
-    4501 Tower Road
-    Greensboro, NC 27410
-    lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-*/
-
-package com.bigdata.rwstore;
-
-import java.nio.ByteBuffer;
-
-import com.bigdata.journal.AbstractJournal;
-import com.bigdata.journal.IBufferStrategy;
-import com.bigdata.journal.IJournal;
-import com.bigdata.journal.JournalDelegate;
-import com.bigdata.journal.RWStrategy;
-
-/**
- * A {@link JournalShadow} wraps an Journal but provides itself as the
- * allocation context to be passed through to any interested
- * {@link IBufferStrategy}. This is the path by which {@link RWStore} allocators
- * are provided with the context for the allocations and deletes made.
- * 
- * @author Martyn Cutcher
- */
-public class JournalShadow extends JournalDelegate implements IAllocationContext {
-
-//    private final static AtomicLong s_idCounter = new AtomicLong(23);
-//
-//    final private int m_id = (int) s_idCounter.incrementAndGet();
-
-    private JournalShadow(final AbstractJournal source) {
-
-        super(source);
-
-    }
-
-    public long write(final ByteBuffer data) {
-
-        return delegate.write(data, this);
-
-    }
-
-    public long write(final ByteBuffer data, final long oldAddr) {
-
-        return delegate.write(data, oldAddr, this);
-
-    }
-
-    public void delete(long oldAddr) {
-
-        delegate.delete(oldAddr, this);
-
-    }
-
-//    public int compareTo(Object o) {
-//        if (o instanceof JournalShadow) {
-//            JournalShadow js = (JournalShadow) o;
-//            return m_id - js.m_id;
-//        } else {
-//            return -1;
-//        }
-//    }
-
-//    /**
-//     * TODO: should retrieve from localTransactionService or Journal
-//     * properties
-//     */
-//    public long minimumReleaseTime() {
-//        return 0;
-//    }
-
-    /**
-     * Release itself from the wrapped Journal, this unlocks the allocator for
-     * the RWStore
-     */
-    public void detach() {
-
-        delegate.detachContext(this);
-
-    }
-
-    /**
-     * This factory pattern creates a shadow for a RWStrategy-backed Journal to
-     * support protected allocations while allowing for deletion and
-     * re-allocation where possible. If the Journal is not backed by a
-     * RWStrategy, then the original Journal is returned.
-     * 
-     * @param journal
-     *            The journal to be shadowed
-     * 
-     * @return The shadowed journal if necessary and otherwise the
-     *         <i>journal</i>.
-     */
-    public static IJournal newShadow(final AbstractJournal journal) {
-
-        if (journal.getBufferStrategy() instanceof RWStrategy) {
-
-            return new JournalShadow(journal);
-
-        } else {
-
-            return journal;
-
-        }
-
-    }
-
-}

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-04 23:20:34 UTC (rev 3899)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-11-05 15:43:58 UTC (rev 3900)
@@ -3228,8 +3228,15 @@
 
             ret = new ContextAllocation(this, m_freeFixed.length, null, context);
 
-            m_contexts.put(context, ret);
+            if (m_contexts.put(context, ret) != null) {
+
+                throw new AssertionError();
+
+            }
 
+//            log.warn("Context: ncontexts=" + m_contexts.size() + ", context="
+//                    + context);
+
         }
 
         return ret;
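Revision 3900's counterpart change in AbstractJournal is defensive dispatch: the context-aware overloads are routed to the buffer strategy only when it actually is an RWStrategy, and otherwise fall back to the plain write/delete so WORM-style stores keep working with the context ignored. A compact sketch of that shape with simplified stand-in types (not the actual bigdata interfaces):

import java.nio.ByteBuffer;

// Stand-in for the generic backend API.
interface BufferStrategy {
    long write(ByteBuffer data);
    void delete(long addr);
}

// Stand-in for the RWStrategy role: the only backend that understands contexts.
interface ContextAwareStrategy extends BufferStrategy {
    long write(ByteBuffer data, Object allocationContext);
    void delete(long addr, Object allocationContext);
}

class JournalDispatchSketch {

    private final BufferStrategy bufferStrategy;

    JournalDispatchSketch(final BufferStrategy bufferStrategy) {
        this.bufferStrategy = bufferStrategy;
    }

    long write(final ByteBuffer data, final Object context) {
        // Route to the context-aware backend when available; otherwise fall
        // back to the plain API. The context is simply dropped for backends
        // that cannot recycle storage, mirroring the instanceof checks above.
        if (bufferStrategy instanceof ContextAwareStrategy) {
            return ((ContextAwareStrategy) bufferStrategy).write(data, context);
        }
        return bufferStrategy.write(data);
    }

    void delete(final long addr, final Object context) {
        if (bufferStrategy instanceof ContextAwareStrategy) {
            ((ContextAwareStrategy) bufferStrategy).delete(addr, context);
        } else {
            bufferStrategy.delete(addr);
        }
    }
}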