From: <tho...@us...> - 2011-05-18 13:04:01
|
Revision: 4515 http://bigdata.svn.sourceforge.net/bigdata/?rev=4515&view=rev Author: thompsonbry Date: 2011-05-18 13:03:50 +0000 (Wed, 18 May 2011) Log Message: ----------- Updated the JustificationIterator code a bit, reduced the maximum amount of data that it will buffer on the heap to keep down the heap pressure, removed the "timeout" logic, and modified it to run a FutureTask on the ExecutorService associated with the IIndexManager rather than using its own local single-threaded executor service. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/IJustificationIterator.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/JustificationIterator.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/JustificationWriter.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/IJustificationIterator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/IJustificationIterator.java 2011-05-17 18:46:55 UTC (rev 4514) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/IJustificationIterator.java 2011-05-18 13:03:50 UTC (rev 4515) @@ -27,8 +27,6 @@ package com.bigdata.rdf.inf; -import org.apache.log4j.Logger; - import com.bigdata.striterator.IChunkedIterator; /** @@ -39,6 +37,4 @@ */ public interface IJustificationIterator extends IChunkedIterator<Justification> { - final Logger log = Logger.getLogger(IJustificationIterator.class); - } Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/JustificationIterator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/JustificationIterator.java 2011-05-17 18:46:55 UTC (rev 4514) +++ 
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/JustificationIterator.java 2011-05-18 13:03:50 UTC (rev 4515) @@ -30,17 +30,17 @@ import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.Logger; + import com.bigdata.btree.IIndex; import com.bigdata.btree.IRangeQuery; import com.bigdata.btree.ITupleIterator; +import com.bigdata.journal.IIndexManager; import com.bigdata.rawstore.Bytes; import com.bigdata.relation.accesspath.IElementFilter; -import com.bigdata.util.concurrent.DaemonThreadFactory; /** * Iterator visits {@link Justification}s reading from the justification index. @@ -55,10 +55,13 @@ */ public class JustificationIterator implements IJustificationIterator { + private static final transient Logger log = Logger + .getLogger(JustificationIterator.class); + /** * The maximum #of statements that will be buffered by the iterator. */ - public static final transient int MAXIMUM_CAPACITY = 100 * Bytes.kilobyte32; + private static final transient int MAXIMUM_CAPACITY = 10 * Bytes.kilobyte32; // was 100k private boolean open = true; @@ -96,31 +99,37 @@ /** * The source iterator reading on the selected justification index. */ - private ITupleIterator src; + private ITupleIterator<?> src; +// /** +// * The executor service for the {@link Reader} (iff the {@link Reader} runs +// * asynchronously). +// */ +// private final ExecutorService readService; + /** - * The executor service for the {@link Reader} (iff the {@link Reader} runs - * asynchronously). + * The future for the {@link Reader} and <code>null</code> if a synchronous + * read was performed (fully buffered read in the caller's thread). 
*/ - private final ExecutorService readService; - + private final FutureTask<Object> ft; + /** * Set to true iff an asynchronous {@link Reader} is used AND there is * nothing more to be read. */ - private AtomicBoolean readerDone = new AtomicBoolean(false); + private final AtomicBoolean readerDone = new AtomicBoolean(false); /** * The minimum desirable chunk size for {@link #nextChunk()}. */ - final int MIN_CHUNK_SIZE = 50000; + static private final int MIN_CHUNK_SIZE = 100; - /** - * If NO results show up within this timeout then {@link #nextChunk()} will - * throw a {@link RuntimeException} to abort the reader - the probably cause - * is a network outage. - */ - final long TIMEOUT = 3000; +// /** +// * If NO results show up within this timeout then {@link #nextChunk()} will +// * throw a {@link RuntimeException} to abort the reader - the probably cause +// * is a network outage. +// */ +// static private final long TIMEOUT = Long.MAX_VALUE; /** * Create an iterator reading from the justifications index. @@ -133,26 +142,29 @@ * value is used - this gives you control when you really, really * want to have something fully buffered, e.g., for an in-memory * self-join. - * @param async - * When true, asynchronous read-ahead will be used to refill the - * buffer as it becomes depleted. When false, read-ahead will be - * synchronous (this is useful when you want to read at most N - * statements from the index). */ - public JustificationIterator(IIndex ndx, int capacity, boolean async) { + public JustificationIterator(final IIndexManager indexManager, + final IIndex ndx, int capacity) { - assert ndx != null; - - assert capacity >= 0; + if (indexManager == null) + throw new IllegalArgumentException(); + if (ndx == null) + throw new IllegalArgumentException(); + + if (capacity < 0) + throw new IllegalArgumentException(); + /* - * The range count. - * - * Note: The range count is generally an upper bound rather than an - * exact value. 
+ * When true, asynchronous read-ahead will be used to refill the buffer + * as it becomes depleted. When false, read-ahead will be synchronous + * (this is useful when you want to read at most N statements from the + * index). */ + boolean async = true; - final long rangeCount = ndx.rangeCount(null,null); + // The fast range count (upper bound) + final long rangeCount = ndx.rangeCount(); if (capacity == 0) { @@ -171,6 +183,7 @@ } else { + // Otherwise use the range count (upper bound). capacity = (int) rangeCount; } @@ -204,7 +217,7 @@ } - if( rangeCount < 1000) { + if (rangeCount < 100) { // Disable async reads if we are not reading much data. @@ -224,29 +237,25 @@ this.capacity = capacity; - this.src = ndx.rangeIterator(null, null,0/*capacity*/,IRangeQuery.KEYS,null/*filter*/); + this.src = ndx.rangeIterator(null/* fromKey */, null/* toKey */, + 0/* capacity */, IRangeQuery.KEYS, null/* filter */); this.buffer = new ArrayBlockingQueue<Justification>(capacity); if (async) { - /* - * FIXME provide an API to run this on an existing executorService - * just like we do for proxied asynchronous iterators. - */ - readService = Executors.newSingleThreadExecutor(new DaemonThreadFactory - (getClass().getName()+".readService")); + // wrap reader as Future + ft = new FutureTask<Object>(new Reader()); + + // submit for asynchronous read ahead + indexManager.getExecutorService().submit(ft); - readService.submit(new Reader()); - } else { - /* - * Pre-fill the buffer. - */ + // Fill the buffer (synchronous). - readService = null; - + ft = null; + fillBuffer(); } @@ -270,7 +279,7 @@ while (src.hasNext()) { - Justification t = (Justification)src.next().getObject(); + final Justification t = (Justification)src.next().getObject(); try { @@ -310,16 +319,15 @@ assertOpen(); - if (readService != null) { + if (ft != null) { // This method MUST NOT be invoked when using the async reader. 
- throw new AssertionError(); } try { - // log.info("(Re-)filling buffer: remainingCapacity=" + // if(log.isDebugEnabled()) log.debug("(Re-)filling buffer: remainingCapacity=" // + buffer.remainingCapacity()); while (src.hasNext() && buffer.remainingCapacity() > 0) { @@ -347,8 +355,8 @@ } finally { - if (log.isInfoEnabled()) - log.info("(Re-)filled buffer: size=" + buffer.size() + if (log.isDebugEnabled()) + log.debug("(Re-)filled buffer: size=" + buffer.size() + ", remainingCapacity=" + buffer.remainingCapacity() + ", done=" + !src.hasNext()); @@ -368,16 +376,14 @@ * the underlying iterator. */ - if (readService != null) { + if (ft != null) { // async reader - so wait on it. - awaitReader(); } else { // sync reader - so fill the buffer in this thread. - fillBuffer(); } @@ -385,15 +391,13 @@ if (buffer.isEmpty()) { // the buffer is still empty, so the iterator is exhausted. - return false; } } - // at least one SPO in the buffer. - + // at least one Justification in the buffer. return true; } @@ -439,30 +443,29 @@ } - if (readService != null) { + if (ft != null) { // make sure that we fill the buffer before we deliver a chunk. - awaitReader(); } // there are at least this many in the buffer. - final int n = buffer.size(); // allocate the array. 
+ final Justification[] stmts = new Justification[n]; - Justification[] stmts = new Justification[n]; - for (int i = 0; i < n; i++) { stmts[i] = next(); } - log.info("chunkSize=" + n + ", nchunks=" + nchunks + ", #read(caller)=" - + numReadByCaller + ", #read(src)=" + numBuffered); + if (log.isDebugEnabled()) + log.debug("chunkSize=" + n + ", nchunks=" + nchunks + + ", #read(caller)=" + numReadByCaller + ", #read(src)=" + + numBuffered); return stmts; @@ -480,7 +483,7 @@ */ private void awaitReader() { - if (readService == null) { + if (ft == null) { /* * This method MUST NOT be invoked unless you are using the async @@ -491,7 +494,7 @@ } - final long begin = System.currentTimeMillis(); +// final long begin = System.currentTimeMillis(); /* * Wait for at least N records to show up. @@ -503,6 +506,18 @@ try { + /* + * TODO This uses a Thread.sleep() to avoid a lock ordering + * problem because we did not have access to the lock used + * internally by the blocking queue when this code was written. + * However, we now have incorporated at least one JSR166 + * blocking queue class which can use the caller's lock. That + * makes it possible to setup conditions which can be awaited + * for full/not-full, etc., but you have to be careful not to + * violate the manner in which the lock signal/notify semantics + * are used internally by the blocking queue implementation. + */ + Thread.sleep(10/*ms*/); } catch (InterruptedException ex) { @@ -511,14 +526,14 @@ } - long elapsed = System.currentTimeMillis() - begin; +// final long elapsed = System.currentTimeMillis() - begin; +// +// if (elapsed > TIMEOUT && buffer.isEmpty()) { +// +// throw new RuntimeException("Timeout after " + elapsed + "ms"); +// +// } - if (elapsed > TIMEOUT && buffer.isEmpty()) { - - throw new RuntimeException("Timeout after " + elapsed + "ms"); - - } - } } @@ -539,40 +554,37 @@ if (!open) { // Already closed. 
- return; } - log.info("Closing iterator"); + log.debug("Closing iterator"); open = false; - if (readService != null) { + if (ft != null) { - // immediate shutdown. - readService.shutdownNow(); + // terminate the Reader. + ft.cancel(true/* mayInterruptIfRunning */); - try { +// try { +// +// readService.awaitTermination(500, TimeUnit.MILLISECONDS); +// +// } catch (InterruptedException e) { +// +// log.warn("Read service did not terminate: " + e); +// +// } - readService.awaitTermination(500, TimeUnit.MILLISECONDS); - - } catch (InterruptedException e) { - - log.warn("Read service did not terminate: " + e); - - } - } // discard buffer. - buffer.clear(); buffer = null; // discard the source iterator. - src = null; } Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/JustificationWriter.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/JustificationWriter.java 2011-05-17 18:46:55 UTC (rev 4514) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/JustificationWriter.java 2011-05-18 13:03:50 UTC (rev 4515) @@ -40,8 +40,8 @@ * Incremented as a side-effect for each justification * actually written on the justification index. 
*/ - public JustificationWriter(AbstractTripleStore dst, - IChunkedIterator<Justification> src, AtomicLong nwritten) { + public JustificationWriter(final AbstractTripleStore dst, + final IChunkedIterator<Justification> src, final AtomicLong nwritten) { this.dst = dst; Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2011-05-17 18:46:55 UTC (rev 4514) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2011-05-18 13:03:50 UTC (rev 4515) @@ -3208,8 +3208,9 @@ if (justify) { final IJustificationIterator jitr = new JustificationIterator( + getIndexManager(), getSPORelation().getJustificationIndex(), - 0/* capacity */, true/* async */); + 0/* capacity */); tasks.add(new JustificationWriter(dst, jitr, nwrittenj)); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |