From: <tho...@us...> - 2013-08-23 21:24:21
|
Revision: 7326 http://bigdata.svn.sourceforge.net/bigdata/?rev=7326&view=rev Author: thompsonbry Date: 2013-08-23 21:24:14 +0000 (Fri, 23 Aug 2013) Log Message: ----------- Modified to not execute GATHER and SCATTER stages when NoEdges was specified (9% of cost is a NOP GATHER for BFS). Added more info to the GASRunner reporting. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-23 15:05:39 UTC (rev 7325) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-23 21:24:14 UTC (rev 7326) @@ -578,10 +578,19 @@ */ final long beginGather = System.nanoTime(); - - final long gatherEdgeCount = gatherEdges(kb, gatherEdges, - pushDownApplyInGather); + final long gatherEdgeCount; + if (gatherEdges == EdgesEnum.NoEdges) { + + gatherEdgeCount = 0L; + + } else { + + gatherEdgeCount = gatherEdges(kb, gatherEdges, + pushDownApplyInGather); + + } + final long elapsedGather = System.nanoTime() - beginGather; /* @@ -609,10 +618,20 @@ */ final long beginScatter = System.nanoTime(); + + final long scatterEdgeCount; - final long scatterEdgeCount = scatterEdges(kb, scatterEdges, - pushDownApplyInScatter); + if (scatterEdges == EdgesEnum.NoEdges) { + scatterEdgeCount = 0L; + + } else { + + scatterEdgeCount = scatterEdges(kb, scatterEdges, + pushDownApplyInScatter); + + } + final long elapsedScatter = System.nanoTime() - beginScatter; /* Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 15:05:39 UTC (rev 7325) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 21:24:14 UTC (rev 7326) @@ -523,8 +523,10 @@ * Setup and run over those samples. */ + final IGASProgram<VS, ES, ST> gasProgram = newGASProgram(); + final IGASEngine<VS, ES, ST> gasEngine = new GASEngine<VS, ES, ST>(jnl, - namespace, ITx.READ_COMMITTED, newGASProgram(), nthreads); + namespace, ITx.READ_COMMITTED, gasProgram, nthreads); final GASStats total = new GASStats(); @@ -549,10 +551,11 @@ } // Total over all sampled vertices. - System.out.println("TOTAL: nseed=" + seed + ", nsamples=" + nsamples - + ", nthreads=" + nthreads + ", bufferMode=" - + jnl.getBufferStrategy().getBufferMode() + ", edges(kb)=" - + nedges + ", stats(total)=" + total); + System.out.println("TOTAL: analytic=" + + gasProgram.getClass().getSimpleName() + ", nseed=" + seed + + ", nsamples=" + nsamples + ", nthreads=" + nthreads + + ", bufferMode=" + jnl.getBufferStrategy().getBufferMode() + + ", edges(kb)=" + nedges + ", stats(total)=" + total); return total; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-25 20:47:34
|
Revision: 7339 http://bigdata.svn.sourceforge.net/bigdata/?rev=7339&view=rev Author: thompsonbry Date: 2013-08-25 20:47:26 +0000 (Sun, 25 Aug 2013) Log Message: ----------- Made it possible to configure the scheduler (a bit of a hack). Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASContext.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/MergeSortIterator.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-08-25 16:33:39 UTC (rev 7338) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-08-25 20:47:26 UTC (rev 7339) @@ -108,7 +108,7 @@ this.program = program; - this.state = new GASState<VS, ES, ST>(gasEngine, program); + this.state = new GASState<VS, ES, ST>(gasEngine, this, program); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-25 16:33:39 UTC (rev 7338) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-25 20:47:26 UTC (rev 7339) @@ -1,11 +1,13 @@ package com.bigdata.rdf.graph.impl; +import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicReference; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; @@ -13,6 +15,8 @@ import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASEngine; import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.impl.GASState.CHMScheduler; +import com.bigdata.rdf.graph.impl.GASState.MyScheduler; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.store.AbstractTripleStore; import com.bigdata.service.IBigdataFederation; @@ -109,6 +113,10 @@ .newFixedThreadPool(nthreads, new DaemonThreadFactory( GASEngine.class.getSimpleName())); + this.schedulerClassRef = new AtomicReference<Class<MyScheduler>>(); + + this.schedulerClassRef.set((Class)CHMScheduler.class); + } /** @@ -365,4 +373,36 @@ } + private final AtomicReference<Class<MyScheduler>> schedulerClassRef; + + void setSchedulerClass(final Class<MyScheduler> newValue) { + + if(newValue == null) + throw new IllegalArgumentException(); + + schedulerClassRef.set(newValue); + + } + + MyScheduler newScheduler(final GASContext<?, ?, ?> gasContext) { + + final Class<MyScheduler> cls = schedulerClassRef.get(); + + try { + + final Constructor<MyScheduler> ctor = cls + .getConstructor(new Class[] { GASEngine.class }); + + final MyScheduler sch = ctor.newInstance(new Object[] { this }); + + return sch; + + } catch (Exception e) { + + throw new RuntimeException(e); + + } + + } + } // GASEngine Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-25 16:33:39 UTC (rev 7338) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-25 20:47:26 UTC (rev 7339) @@ -22,6 +22,9 @@ import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASEngine; import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IScheduler; +import com.bigdata.rdf.graph.impl.GASState.MyScheduler; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.rio.LoadStats; import com.bigdata.rdf.sail.BigdataSail; @@ -76,6 +79,11 @@ private final String namespaceOverride; /** + * The {@link MyScheduler} class to use. + */ + private final Class<MyScheduler> schedulerClassOverride; + + /** * When non-<code>null</code>, a list of zero or more resources to be * loaded. The resources will be searched for as URLs, on the CLASSPATH, and * in the file system. @@ -129,6 +137,12 @@ * <dt>-seed</dt> * <dd>The seed for the random number generator (default is * <code>217L</code>).</dd> + * <dt>-bufferMode</dt> + * <dd>Overrides the {@link BufferMode} (if any) specified in the + * <code>propertyFile</code>.</dd> + * <dt>-schedulerClass</dt> + * <dd>Override the default {@link IScheduler}. Class must + * implement {@link MyScheduler}.</dd> * <dt>-namespace</dt> * <dd>The namespace of the default SPARQL endpoint (the * namespace will be <code>kb</code> if none was specified when @@ -138,12 +152,10 @@ * exist) at the time this utility is executed. This option may * appear multiple times. The resources will be searched for as * URLs, on the CLASSPATH, and in the file system.</dd> - * <dt>-bufferMode</dt> - * <dd>Overrides the {@link BufferMode} (if any) specified in the - * <code>propertyFile</code>.</dd> * </p> + * @throws ClassNotFoundException */ - public GASRunner(final String[] args) { + public GASRunner(final String[] args) throws ClassNotFoundException { Banner.banner(); @@ -151,6 +163,7 @@ int nsamples = 100; int nthreads = 4; BufferMode bufferMode = null; // override only. + Class<MyScheduler> schedulerClass = null; // override only. String namespace = "kb"; // Set of files to load (optional). LinkedHashSet<String> loadSet = new LinkedHashSet<String>(); @@ -182,6 +195,9 @@ } else if (arg.equals("-bufferMode")) { final String s = args[++i]; bufferMode = BufferMode.valueOf(s); + } else if (arg.equals("-schedulerClass")) { + final String s = args[++i]; + schedulerClass = (Class<MyScheduler>) Class.forName(s); } else if (arg.equals("-namespace")) { final String s = args[++i]; namespace = s; @@ -222,6 +238,7 @@ this.nthreads = nthreads; this.namespaceOverride = namespace; this.bufferModeOverride = bufferMode; + this.schedulerClassOverride = schedulerClass; this.loadSet = loadSet.isEmpty() ? null : loadSet .toArray(new String[loadSet.size()]); @@ -538,11 +555,20 @@ try { + if (schedulerClassOverride != null) { + + ((GASEngine) gasEngine) + .setSchedulerClass(schedulerClassOverride); + + } + final IGASProgram<VS, ES, ST> gasProgram = newGASProgram(); final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext( namespace, jnl.getLastCommitTime(), gasProgram); + final IGASState<VS, ES, ST> gasState = gasContext.getGASState(); + final GASStats total = new GASStats(); for (int i = 0; i < samples.length; i++) { @@ -550,9 +576,9 @@ @SuppressWarnings("rawtypes") final IV startingVertex = samples[i]; - gasContext.getGASState().init(startingVertex); + gasState.init(startingVertex); - // TODO Pure interface for this. + // TODO STATS: Pure interface. final GASStats stats = (GASStats) gasContext.call(); total.add(stats); @@ -566,11 +592,16 @@ } // Total over all sampled vertices. - System.out.println("TOTAL: analytic=" - + gasProgram.getClass().getSimpleName() + ", nseed=" + seed - + ", nsamples=" + nsamples + ", nthreads=" + nthreads + System.out.println("TOTAL"// + +": analytic=" + gasProgram.getClass().getSimpleName() // + + ", nseed=" + seed + + ", nsamples=" + nsamples // + + ", nthreads=" + nthreads + ", bufferMode=" + jnl.getBufferStrategy().getBufferMode() - + ", edges(kb)=" + nedges + ", stats(total)=" + total); + + ", scheduler=" + ((GASState<VS, ES, ST>)gasState).getScheduler().getClass().getSimpleName() + + ", edges(kb)=" + nedges// + + ", stats(total)=" + total// + ); return total; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-08-25 16:33:39 UTC (rev 7338) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-08-25 20:47:26 UTC (rev 7339) @@ -7,7 +7,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -24,7 +23,6 @@ import com.bigdata.rdf.graph.IGASState; import com.bigdata.rdf.graph.IScheduler; import com.bigdata.rdf.internal.IV; -import com.bigdata.rdf.internal.IVUtility; import com.bigdata.rdf.model.BigdataValue; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.store.AbstractTripleStore; @@ -34,12 +32,12 @@ @SuppressWarnings("rawtypes") public class GASState<VS, ES, ST> implements IGASState<VS, ES, ST> { - private static final Logger log = Logger.getLogger(GASState.class); + static final Logger log = Logger.getLogger(GASState.class); - /** - * The {@link GASEngine} on which the {@link IGASProgram} will be run. - */ - private final GASEngine gasEngine; +// /** +// * The {@link GASEngine} on which the {@link IGASProgram} will be run. +// */ +// private final GASEngine gasEngine; /** * The {@link IGASProgram} to be run. @@ -94,9 +92,11 @@ */ private final ConcurrentMap<ISPO, ES> edgeState = null; - GASState(final GASEngine gasEngine, final IGASProgram<VS, ES, ST> program) { + GASState(final GASEngine gasEngine, + final GASContext<VS, ES, ST> gasContext, + final IGASProgram<VS, ES, ST> program) { - this.gasEngine = gasEngine; +// this.gasEngine = gasEngine; this.gasProgram = program; @@ -106,17 +106,7 @@ this.frontier = new StaticFrontier(); - /* - * TODO FRONTIER: Choose thread-local versus CHM implementation using a - * GASEngine option and then echo in the GASRunner reports. - */ - if (false) { - this.scheduler = new SingleThreadScheduler(); - } else if (false) { - this.scheduler = new CHMScheduler(gasEngine.getNThreads()); - } else { - this.scheduler = new ThreadLocalScheduler(gasEngine); - } + this.scheduler = (MyScheduler) gasEngine.newScheduler(gasContext); } @@ -403,7 +393,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ - private interface MyScheduler extends IScheduler { + interface MyScheduler extends IScheduler { /** * Compact the schedule into the new frontier. @@ -456,7 +446,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ - static private class SingleThreadScheduler implements MyScheduler { + static class SingleThreadScheduler implements MyScheduler { private final Set<IV> vertices; @@ -495,14 +485,14 @@ * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> */ - static private class CHMScheduler implements MyScheduler { + static class CHMScheduler implements MyScheduler { // FIXME This is a Jetty class. Unbundle it! Use CHM instead. private final ConcurrentHashSet<IV> vertices; - public CHMScheduler(final int nthreads) { + public CHMScheduler(final GASEngine gasEngine) { - vertices = new ConcurrentHashSet<IV>(); + vertices = new ConcurrentHashSet<IV>(/* TODO nthreads (CHM) */); } @@ -542,7 +532,7 @@ * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> */ - static private class ThreadLocalScheduler implements MyScheduler { + static class ThreadLocalScheduler implements MyScheduler { private final GASEngine gasEngine; private final int nthreads; @@ -607,18 +597,7 @@ s.clear(); } - -// if (false) { -// /* -// * Note: This should not be required. It is a bit of a paranoid -// * step. It could reduce the efficiency by forcing us to -// * reallocate the backing data structures. We should keep those -// * on hand for the life of the Scheduler, which is linked to the -// * execution of the GASProgram. -// */ -// map.clear(); -// } - + } @Override @@ -725,278 +704,4 @@ } - /** - * An N-way merge sort of N source iterators. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private static class MergeSortIterator implements Iterator<IV> { - - /** - * The #of source iterators. - */ - private final int n; - - /** - * The source iterators in the order given to the ctor. - */ - private final Iterator<IV>[] sourceIterator; - - /** - * The current value from each source and <code>null</code> if we need - * to get another value from that source. The value for a source - * iterator that has been exhausted will remain <code>null</code>. When - * all entries in this array are <code>null</code> there are no more - * values to be visited and we are done. - */ - private final IV[] sourceTuple; - - /** - * Index into {@link #sourceIterator} and {@link #sourceTuple} of the - * iterator whose value will be returned next -or- <code>-1</code> if we - * need to choose the next value to be visited. - */ - private int current = -1; - - /** - * - * @param sourceIterators - * Each source iterator MUST be in ascending {@link IV} - * order. - */ - public MergeSortIterator(final Iterator<IV>[] sourceIterators) { - - assert sourceIterators != null; - - assert sourceIterators.length > 0; - - this.n = sourceIterators.length; - - for (int i = 0; i < n; i++) { - - assert sourceIterators[i] != null; - - } - - this.sourceIterator = sourceIterators; - - sourceTuple = new IV[n]; - - } - - @Override - public boolean hasNext() { - - /* - * Until we find an undeleted tuple (or any tuple if DELETED is - * true). - */ - while (true) { - - if (current != -1) { - - if (log.isTraceEnabled()) - log.trace("Already matched: source=" + current); - - return true; - - } - - /* - * First, make sure that we have a tuple for each source - * iterator (unless that iterator is exhausted). - */ - - int nexhausted = 0; - - for (int i = 0; i < n; i++) { - - if (sourceTuple[i] == null) { - - if (sourceIterator[i].hasNext()) { - - sourceTuple[i] = sourceIterator[i].next(); - - if (log.isTraceEnabled()) - log.trace("read sourceTuple[" + i + "]=" - + sourceTuple[i]); - - } else { - - nexhausted++; - - } - - } - - } - - if (nexhausted == n) { - - // the aggregate iterator is exhausted. - - return false; - - } - - /* - * Now consider the current tuple for each source iterator in - * turn and choose the _first_ iterator having a tuple whose key - * orders LTE all the others (or GTE if [reverseScan == true]). - * This is the next tuple to be visited by the aggregate - * iterator. - */ - { - - // current is index of the smallest key so far. - assert current == -1; - - IV key = null; // smallest key so far. - - for (int i = 0; i < n; i++) { - - if (sourceTuple[i] == null) { - - // This source is exhausted. - - continue; - - } - - if (current == -1) { - - current = i; - - key = sourceTuple[i]; - - assert key != null; - - } else { - - final IV tmp = sourceTuple[i]; - - final int ret = IVUtility.compare(tmp, key); - - if (ret < 0) { - - /* - * This key orders LT the current key. - * - * Note: This test MUST be strictly LT since LTE - * would break the precedence in which we are - * processing the source iterators and give us - * the key from the last source by preference - * when we need the key from the first source by - * preference. - */ - - current = i; - - key = tmp; - - } - - } - - } - - assert current != -1; - - } - - if (log.isDebugEnabled()) { - - log.debug("Will visit next: source=" + current - + ", tuple: " + sourceTuple[current]); - - } - - return true; - - } - - } - - @Override - public IV next() { - - if (!hasNext()) - throw new NoSuchElementException(); - - return consumeLookaheadTuple(); - - } - - /** - * Consume the {@link #current} source value. - * - * @return The {@link #current} tuple. - */ - private IV consumeLookaheadTuple() { - - final IV t = sourceTuple[current]; - - // clear tuples from other sources having the same key as the - // current tuple. - clearCurrent(); - - return t; - - } - - /** - * <p> - * Clear tuples from other sources having the same key as the current - * tuple (eliminates duplicates). - * </p> - */ - protected void clearCurrent() { - - assert current != -1; - - final IV key = sourceTuple[current]; - - for (int i = current + 1; i < n; i++) { - - if (sourceTuple[i] == null) { - - // this iterator is exhausted. - - continue; - - } - - final IV tmp = sourceTuple[i]; - - final int ret = IVUtility.compare(key, tmp); - - if (ret == 0) { - - // discard tuple. - - sourceTuple[i] = null; - - } - - } - - // clear the tuple that we are returning so that we will read - // another from that source. - sourceTuple[current] = null; - - // clear so that we will look again. - current = -1; - - } - - @Override - public void remove() { - - throw new UnsupportedOperationException(); - - } - - } // MergeSortIterator - } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/MergeSortIterator.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/MergeSortIterator.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/MergeSortIterator.java 2013-08-25 20:47:26 UTC (rev 7339) @@ -0,0 +1,277 @@ +package com.bigdata.rdf.graph.impl; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.internal.IVUtility; + +/** + * An N-way merge sort of N source iterators. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +@SuppressWarnings("rawtypes") +class MergeSortIterator implements Iterator<IV> { + + /** + * The #of source iterators. + */ + private final int n; + + /** + * The source iterators in the order given to the ctor. + */ + private final Iterator<IV>[] sourceIterator; + + /** + * The current value from each source and <code>null</code> if we need to + * get another value from that source. The value for a source iterator that + * has been exhausted will remain <code>null</code>. When all entries in + * this array are <code>null</code> there are no more values to be visited + * and we are done. + */ + private final IV[] sourceTuple; + + /** + * Index into {@link #sourceIterator} and {@link #sourceTuple} of the + * iterator whose value will be returned next -or- <code>-1</code> if we + * need to choose the next value to be visited. + */ + private int current = -1; + + /** + * + * @param sourceIterators + * Each source iterator MUST be in ascending {@link IV} order. + */ + public MergeSortIterator(final Iterator<IV>[] sourceIterators) { + + assert sourceIterators != null; + + assert sourceIterators.length > 0; + + this.n = sourceIterators.length; + + for (int i = 0; i < n; i++) { + + assert sourceIterators[i] != null; + + } + + this.sourceIterator = sourceIterators; + + sourceTuple = new IV[n]; + + } + + @Override + public boolean hasNext() { + + /* + * Until we find an undeleted tuple (or any tuple if DELETED is true). + */ + while (true) { + + if (current != -1) { + + if (GASState.log.isTraceEnabled()) + GASState.log.trace("Already matched: source=" + current); + + return true; + + } + + /* + * First, make sure that we have a tuple for each source iterator + * (unless that iterator is exhausted). + */ + + int nexhausted = 0; + + for (int i = 0; i < n; i++) { + + if (sourceTuple[i] == null) { + + if (sourceIterator[i].hasNext()) { + + sourceTuple[i] = sourceIterator[i].next(); + + if (GASState.log.isTraceEnabled()) + GASState.log.trace("read sourceTuple[" + i + "]=" + + sourceTuple[i]); + + } else { + + nexhausted++; + + } + + } + + } + + if (nexhausted == n) { + + // the aggregate iterator is exhausted. + + return false; + + } + + /* + * Now consider the current tuple for each source iterator in turn + * and choose the _first_ iterator having a tuple whose key orders + * LTE all the others (or GTE if [reverseScan == true]). This is the + * next tuple to be visited by the aggregate iterator. + */ + { + + // current is index of the smallest key so far. + assert current == -1; + + IV key = null; // smallest key so far. + + for (int i = 0; i < n; i++) { + + if (sourceTuple[i] == null) { + + // This source is exhausted. + + continue; + + } + + if (current == -1) { + + current = i; + + key = sourceTuple[i]; + + assert key != null; + + } else { + + final IV tmp = sourceTuple[i]; + + final int ret = IVUtility.compare(tmp, key); + + if (ret < 0) { + + /* + * This key orders LT the current key. + * + * Note: This test MUST be strictly LT since LTE + * would break the precedence in which we are + * processing the source iterators and give us the + * key from the last source by preference when we + * need the key from the first source by preference. + */ + + current = i; + + key = tmp; + + } + + } + + } + + assert current != -1; + + } + + if (GASState.log.isDebugEnabled()) { + + GASState.log.debug("Will visit next: source=" + current + + ", tuple: " + sourceTuple[current]); + + } + + return true; + + } + + } + + @Override + public IV next() { + + if (!hasNext()) + throw new NoSuchElementException(); + + return consumeLookaheadTuple(); + + } + + /** + * Consume the {@link #current} source value. + * + * @return The {@link #current} tuple. + */ + private IV consumeLookaheadTuple() { + + final IV t = sourceTuple[current]; + + // clear tuples from other sources having the same key as the + // current tuple. + clearCurrent(); + + return t; + + } + + /** + * <p> + * Clear tuples from other sources having the same key as the current tuple + * (eliminates duplicates). + * </p> + */ + protected void clearCurrent() { + + assert current != -1; + + final IV key = sourceTuple[current]; + + for (int i = current + 1; i < n; i++) { + + if (sourceTuple[i] == null) { + + // this iterator is exhausted. + + continue; + + } + + final IV tmp = sourceTuple[i]; + + final int ret = IVUtility.compare(key, tmp); + + if (ret == 0) { + + // discard tuple. + + sourceTuple[i] = null; + + } + + } + + // clear the tuple that we are returning so that we will read + // another from that source. + sourceTuple[current] = null; + + // clear so that we will look again. + current = -1; + + } + + @Override + public void remove() { + + throw new UnsupportedOperationException(); + + } + +} // MergeSortIterator \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-28 19:14:18
|
Revision: 7354 http://bigdata.svn.sourceforge.net/bigdata/?rev=7354&view=rev Author: thompsonbry Date: 2013-08-28 19:14:10 +0000 (Wed, 28 Aug 2013) Log Message: ----------- Modified the thread-local scheduler to reuse the same backing array to sort the thread-local frontier in each round. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/util/GASImplUtil.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-08-28 18:57:07 UTC (rev 7353) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-08-28 19:14:10 UTC (rev 7354) @@ -14,12 +14,11 @@ import com.bigdata.rdf.graph.IStaticFrontier; import com.bigdata.rdf.graph.impl.GASEngine; import com.bigdata.rdf.graph.impl.util.GASImplUtil; +import com.bigdata.rdf.graph.impl.util.IArraySlice; import com.bigdata.rdf.graph.impl.util.ManagedArray; import com.bigdata.rdf.graph.impl.util.MergeSortIterator; import com.bigdata.rdf.internal.IV; -import cutthecrap.utils.striterators.ArrayIterator; - /** * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the * distinct vertices scheduled by each execution thread. After the computation @@ -34,35 +33,35 @@ @SuppressWarnings("rawtypes") public class TLScheduler implements IGASSchedulerImpl { -// /** -// * Class bundles a reusable, extensible array for sorting the thread-local -// * frontier. -// * -// * @author <a href="mailto:tho...@us...">Bryan -// * Thompson</a> -// */ -// private static class MySTScheduler extends STScheduler { -// -// /** -// * This is used to sort the thread-local frontier (that is, the frontier -// * for a single thread). The backing array will grow as necessary and is -// * reused in each round. -// */ -// private final ManagedArray<IV> tmp; -// -// public MySTScheduler(final GASEngine gasEngine) { -// -// super(gasEngine); -// -// tmp = new ManagedArray<IV>(IV.class, 64); -// -// } -// -// } + /** + * Class bundles a reusable, extensible array for sorting the thread-local + * frontier. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private static class MySTScheduler extends STScheduler { + + /** + * This is used to sort the thread-local frontier (that is, the frontier + * for a single thread). The backing array will grow as necessary and is + * reused in each round. + */ + private final ManagedArray<IV> tmp; + + public MySTScheduler(final GASEngine gasEngine) { + + super(gasEngine); + + tmp = new ManagedArray<IV>(IV.class, 64); + + } + + } // class MySTScheduler private final GASEngine gasEngine; private final int nthreads; - private final ConcurrentHashMap<Long/* threadId */, STScheduler> map; + private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map; public TLScheduler(final GASEngine gasEngine) { @@ -70,7 +69,7 @@ this.nthreads = gasEngine.getNThreads(); - this.map = new ConcurrentHashMap<Long, STScheduler>( + this.map = new ConcurrentHashMap<Long, MySTScheduler>( nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads); } @@ -79,11 +78,11 @@ final Long id = Thread.currentThread().getId(); - STScheduler s = map.get(id); + MySTScheduler s = map.get(id); if (s == null) { - final IGASScheduler old = map.putIfAbsent(id, s = new STScheduler( + final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler( gasEngine)); if (old != null) { @@ -131,38 +130,38 @@ /* * Extract a sorted, compact frontier from each thread local frontier. */ - final IV[][] frontiers = new IV[nthreads][]; + @SuppressWarnings("unchecked") + final IArraySlice<IV>[] frontiers = new IArraySlice[nthreads]; int nsources = 0; int nvertices = 0; { - final List<Callable<IV[]>> tasks = new ArrayList<Callable<IV[]>>( + final List<Callable<IArraySlice<IV>>> tasks = new ArrayList<Callable<IArraySlice<IV>>>( nthreads); - for (STScheduler s : map.values()) { - final STScheduler t = s; - tasks.add(new Callable<IV[]>() { + for (MySTScheduler s : map.values()) { + final MySTScheduler t = s; + tasks.add(new Callable<IArraySlice<IV>>() { @Override - public IV[] call() throws Exception { - return GASImplUtil.compactAndSort(t.vertices); + public IArraySlice<IV> call() throws Exception { + return GASImplUtil.compactAndSort(t.vertices, t.tmp); } }); } // invokeAll() - futures will be done() before it returns. - final List<Future<IV[]>> futures; + final List<Future<IArraySlice<IV>>> futures; try { futures = gasEngine.getGASThreadPool().invokeAll(tasks); } catch (InterruptedException e) { throw new RuntimeException(e); } - for (Future<IV[]> f : futures) { + for (Future<IArraySlice<IV>> f : futures) { - final IV[] b; try { - b = frontiers[nsources] = f.get(); - nvertices += b.length; + final IArraySlice<IV> b = frontiers[nsources] = f.get(); + nvertices += b.len(); nsources++; } catch (InterruptedException e) { throw new RuntimeException(e); @@ -223,7 +222,7 @@ * The new frontier to be populated. */ private void mergeSortSourcesAndSetFrontier(final int nsources, - final int nvertices, final IV[][] frontiers, + final int nvertices, final IArraySlice<IV>[] frontiers, final IStaticFrontier frontier) { // wrap IVs[] as Iterators. @@ -232,7 +231,7 @@ for (int i = 0; i < nsources; i++) { - itrs[i] = new ArrayIterator<IV>(frontiers[i]); + itrs[i] = frontiers[i].iterator(); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/util/GASImplUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/util/GASImplUtil.java 2013-08-28 18:57:07 UTC (rev 7353) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/util/GASImplUtil.java 2013-08-28 19:14:10 UTC (rev 7354) @@ -17,45 +17,45 @@ @SuppressWarnings({ "unchecked", "rawtypes" }) public static final Iterator<IV> EMPTY_VERTICES_ITERATOR = EmptyIterator.DEFAULT; - /** - * Compact a collection of vertices into an ordered frontier. - * - * @param vertices - * The collection of vertices for the new frontier. - * - * @return The compact, ordered frontier. - * - * @deprecated This implementation fails to reuse/grow the array for each - * round. This causes a lot of avoidable heap pressure during - * the single-threaded execution between each round and is a - * large percentage of the total runtime costs of the engine! - */ - @Deprecated - @SuppressWarnings("rawtypes") - public static IV[] compactAndSort(final Set<IV> vertices) { - - final IV[] a; - - final int size = vertices.size(); - - /* - * FIXME FRONTIER: Grow/reuse this array for each round! This is 15% of - * all time in the profiler. The #1 hot spot with the CHMScheduler. We - * need to reuse the target array!!! - */ - vertices.toArray(a = new IV[size]); - - /* - * Order for index access. An ordered scan on a B+Tree is 10X faster - * than random access lookups. - * - * Note: This uses natural V order, which is also the index order. - */ - java.util.Arrays.sort(a); - - return a; - - } +// /** +// * Compact a collection of vertices into an ordered frontier. +// * +// * @param vertices +// * The collection of vertices for the new frontier. +// * +// * @return The compact, ordered frontier. +// * +// * @deprecated This implementation fails to reuse/grow the array for each +// * round. This causes a lot of avoidable heap pressure during +// * the single-threaded execution between each round and is a +// * large percentage of the total runtime costs of the engine! +// */ +// @Deprecated +// @SuppressWarnings("rawtypes") +// public static IV[] compactAndSort(final Set<IV> vertices) { +// +// final IV[] a; +// +// final int size = vertices.size(); +// +// /* +// * FRONTIER: Grow/reuse this array for each round! This is 15% of +// * all time in the profiler. The #1 hot spot with the CHMScheduler. We +// * need to reuse the target array!!! +// */ +// vertices.toArray(a = new IV[size]); +// +// /* +// * Order for index access. An ordered scan on a B+Tree is 10X faster +// * than random access lookups. +// * +// * Note: This uses natural V order, which is also the index order. +// */ +// java.util.Arrays.sort(a); +// +// return a; +// +// } /** * Compact a collection of vertices into an ordered frontier. @@ -69,7 +69,7 @@ * @return A slice onto just the new frontier. */ @SuppressWarnings("rawtypes") - public static IArraySlice compactAndSort(final Set<IV> vertices, + public static IArraySlice<IV> compactAndSort(final Set<IV> vertices, final IManagedArray<IV> buffer) { final int nvertices = vertices.size(); @@ -93,8 +93,6 @@ * B+Tree is 10X faster than random access lookups. * * Note: This uses natural V order, which is also the index order. - * - * FIXME FRONTIER : We should parallelize this sort! */ java.util.Arrays.sort(a, 0/* fromIndex */, nvertices/* toIndex */); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-28 19:47:42
|
Revision: 7356 http://bigdata.svn.sourceforge.net/bigdata/?rev=7356&view=rev Author: thompsonbry Date: 2013-08-28 19:47:36 +0000 (Wed, 28 Aug 2013) Log Message: ----------- Deprecated the old StaticFrontier class. Refactored the new StaticFrontier2 class so that the time required to copy in the new frontier and the time to sort it show up clearly in the profiler and can be clearly distinguished. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier.java 2013-08-28 19:17:16 UTC (rev 7355) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier.java 2013-08-28 19:47:36 UTC (rev 7356) @@ -16,6 +16,7 @@ * kinds of mutation. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @deprecated by {@link StaticFrontier2}, which is more efficient. */ @SuppressWarnings("rawtypes") public class StaticFrontier implements IStaticFrontier { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java 2013-08-28 19:17:16 UTC (rev 7355) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java 2013-08-28 19:47:36 UTC (rev 7356) @@ -67,19 +67,7 @@ return vertices.iterator(); } - - // private void clear() { - // - // vertices.clear(); - // - // } - - // private void schedule(IV v) { - // - // vertices.add(v); - // - // } - + /** * Setup the same static frontier object for the new compact fronter (it is * reused in each round). @@ -88,6 +76,32 @@ public void resetFrontier(final int minCapacity, final boolean ordered, final Iterator<IV> itr) { + copyScheduleIntoFrontier(minCapacity, itr); + + if (!ordered) { + + /* + * Sort the current slice of the backing array. + */ + + Arrays.sort(backing.array(), 0/* fromIndex */, vertices.len()/* toIndex */); + + } + + } + + /** + * Copy the data from the iterator into the backing array and update the + * slice which provides our exposed view of the backing array. + * + * @param minCapacity + * The minimum required capacity for the backing array. + * @param itr + * The source from which we will repopulate the backing array. + */ + private void copyScheduleIntoFrontier(final int minCapacity, + final Iterator<IV> itr) { + // ensure enough capacity for the new frontier. backing.ensureCapacity(minCapacity); @@ -104,19 +118,12 @@ } - if (!ordered) { + /* + * Take a slice of the backing showing only the valid entries and use it + * to replace the view of the backing array. + */ + this.vertices = backing.slice(0/* off */, nvertices); - // Sort. - Arrays.sort(a, 0/* fromIndex */, nvertices/* toIndex */); - - } - - // take a slice of the backing showing only the valid entries. - final IArraySlice<IV> tmp = backing.slice(0/* off */, nvertices); - - // update the view. - this.vertices = tmp; - } } \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-28 20:01:53
|
Revision: 7357 http://bigdata.svn.sourceforge.net/bigdata/?rev=7357&view=rev Author: thompsonbry Date: 2013-08-28 20:01:46 +0000 (Wed, 28 Aug 2013) Log Message: ----------- Modified StaticFrontier2 to clear out any non-null elements in the backing array. This addresses a GC leak during the computation. Modified TLScheduler to clear the map of per-thread schedulers rather than clearing each per-thread scheduler. This is a big hot spot. See #629 Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java 2013-08-28 19:47:36 UTC (rev 7356) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/StaticFrontier2.java 2013-08-28 20:01:46 UTC (rev 7357) @@ -119,6 +119,18 @@ } /* + * Null fill until the end of the last frontier. That will help out GC. + * Otherwise those IV references are pinned and can hang around. We + * could track the high water mark on the backing array for this + * purpose. + */ + for (int i = nvertices; i < a.length; i++) { + if (a[i] == null) + break; + a[i] = null; + } + + /* * Take a slice of the backing showing only the valid entries and use it * to replace the view of the backing array. */ Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-08-28 19:47:36 UTC (rev 7356) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-08-28 20:01:46 UTC (rev 7357) @@ -122,13 +122,16 @@ /* * Clear the per-thread maps, but do not discard. They will be reused in * the next round. + * + * FIXME This is a big cost. Try simply clearing [map] and see if that + * is less expensive. */ - for (STScheduler s : map.values()) { - - s.clear(); - - } - +// for (STScheduler s : map.values()) { +// +// s.clear(); +// +// } + map.clear(); } @Override This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-30 13:36:21
|
Revision: 7365 http://bigdata.svn.sourceforge.net/bigdata/?rev=7365&view=rev Author: thompsonbry Date: 2013-08-30 13:36:14 +0000 (Fri, 30 Aug 2013) Log Message: ----------- The implementation to date has incorrectly failed to consider a bnode to be a vertex. All of the baselines will have to be recomputed (this has a large effect for the data set we have been using since bnodes are quite common in the foaf crawl.) See #629 (Graph Mining API) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASState.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-08-30 13:15:14 UTC (rev 7364) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-08-30 13:36:14 UTC (rev 7365) @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; +import org.openrdf.model.Resource; import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; @@ -329,7 +330,7 @@ @Override public boolean isEdge(final Statement e) { - return e.getObject() instanceof URI; // FIXME CORRECTNESS (instanceof Resource) + return e.getObject() instanceof Resource; } @Override Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASState.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASState.java 2013-08-30 13:15:14 UTC (rev 7364) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASState.java 2013-08-30 13:36:14 UTC (rev 7365) @@ -117,9 +117,23 @@ public boolean isEdge(final Statement e) { final ISPO spo = (ISPO) e; + + /** + * For the early development of the GAS API, this test was written using + * o.isURI() rather than o.isResource(). That caused edges that ended in + * a bnode to be ignored, which means that a lot of the FOAF data set we + * were using was ignored. This was changed in r7365 to use + * isResource(). That change invalidates the historical baseline for the + * BFS and SSSP performance. This is also documented at the ticket + * below. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/629#comment:33"> + * Graph Mining API </a> + */ + return spo.o().isResource(); +// return spo.o().isURI(); - return spo.o().isURI(); // FIXME CORRECTNESS : isResource() - } @Override This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |