From: <tho...@us...> - 2013-09-12 16:02:13
Revision: 7398 http://bigdata.svn.sourceforge.net/bigdata/?rev=7398&view=rev Author: thompsonbry Date: 2013-09-12 16:02:04 +0000 (Thu, 12 Sep 2013) Log Message: ----------- Decoupled the Apache2 module for the GAS Engine. I had to duplicate two classes (DeamonThreadFactory and Sesame2BigdataIterator). There is currently a problem running ant for the bigdata-gas module under eclipse. In part, this may have to do with the path to the smallGraph.ttl file but it also appears to be confounded with the JDK6/JDK7 external executable configurations in exclipse. See #629 (Graph mining API) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/ctc-striterators-0.1.0.jar branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-api-1.6.1.jar branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-log4j12-1.6.1.jar branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java Removed Paths: ------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java Property Changed: ---------------- branches/BIGDATA_RELEASE_1_3_0/ctc-striterators/ Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml 2013-09-12 16:02:04 UTC (rev 7398) @@ -12,8 +12,10 @@ <pathelement location="${build.dir}/classes" /> <pathelement location="${build.dir}/test" /> <fileset dir="${build.lib.dir}"> - <include name="junit*.jar" /> - <include name="log4j*.jar" /> +<!-- <include name="junit*.jar" /> + <include name="log4j*.jar" /> + <include name="ctc-striterators*.jar" /> + <include name="openrdf*.jar" /> --> </fileset> </path> @@ -74,7 +76,7 @@ </jar> </target> - <target name="test" depends="compile"> + <target name="test" depends="clean, compile"> <javac destdir="${build.dir}/test" debug="${javac.debug}" debuglevel="${javac.debuglevel}" verbose="${javac.verbose}" encoding="${javac.encoding}"> <classpath refid="test.classpath" /> <src path="${bigdata-gas.dir}/src/test" /> @@ -93,7 +95,7 @@ <!-- ant 
-DtestName=com.bigdata.cache.TestAll junit --> <test name="${testName}" todir="${test.results.dir}" if="testName" /> <!-- Test suites to run when -DtestName is not set --> - <test name="com.bigdata.gas.TestAll" todir="${test.results.dir}" unless="testName" /> + <test name="com.bigdata.rdf.graph.TestAll" todir="${test.results.dir}" unless="testName" /> </junit> <!-- Generate an HTML report. --> <junitreport todir="${build.dir}"> Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/ctc-striterators-0.1.0.jar =================================================================== (Binary files differ) Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/ctc-striterators-0.1.0.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-api-1.6.1.jar =================================================================== (Binary files differ) Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-api-1.6.1.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-log4j12-1.6.1.jar =================================================================== (Binary files differ) Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-log4j12-1.6.1.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -0,0 +1,115 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.rdf.graph.impl; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * A thread factory that configures the thread as a daemon thread. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: DaemonThreadFactory.java 5824 2011-12-29 20:52:02Z thompsonbry $ + */ +/* + * Note: This is a clone of the same-named class in the bigdata module. The + * clone exists to have it under the Apache 2 license requiring the creation of + * a bigdata-commons module. + */ +class DaemonThreadFactory implements ThreadFactory { + + final private ThreadFactory delegate; + final private String basename; // MAY be null. 
+ private int counter = 0; // used iff basename was given. + + private static ThreadFactory _default = new DaemonThreadFactory(); + + /** + * Returns an instance based on {@link Executors#defaultThreadFactory()} + * that configures the thread for daemon mode. + */ + final public static ThreadFactory defaultThreadFactory() { + + return _default; + + } + + /** + * Uses {@link Executors#defaultThreadFactory()} as the delegate. + */ + public DaemonThreadFactory() { + + this( Executors.defaultThreadFactory(), null/*basename*/ ); + + } + + public DaemonThreadFactory(String basename) { + + this(Executors.defaultThreadFactory(), basename); + + } + + /** + * Uses the specified delegate {@link ThreadFactory}. + * + * @param delegate + * The delegate thread factory that is responsible for creating + * the threads. + * @param basename + * Optional prefix that will be used to assign names to the + * generated threads. + */ + public DaemonThreadFactory(final ThreadFactory delegate, + final String basename) { + + if (delegate == null) + throw new IllegalArgumentException(); + + this.delegate = delegate; + + this.basename = basename; + + } + + public Thread newThread(final Runnable r) { + + final Thread t = delegate.newThread( r ); + + if (basename != null) { + + counter++; + + t.setName(basename + counter); + + } + + t.setDaemon(true); + +// System.err.println("new thread: "+t.getName()+", id="+t.getId()); + + return t; + + } + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -37,7 +37,6 @@ import com.bigdata.rdf.graph.IStaticFrontier; import com.bigdata.rdf.graph.impl.frontier.StaticFrontier2; import com.bigdata.rdf.graph.impl.scheduler.CHMScheduler; -import com.bigdata.util.concurrent.DaemonThreadFactory; /** * {@link IGASEngine} for dynamic activation of vertices. 
This implementation Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -16,8 +16,8 @@ package com.bigdata.rdf.graph.impl; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; -import com.bigdata.counters.CAT; import com.bigdata.rdf.graph.IGASStats; import com.bigdata.rdf.graph.util.GASUtil; @@ -28,10 +28,10 @@ */ public class GASStats implements IGASStats { - private final CAT nrounds = new CAT(); - private final CAT frontierSize = new CAT(); - private final CAT nedges = new CAT(); - private final CAT elapsedNanos = new CAT(); + private final AtomicLong nrounds = new AtomicLong(); + private final AtomicLong frontierSize = new AtomicLong(); + private final AtomicLong nedges = new AtomicLong(); + private final AtomicLong elapsedNanos = new AtomicLong(); /* (non-Javadoc) * @see com.bigdata.rdf.graph.impl.IFOO#add(long, long, long) @@ -40,13 +40,13 @@ public void add(final long frontierSize, final long nedges, final long elapsedNanos) { - this.nrounds.increment(); + this.nrounds.incrementAndGet(); - this.frontierSize.add(frontierSize); + this.frontierSize.addAndGet(frontierSize); - this.nedges.add(nedges); + this.nedges.addAndGet(nedges); - this.elapsedNanos.add(elapsedNanos); + this.elapsedNanos.addAndGet(elapsedNanos); } @@ -56,13 +56,13 @@ @Override public void add(final IGASStats o) { - nrounds.add(o.getNRounds()); + nrounds.addAndGet(o.getNRounds()); - frontierSize.add(o.getFrontierSize()); + frontierSize.addAndGet(o.getFrontierSize()); - nedges.add(o.getNEdges()); + nedges.addAndGet(o.getNEdges()); - elapsedNanos.add(o.getElapsedNanos()); + elapsedNanos.addAndGet(o.getElapsedNanos()); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -35,7 +35,6 @@ import com.bigdata.rdf.graph.IGraphAccessor; import com.bigdata.rdf.graph.impl.GASEngine; import com.bigdata.rdf.graph.impl.util.VertexDistribution; -import com.bigdata.rdf.sail.Sesame2BigdataIterator; import cutthecrap.utils.striterators.EmptyIterator; import cutthecrap.utils.striterators.IStriterator; Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -0,0 +1,119 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +/* + * Created on Sep 4, 2008 + */ + +package com.bigdata.rdf.graph.impl.sail; + +import java.util.NoSuchElementException; + +import info.aduna.iteration.CloseableIteration; + +import cutthecrap.utils.striterators.ICloseableIterator; + +/** + * Class aligns a Sesame 2 {@link CloseableIteration} with a bigdata + * {@link ICloseableIterator}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: Bigdata2SesameIteration.java 2265 2009-10-26 12:51:06Z + * thompsonbry $ + * @param <T> + * The generic type of the visited elements. + * @param <E> + * The generic type of the exceptions thrown by the Sesame 2 + * {@link CloseableIteration}. + */ +/* + * Note: This is a clone of the same-named class in the bigdata-rdf module. The + * clone exists to have it under the Apache 2 license without going through a + * large relayering of the dependencies. + */ +class Sesame2BigdataIterator<T, E extends Exception> implements + ICloseableIterator<T> { + + private final CloseableIteration<? extends T,E> src; + + private volatile boolean open = true; + + public Sesame2BigdataIterator(final CloseableIteration<? extends T,E> src) { + + if (src == null) + throw new IllegalArgumentException(); + + this.src = src; + + } + + public void close() { + + if (open) { + open = false; + try { + src.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + } + + public boolean hasNext() { + + try { + + if (open && src.hasNext()) + return true; + + close(); + + return false; + + } catch(Exception e) { + throw new RuntimeException(e); + } + + } + + public T next() { + + if (!hasNext()) { + throw new NoSuchElementException(); + } + + try { + return src.next(); + } catch(Exception e) { + throw new RuntimeException(e); + } + + } + + public void remove() { + + if(!open) + throw new IllegalStateException(); + + try { + src.remove(); + } catch(Exception e) { + throw new RuntimeException(e); + } + + } + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -33,8 +33,8 @@ /** * The scheduled vertices. - */// Note: package private. Exposed to TLScheduler. - /*private*/ final Set<Value> vertices; + */ + private final Set<Value> vertices; private final boolean sortFrontier; public STScheduler(final GASEngine gasEngine) { @@ -44,6 +44,24 @@ } + /** + * The #of vertices in the frontier. + */ + public int size() { + + return vertices.size(); + + } + + /** + * The backing collection. 
+ */ + public Set<Value> getVertices() { + + return vertices; + + } + @Override public void schedule(final Value v) { Deleted: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -1,275 +0,0 @@ -/** - Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ -package com.bigdata.rdf.graph.impl.scheduler; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.openrdf.model.Value; - -import com.bigdata.rdf.graph.IGASScheduler; -import com.bigdata.rdf.graph.IGASSchedulerImpl; -import com.bigdata.rdf.graph.IStaticFrontier; -import com.bigdata.rdf.graph.impl.GASEngine; -import com.bigdata.rdf.graph.impl.bd.MergeSortIterator; -import com.bigdata.rdf.graph.impl.util.GASImplUtil; -import com.bigdata.rdf.graph.impl.util.IArraySlice; -import com.bigdata.rdf.graph.impl.util.ManagedArray; -import com.bigdata.rdf.graph.util.GASUtil; - -/** - * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the - * distinct vertices scheduled by each execution thread. After the computation - * round, those per-thread segments of the frontier are combined into a single - * global, compact, and ordered frontier. To maximize the parallel activity, the - * per-thread frontiers are sorted using N threads (one per segment). Finally, - * the frontier segments are combined using a {@link MergeSortIterator} - this - * is a sequential step with a linear cost in the size of the frontier. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * - * TODO Discard if dominated by {@link TLScheduler2}. - */ -public class TLScheduler implements IGASSchedulerImpl { - - /** - * Class bundles a reusable, extensible array for sorting the thread-local - * frontier. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private static class MySTScheduler extends STScheduler { - - /** - * This is used to sort the thread-local frontier (that is, the frontier - * for a single thread). The backing array will grow as necessary and is - * reused in each round. - * <P> - * Note: The schedule (for each thread) is using a set - see the - * {@link STScheduler} base class. This means that the schedule (for - * each thread) is compact, but not ordered. We need to use (and re-use) - * an array to order that compact per-thread schedule. 
The compact - * per-thread schedules are then combined into a single compact frontier - * for the new round. - */ - private final ManagedArray<Value> tmp; - - public MySTScheduler(final GASEngine gasEngine) { - - super(gasEngine); - - tmp = new ManagedArray<Value>(Value.class, 64); - - } - - } // class MySTScheduler - - private final GASEngine gasEngine; - private final int nthreads; - private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map; - - public TLScheduler(final GASEngine gasEngine) { - - this.gasEngine = gasEngine; - - this.nthreads = gasEngine.getNThreads(); - - this.map = new ConcurrentHashMap<Long, MySTScheduler>( - nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads); - - } - - private IGASScheduler threadLocalScheduler() { - - final Long id = Thread.currentThread().getId(); - - MySTScheduler s = map.get(id); - - if (s == null) { - - final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler( - gasEngine)); - - if (old != null) { - - /* - * We should not have a key collision since this is based on the - * threadId. - */ - - throw new AssertionError(); - - } - - } - - return s; - - } - - @Override - public void schedule(final Value v) { - - threadLocalScheduler().schedule(v); - - } - - @Override - public void clear() { - - /* - * Clear the per-thread maps, but do not discard. They will be reused in - * the next round. - * - * Note: This is a big cost. Simply clearing [map] results in much less - * time and less GC. - */ -// for (STScheduler s : map.values()) { -// -// s.clear(); -// -// } - map.clear(); - } - - @Override - public void compactFrontier(final IStaticFrontier frontier) { - - /* - * Extract a sorted, compact frontier from each thread local frontier. - */ - @SuppressWarnings("unchecked") - final IArraySlice<Value>[] frontiers = new IArraySlice[nthreads]; - - int nsources = 0; - int nvertices = 0; - { - final List<Callable<IArraySlice<Value>>> tasks = new ArrayList<Callable<IArraySlice<Value>>>( - nthreads); - - for (MySTScheduler s : map.values()) { - final MySTScheduler t = s; - tasks.add(new Callable<IArraySlice<Value>>() { - @Override - public IArraySlice<Value> call() throws Exception { - return GASImplUtil.compactAndSort(t.vertices, t.tmp); - } - }); - - } - // invokeAll() - futures will be done() before it returns. - final List<Future<IArraySlice<Value>>> futures; - try { - futures = gasEngine.getGASThreadPool().invokeAll(tasks); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - for (Future<IArraySlice<Value>> f : futures) { - - try { - final IArraySlice<Value> b = frontiers[nsources] = f.get(); - nvertices += b.len(); - nsources++; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - - } - } - - if (nvertices == 0) { - - /* - * The new frontier is empty. - */ - - frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */, - GASUtil.EMPTY_VERTICES_ITERATOR); - - return; - - } - - if (nsources > nthreads) { - - /* - * nsources could be LT nthreads if we have a very small frontier, - * but it should never be GTE nthreads. - */ - - throw new AssertionError("nsources=" + nsources + ", nthreads=" - + nthreads); - - } - - /* - * Now merge sort those arrays and populate the new frontier. - */ - mergeSortSourcesAndSetFrontier(nsources, nvertices, frontiers, frontier); - - } - - /** - * Now merge sort the ordered frontier segments and populate the new - * frontier. 
- * - * @param nsources - * The #of frontier segments. - * @param nvertices - * The total #of vertice across those segments (may double-count - * across segments). - * @param frontiers - * The ordered, compact frontier segments - * @param frontier - * The new frontier to be populated. - */ - private void mergeSortSourcesAndSetFrontier(final int nsources, - final int nvertices, final IArraySlice<Value>[] frontiers, - final IStaticFrontier frontier) { - - // wrap Values[] as Iterators. - @SuppressWarnings("unchecked") - final Iterator<Value>[] itrs = new Iterator[nsources]; - - for (int i = 0; i < nsources; i++) { - - itrs[i] = frontiers[i].iterator(); - - } - - // merge sort of those iterators. - final Iterator<Value> itr = new MergeSortIterator(itrs); - - /* - * Note: The merge iterator visits the vertices in the natural order and - * does not need to be sorted. - */ - frontier.resetFrontier(nvertices/* minCapacity */, - false/* sortFrontier */, itr); - - } - -} \ No newline at end of file Deleted: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -1,306 +0,0 @@ -/** - Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ -package com.bigdata.rdf.graph.impl.scheduler; - -import java.util.ArrayList; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.log4j.Logger; -import org.openrdf.model.Value; - -import com.bigdata.rdf.graph.IGASScheduler; -import com.bigdata.rdf.graph.IGASSchedulerImpl; -import com.bigdata.rdf.graph.IStaticFrontier; -import com.bigdata.rdf.graph.impl.GASEngine; -import com.bigdata.rdf.graph.impl.bd.MergeSortIterator; -import com.bigdata.rdf.graph.impl.frontier.StaticFrontier2; -import com.bigdata.rdf.graph.impl.util.GASImplUtil; -import com.bigdata.rdf.graph.impl.util.IArraySlice; -import com.bigdata.rdf.graph.impl.util.ManagedArray; -import com.bigdata.rdf.graph.util.GASUtil; - -/** - * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the - * distinct vertices scheduled by each execution thread. After the computation - * round, those per-thread segments of the frontier are combined into a single - * global, compact, and ordered frontier. To maximize the parallel activity, the - * per-thread frontiers are sorted using N threads (one per segment). 
Finally, - * the frontier segments are combined using a {@link MergeSortIterator} - this - * is a sequential step with a linear cost in the size of the frontier. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - */ -public class TLScheduler2 implements IGASSchedulerImpl { - - private static final Logger log = Logger.getLogger(TLScheduler2.class); - - /** - * Class bundles a reusable, extensible array for sorting the thread-local - * frontier. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private static class MySTScheduler extends STScheduler { - - /** - * This is used to sort the thread-local frontier (that is, the frontier - * for a single thread). The backing array will grow as necessary and is - * reused in each round. - * <P> - * Note: The schedule (for each thread) is using a set - see the - * {@link STScheduler} base class. This means that the schedule (for - * each thread) is compact, but not ordered. We need to use (and re-use) - * an array to order that compact per-thread schedule. The compact - * per-thread schedules are then combined into a single compact frontier - * for the new round. - */ - private final ManagedArray<Value> tmp; - - public MySTScheduler(final GASEngine gasEngine) { - - super(gasEngine); - - tmp = new ManagedArray<Value>(Value.class, 64); - - } - - } // class MySTScheduler - - private final GASEngine gasEngine; - private final int nthreads; - private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map; - - public TLScheduler2(final GASEngine gasEngine) { - - this.gasEngine = gasEngine; - - this.nthreads = gasEngine.getNThreads(); - - this.map = new ConcurrentHashMap<Long, MySTScheduler>( - nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads); - - } - - private IGASScheduler threadLocalScheduler() { - - final Long id = Thread.currentThread().getId(); - - MySTScheduler s = map.get(id); - - if (s == null) { - - final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler( - gasEngine)); - - if (old != null) { - - /* - * We should not have a key collision since this is based on the - * threadId. - */ - - throw new AssertionError(); - - } - - } - - return s; - - } - - @Override - public void schedule(final Value v) { - - threadLocalScheduler().schedule(v); - - } - - @Override - public void clear() { - - /* - * Clear the per-thread maps, but do not discard. They will be reused in - * the next round. - * - * Note: This is a big cost. Simply clearing [map] results in much less - * time and less GC. - */ -// for (STScheduler s : map.values()) { -// -// s.clear(); -// -// } - map.clear(); - } - - @Override - public void compactFrontier(final IStaticFrontier frontier) { - - /* - * Figure out the #of sources and the #of vertices across those sources. - * - * This also computes the cumulative offsets into the new frontier for - * the different per-thread segments. - */ - final int[] off = new int[nthreads]; // zero for 1st thread. - final int nsources; - final int nvertices; - { - int ns = 0, nv = 0; - for (MySTScheduler s : map.values()) { - final MySTScheduler t = s; - final int sz = t.vertices.size(); - off[ns] = nv; // starting index. - ns++; - nv += sz; - } - nsources = ns; - nvertices = nv; - } - - if (nsources > nthreads) { - - /* - * nsources could be LT nthreads if we have a very small frontier, - * but it should never be GTE nthreads. - */ - - throw new AssertionError("nsources=" + nsources + ", nthreads=" - + nthreads); - - } - - if (nvertices == 0) { - - /* - * The new frontier is empty. 
- */ - - frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */, - GASUtil.EMPTY_VERTICES_ITERATOR); - - return; - - } - - /* - * Parallel copy of the per-thread frontiers onto the new frontier. - * - * Note: This DOES NOT produce a compact frontier! The code that maps - * the gather/reduce operations over the frontier will eliminate - * duplicate work. - */ - - // TODO Requires a specific class to work! API! - final StaticFrontier2 f2 = (StaticFrontier2) frontier; - { - - // ensure sufficient capacity! - f2.resetAndEnsureCapacity(nvertices); - f2.setCompact(false); // NOT COMPACT! - - final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>( - nsources); - - int i = 0; - for (MySTScheduler s : map.values()) { // TODO Paranoia suggests to put these into an [] so we know that we have the same traversal order as above. That might not be guaranteed. - final MySTScheduler t = s; - final int index = i++; - tasks.add(new Callable<Void>() { - @Override - public Void call() throws Exception { - final IArraySlice<Value> orderedSegment = GASImplUtil - .compactAndSort(t.vertices, t.tmp); - f2.copyIntoResetFrontier(off[index], orderedSegment); - return (Void) null; - } - }); - } - - // invokeAll() - futures will be done() before it returns. - final List<Future<Void>> futures; - try { - futures = gasEngine.getGASThreadPool().invokeAll(tasks); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - for (Future<Void> f : futures) { - - try { - f.get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - - } - } - if (log.isInfoEnabled()) - log.info("Done: " + this.getClass().getCanonicalName() - + ",frontier=" + frontier); -// /* -// * Now merge sort those arrays and populate the new frontier. -// */ -// mergeSortSourcesAndSetFrontier(nsources, nvertices, frontiers, frontier); - - } - -// /** -// * Now merge sort the ordered frontier segments and populate the new -// * frontier. -// * -// * @param nsources -// * The #of frontier segments. -// * @param nvertices -// * The total #of vertice across those segments (may double-count -// * across segments). -// * @param frontiers -// * The ordered, compact frontier segments -// * @param frontier -// * The new frontier to be populated. -// */ -// private void mergeSortSourcesAndSetFrontier(final int nsources, -// final int nvertices, final IArraySlice<Value>[] frontiers, -// final IStaticFrontier frontier) { -// -// // wrap Values[] as Iterators. -// @SuppressWarnings("unchecked") -// final Iterator<Value>[] itrs = new Iterator[nsources]; -// -// for (int i = 0; i < nsources; i++) { -// -// itrs[i] = frontiers[i].iterator(); -// -// } -// -// // merge sort of those iterators. 
-// final Iterator<Value> itr = new MergeSortIterator(itrs); -// -// frontier.resetFrontier(nvertices/* minCapacity */, true/* ordered */, -// itr); -// -// } - -} \ No newline at end of file Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -19,8 +19,6 @@ import org.apache.log4j.Logger; -import com.bigdata.io.ByteArrayBuffer; - import cutthecrap.utils.striterators.ArrayIterator; /** @@ -337,8 +335,8 @@ } /** - * A slice of the outer {@link ByteArrayBuffer}. The slice will always - * reflect the backing {@link #array()} for the instance of the outer class. + * A slice of the outer {@link ManagedArray}. The slice will always reflect + * the backing {@link #array()} for the instance of the outer class. * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -17,8 +17,6 @@ import org.apache.log4j.Logger; -import com.bigdata.io.ByteArrayBuffer; - /** * A view on a mutable int[] that may be extended. * <p> @@ -283,8 +281,8 @@ } /** - * A slice of the outer {@link ByteArrayBuffer}. The slice will always - * reflect the backing {@link #array()} for the instance of the outer class. + * A slice of the outer {@link ManagedArray}. The slice will always reflect + * the backing {@link #array()} for the instance of the outer class. 
* * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -18,6 +18,7 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -228,8 +229,8 @@ } else { - throw new IOException("Could not locate resource: " - + resource); + throw new FileNotFoundException( + "Could not locate resource: " + resource); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java 2013-09-12 15:58:40 UTC (rev 7397) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -15,6 +15,8 @@ */ package com.bigdata.rdf.graph.impl.sail; +import java.io.FileNotFoundException; + import org.openrdf.model.URI; import org.openrdf.model.vocabulary.RDF; import org.openrdf.sail.Sail; @@ -69,13 +71,20 @@ /** * The data file. */ - static private final String smallGraph = "bigdata-gas/src/test/com/bigdata/rdf/graph/data/smallGraph.ttl"; + static private final String smallGraph1 = "bigdata-gas/src/test/com/bigdata/rdf/graph/data/smallGraph.ttl"; + static private final String smallGraph2 = "src/test/com/bigdata/rdf/graph/data/smallGraph.ttl"; private final URI rdfType, foafKnows, foafPerson, mike, bryan, martyn; public SmallGraphProblem() throws Exception { - getGraphFixture().loadGraph(smallGraph); + try { + // in eclipse with bigdata as the root dir. + getGraphFixture().loadGraph(smallGraph1); + } catch (FileNotFoundException ex) { + // from the ant build file with bigdata-gas as the root dir. + getGraphFixture().loadGraph(smallGraph2); + } final Sail sail = getGraphFixture().getSail(); Copied: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java (from rev 7382, branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java) =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -0,0 +1,275 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.bigdata.rdf.graph.impl.bd; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.openrdf.model.Value; + +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASSchedulerImpl; +import com.bigdata.rdf.graph.IStaticFrontier; +import com.bigdata.rdf.graph.impl.GASEngine; +import com.bigdata.rdf.graph.impl.scheduler.STScheduler; +import com.bigdata.rdf.graph.impl.util.GASImplUtil; +import com.bigdata.rdf.graph.impl.util.IArraySlice; +import com.bigdata.rdf.graph.impl.util.ManagedArray; +import com.bigdata.rdf.graph.util.GASUtil; + +/** + * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the + * distinct vertices scheduled by each execution thread. After the computation + * round, those per-thread segments of the frontier are combined into a single + * global, compact, and ordered frontier. To maximize the parallel activity, the + * per-thread frontiers are sorted using N threads (one per segment). Finally, + * the frontier segments are combined using a {@link MergeSortIterator} - this + * is a sequential step with a linear cost in the size of the frontier. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * + * TODO Discard if dominated by {@link TLScheduler2}. + */ +public class TLScheduler implements IGASSchedulerImpl { + + /** + * Class bundles a reusable, extensible array for sorting the thread-local + * frontier. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private static class MySTScheduler extends STScheduler { + + /** + * This is used to sort the thread-local frontier (that is, the frontier + * for a single thread). The backing array will grow as necessary and is + * reused in each round. + * <P> + * Note: The schedule (for each thread) is using a set - see the + * {@link STScheduler} base class. This means that the schedule (for + * each thread) is compact, but not ordered. We need to use (and re-use) + * an array to order that compact per-thread schedule. The compact + * per-thread schedules are then combined into a single compact frontier + * for the new round. + */ + private final ManagedArray<Value> tmp; + + public MySTScheduler(final GASEngine gasEngine) { + + super(gasEngine); + + tmp = new ManagedArray<Value>(Value.class, 64); + + } + + } // class MySTScheduler + + private final GASEngine gasEngine; + private final int nthreads; + private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map; + + public TLScheduler(final GASEngine gasEngine) { + + this.gasEngine = gasEngine; + + this.nthreads = gasEngine.getNThreads(); + + this.map = new ConcurrentHashMap<Long, MySTScheduler>( + nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads); + + } + + private IGASScheduler threadLocalScheduler() { + + final Long id = Thread.currentThread().getId(); + + MySTScheduler s = map.get(id); + + if (s == null) { + + final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler( + gasEngine)); + + if (old != null) { + + /* + * We should not have a key collision since this is based on the + * threadId. 
+ */ + + throw new AssertionError(); + + } + + } + + return s; + + } + + @Override + public void schedule(final Value v) { + + threadLocalScheduler().schedule(v); + + } + + @Override + public void clear() { + + /* + * Clear the per-thread maps, but do not discard. They will be reused in + * the next round. + * + * Note: This is a big cost. Simply clearing [map] results in much less + * time and less GC. + */ +// for (STScheduler s : map.values()) { +// +// s.clear(); +// +// } + map.clear(); + } + + @Override + public void compactFrontier(final IStaticFrontier frontier) { + + /* + * Extract a sorted, compact frontier from each thread local frontier. + */ + @SuppressWarnings("unchecked") + final IArraySlice<Value>[] frontiers = new IArraySlice[nthreads]; + + int nsources = 0; + int nvertices = 0; + { + final List<Callable<IArraySlice<Value>>> tasks = new ArrayList<Callable<IArraySlice<Value>>>( + nthreads); + + for (MySTScheduler s : map.values()) { + final MySTScheduler t = s; + tasks.add(new Callable<IArraySlice<Value>>() { + @Override + public IArraySlice<Value> call() throws Exception { + return GASImplUtil.compactAndSort(t.getVertices(), t.tmp); + } + }); + + } + // invokeAll() - futures will be done() before it returns. + final List<Future<IArraySlice<Value>>> futures; + try { + futures = gasEngine.getGASThreadPool().invokeAll(tasks); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + for (Future<IArraySlice<Value>> f : futures) { + + try { + final IArraySlice<Value> b = frontiers[nsources] = f.get(); + nvertices += b.len(); + nsources++; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + + } + } + + if (nvertices == 0) { + + /* + * The new frontier is empty. + */ + + frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */, + GASUtil.EMPTY_VERTICES_ITERATOR); + + return; + + } + + if (nsources > nthreads) { + + /* + * nsources could be LT nthreads if we have a very small frontier, + * but it should never be GTE nthreads. + */ + + throw new AssertionError("nsources=" + nsources + ", nthreads=" + + nthreads); + + } + + /* + * Now merge sort those arrays and populate the new frontier. + */ + mergeSortSourcesAndSetFrontier(nsources, nvertices, frontiers, frontier); + + } + + /** + * Now merge sort the ordered frontier segments and populate the new + * frontier. + * + * @param nsources + * The #of frontier segments. + * @param nvertices + * The total #of vertice across those segments (may double-count + * across segments). + * @param frontiers + * The ordered, compact frontier segments + * @param frontier + * The new frontier to be populated. + */ + private void mergeSortSourcesAndSetFrontier(final int nsources, + final int nvertices, final IArraySlice<Value>[] frontiers, + final IStaticFrontier frontier) { + + // wrap Values[] as Iterators. + @SuppressWarnings("unchecked") + final Iterator<Value>[] itrs = new Iterator[nsources]; + + for (int i = 0; i < nsources; i++) { + + itrs[i] = frontiers[i].iterator(); + + } + + // merge sort of those iterators. + final Iterator<Value> itr = new MergeSortIterator(itrs); + + /* + * Note: The merge iterator visits the vertices in the natural order and + * does not need to be sorted. 
+ */ + frontier.resetFrontier(nvertices/* minCapacity */, + false/* sortFrontier */, itr); + + } + +} \ No newline at end of file Copied: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java (from rev 7382, branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java) =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java 2013-09-12 16:02:04 UTC (rev 7398) @@ -0,0 +1,306 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.bigdata.rdf.graph.impl.bd; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.log4j.Logger; +import org.openrdf.model.Value; + +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASSchedulerImpl; +import com.bigdata.rdf.graph.IStaticFrontier; +import com.bigdata.rdf.graph.impl.GASEngine; +import com.bigdata.rdf.graph.impl.frontier.StaticFrontier2; +import com.bigdata.rdf.graph.impl.scheduler.STScheduler; +import com.bigdata.rdf.graph.impl.util.GASImplUtil; +import com.bigdata.rdf.graph.impl.util.IArraySlice; +import com.bigdata.rdf.graph.impl.util.ManagedArray; +import com.bigdata.rdf.graph.util.GASUtil; + +/** + * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the + * distinct vertices scheduled by each execution thread. After the computation + * round, those per-thread segments of the frontier are combined into a single + * global, compact, and ordered frontier. To maximize the parallel activity, the + * per-thread frontiers are sorted using N threads (one per segment). Finally, + * the frontier segments are combined using a {@link MergeSortIterator} - this + * is a sequential step with a linear cost in the size of the frontier. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class TLScheduler2 implements IGASSchedulerImpl { + + private static final Logger log = Logger.getLogger(TLScheduler2.class); + + /** + * Class bundles a reusable, extensible array for sorting the thread-local + * frontier. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private static class MySTScheduler extends STScheduler { + + /** + * This is used to sort the thread-local frontier (that is, the frontier + * for a single thread). The backing array will grow as necessary and is + * reused in each round. + * <P> + * Note: The schedule (for each thread) is using a set - see the + * {@link STScheduler} base class. 
This means that the schedule (for + * each thread) is compact, but not ordered. We need to use (and re-use) + * an array to order that compact per-thread schedule. The compact + * per-thread schedules are then combined into a single compact frontier + * for the new round. + */ + private final ManagedArray<Value> tmp; + + public MySTScheduler(final GASEngine gasEngine) { + + super(gasEngine); + + tmp = new ManagedArray<Value>(Value.class, 64); + + } + + } // class MySTScheduler + + private final GASEngine gasEngine; + private final int nthreads; + private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map; + + public TLScheduler2(final GASEngine gasEngine) { + + this.gasEngine = gasEngine; + + this.nthreads = gasEngine.getNThreads(); + + this.map = new ConcurrentHashMap<Long, MySTScheduler>( + nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads); + + } + + private IGASScheduler threadLocalScheduler() { + + final Long id = Thread.currentThread().getId(); + + MySTScheduler s = map.get(id); + + if (s == null) { + + final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler( + gasEngine)); + + if (old != null) { + + /* + * We should not have a key collision since this is based on the + * threadId. + */ + + throw new AssertionError(); + + } + + } + + return s; + + } + + @Override + public void schedule(final Value v) { + + threadLocalScheduler().schedule(v); + + } + + @Override + public void clear() { + + /* + * Clear the per-thread maps, but do not discard. They will be reused in + * the next round. + * + * Note: This is a big cost. Simply clearing [map] results in much less + * time and less GC. + */ +// for (STScheduler s : map.values()) { +// +// s.clear(); +// +// } + map.clear(); + } + + @Override + public void compactFrontier(final IStaticFrontier frontier) { + + /* + * Figure out the #of sources and the #of vertices across those sources. + * + * This also computes the cumulative offsets into the new frontier for + * the different per-thread segments. + */ + final int[] off = new int[nthreads]; // zero for 1st thread. + final int nsources; + final int nvertices; + { + int ns = 0, nv = 0; + for (MySTScheduler s : map.values()) { + final MySTScheduler t = s; + final int sz = t.size(); + off[ns] = nv; // starting index. + ns++; + nv += sz; + } + nsources = ns; + nvertices = nv; + } + + if (nsources > nthreads) { + + /* + * nsources could be LT nthreads if we have a very small frontier, + * but it should never be GTE nthreads. + */ + + throw new AssertionError("nsources=" + nsources + ", nthreads=" + + nthreads); + + } + + if (nvertices == 0) { + + /* + * The new frontier is empty. + */ + + frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */, + GASUtil.EMPTY_VERTICES_ITERATOR); + + return; + + } + + /* + * Parallel copy of the per-thread frontiers onto the new frontier. + * + * Note: This DOES NOT produce a compact frontier! The code that maps + * the gather/reduce operations over the frontier will eliminate + * duplicate work. + */ + + // TODO Requires a specific class to work! API! + final StaticFrontier2 f2 = (StaticFrontier2) frontier; + { + + // ensure sufficient capacity! + f2.resetAndEnsureCapacity(nvertices); + f2.setCompact(false); // NOT COMPACT! + + final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>( + nsources); + + int i = 0; + for (MySTScheduler s : map.values()) { // TODO Paranoia suggests to put these into an [] so we know that we have the same trav... [truncated message content] |