From: <tho...@us...> - 2013-09-12 16:02:13
Revision: 7398
http://bigdata.svn.sourceforge.net/bigdata/?rev=7398&view=rev
Author: thompsonbry
Date: 2013-09-12 16:02:04 +0000 (Thu, 12 Sep 2013)
Log Message:
-----------
Decoupled the Apache 2 licensed module for the GAS Engine. I had to duplicate two classes (DaemonThreadFactory and Sesame2BigdataIterator).
There is currently a problem running Ant for the bigdata-gas module under Eclipse. In part, this may have to do with the path to the smallGraph.ttl file, but it also appears to be confounded with the JDK6/JDK7 external executable configurations in Eclipse.
See #629 (Graph mining API)
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java
Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/ctc-striterators-0.1.0.jar
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-api-1.6.1.jar
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-log4j12-1.6.1.jar
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java
Removed Paths:
-------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java
Property Changed:
----------------
branches/BIGDATA_RELEASE_1_3_0/ctc-striterators/
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/build.xml 2013-09-12 16:02:04 UTC (rev 7398)
@@ -12,8 +12,10 @@
<pathelement location="${build.dir}/classes" />
<pathelement location="${build.dir}/test" />
<fileset dir="${build.lib.dir}">
- <include name="junit*.jar" />
- <include name="log4j*.jar" />
+<!-- <include name="junit*.jar" />
+ <include name="log4j*.jar" />
+ <include name="ctc-striterators*.jar" />
+ <include name="openrdf*.jar" /> -->
</fileset>
</path>
@@ -74,7 +76,7 @@
</jar>
</target>
- <target name="test" depends="compile">
+ <target name="test" depends="clean, compile">
<javac destdir="${build.dir}/test" debug="${javac.debug}" debuglevel="${javac.debuglevel}" verbose="${javac.verbose}" encoding="${javac.encoding}">
<classpath refid="test.classpath" />
<src path="${bigdata-gas.dir}/src/test" />
@@ -93,7 +95,7 @@
<!-- ant -DtestName=com.bigdata.cache.TestAll junit -->
<test name="${testName}" todir="${test.results.dir}" if="testName" />
<!-- Test suites to run when -DtestName is not set -->
- <test name="com.bigdata.gas.TestAll" todir="${test.results.dir}" unless="testName" />
+ <test name="com.bigdata.rdf.graph.TestAll" todir="${test.results.dir}" unless="testName" />
</junit>
<!-- Generate an HTML report. -->
<junitreport todir="${build.dir}">
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/ctc-striterators-0.1.0.jar
===================================================================
(Binary files differ)
Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/ctc-striterators-0.1.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-api-1.6.1.jar
===================================================================
(Binary files differ)
Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-api-1.6.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-log4j12-1.6.1.jar
===================================================================
(Binary files differ)
Property changes on: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/lib/slf4j-log4j12-1.6.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/DaemonThreadFactory.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -0,0 +1,115 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+package com.bigdata.rdf.graph.impl;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * A thread factory that configures the thread as a daemon thread.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id: DaemonThreadFactory.java 5824 2011-12-29 20:52:02Z thompsonbry $
+ */
+/*
+ * Note: This is a clone of the same-named class in the bigdata module. The
+ * clone exists to have it under the Apache 2 license requiring the creation of
+ * a bigdata-commons module.
+ */
+class DaemonThreadFactory implements ThreadFactory {
+
+ final private ThreadFactory delegate;
+ final private String basename; // MAY be null.
+ private int counter = 0; // used iff basename was given.
+
+ private static ThreadFactory _default = new DaemonThreadFactory();
+
+ /**
+ * Returns an instance based on {@link Executors#defaultThreadFactory()}
+ * that configures the thread for daemon mode.
+ */
+ final public static ThreadFactory defaultThreadFactory() {
+
+ return _default;
+
+ }
+
+ /**
+ * Uses {@link Executors#defaultThreadFactory()} as the delegate.
+ */
+ public DaemonThreadFactory() {
+
+ this( Executors.defaultThreadFactory(), null/*basename*/ );
+
+ }
+
+ public DaemonThreadFactory(String basename) {
+
+ this(Executors.defaultThreadFactory(), basename);
+
+ }
+
+ /**
+ * Uses the specified delegate {@link ThreadFactory}.
+ *
+ * @param delegate
+ * The delegate thread factory that is responsible for creating
+ * the threads.
+ * @param basename
+ * Optional prefix that will be used to assign names to the
+ * generated threads.
+ */
+ public DaemonThreadFactory(final ThreadFactory delegate,
+ final String basename) {
+
+ if (delegate == null)
+ throw new IllegalArgumentException();
+
+ this.delegate = delegate;
+
+ this.basename = basename;
+
+ }
+
+ public Thread newThread(final Runnable r) {
+
+ final Thread t = delegate.newThread( r );
+
+ if (basename != null) {
+
+ counter++;
+
+ t.setName(basename + counter);
+
+ }
+
+ t.setDaemon(true);
+
+// System.err.println("new thread: "+t.getName()+", id="+t.getId());
+
+ return t;
+
+ }
+
+}
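For reference, a minimal usage sketch of this duplicated factory (not part of the commit; the pool size, thread-name prefix, and example class are illustrative). Because the class is package-private, callers must live in com.bigdata.rdf.graph.impl, as GASEngine does:

    package com.bigdata.rdf.graph.impl;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /** Illustrative only: shows that threads created by the factory are daemons. */
    public class DaemonThreadFactoryExample {

        public static void main(final String[] args) throws InterruptedException {

            // Threads are named "gasEngine1", "gasEngine2", ... and marked daemon.
            final ExecutorService pool = Executors.newFixedThreadPool(4,
                    new DaemonThreadFactory("gasEngine"));

            pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()
                            + " daemon=" + Thread.currentThread().isDaemon());
                }
            });

            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }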
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -37,7 +37,6 @@
import com.bigdata.rdf.graph.IStaticFrontier;
import com.bigdata.rdf.graph.impl.frontier.StaticFrontier2;
import com.bigdata.rdf.graph.impl.scheduler.CHMScheduler;
-import com.bigdata.util.concurrent.DaemonThreadFactory;
/**
* {@link IGASEngine} for dynamic activation of vertices. This implementation
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASStats.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -16,8 +16,8 @@
package com.bigdata.rdf.graph.impl;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
-import com.bigdata.counters.CAT;
import com.bigdata.rdf.graph.IGASStats;
import com.bigdata.rdf.graph.util.GASUtil;
@@ -28,10 +28,10 @@
*/
public class GASStats implements IGASStats {
- private final CAT nrounds = new CAT();
- private final CAT frontierSize = new CAT();
- private final CAT nedges = new CAT();
- private final CAT elapsedNanos = new CAT();
+ private final AtomicLong nrounds = new AtomicLong();
+ private final AtomicLong frontierSize = new AtomicLong();
+ private final AtomicLong nedges = new AtomicLong();
+ private final AtomicLong elapsedNanos = new AtomicLong();
/* (non-Javadoc)
* @see com.bigdata.rdf.graph.impl.IFOO#add(long, long, long)
@@ -40,13 +40,13 @@
public void add(final long frontierSize, final long nedges,
final long elapsedNanos) {
- this.nrounds.increment();
+ this.nrounds.incrementAndGet();
- this.frontierSize.add(frontierSize);
+ this.frontierSize.addAndGet(frontierSize);
- this.nedges.add(nedges);
+ this.nedges.addAndGet(nedges);
- this.elapsedNanos.add(elapsedNanos);
+ this.elapsedNanos.addAndGet(elapsedNanos);
}
@@ -56,13 +56,13 @@
@Override
public void add(final IGASStats o) {
- nrounds.add(o.getNRounds());
+ nrounds.addAndGet(o.getNRounds());
- frontierSize.add(o.getFrontierSize());
+ frontierSize.addAndGet(o.getFrontierSize());
- nedges.add(o.getNEdges());
+ nedges.addAndGet(o.getNEdges());
- elapsedNanos.add(o.getElapsedNanos());
+ elapsedNanos.addAndGet(o.getElapsedNanos());
}
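For reference, a minimal sketch of the now bigdata-free GASStats accumulating its AtomicLong-backed counters (not part of the commit; the round sizes are made up, and the getters are those declared by IGASStats):

    package com.bigdata.rdf.graph.impl;

    import java.util.concurrent.TimeUnit;

    /** Illustrative only: exercises the AtomicLong-backed counters. */
    public class GASStatsExample {

        public static void main(final String[] args) {

            final GASStats stats = new GASStats();

            // Round 1: frontier of 100 vertices, 1,000 edges traversed in 2ms.
            stats.add(100L, 1000L, TimeUnit.MILLISECONDS.toNanos(2));

            // Round 2: frontier of 250 vertices, 5,000 edges traversed in 5ms.
            stats.add(250L, 5000L, TimeUnit.MILLISECONDS.toNanos(5));

            System.out.println("rounds=" + stats.getNRounds() + ", edges="
                    + stats.getNEdges() + ", elapsedNanos="
                    + stats.getElapsedNanos());
        }
    }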
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -35,7 +35,6 @@
import com.bigdata.rdf.graph.IGraphAccessor;
import com.bigdata.rdf.graph.impl.GASEngine;
import com.bigdata.rdf.graph.impl.util.VertexDistribution;
-import com.bigdata.rdf.sail.Sesame2BigdataIterator;
import cutthecrap.utils.striterators.EmptyIterator;
import cutthecrap.utils.striterators.IStriterator;
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/Sesame2BigdataIterator.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -0,0 +1,119 @@
+/**
+ Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+/*
+ * Created on Sep 4, 2008
+ */
+
+package com.bigdata.rdf.graph.impl.sail;
+
+import java.util.NoSuchElementException;
+
+import info.aduna.iteration.CloseableIteration;
+
+import cutthecrap.utils.striterators.ICloseableIterator;
+
+/**
+ * Class aligns a Sesame 2 {@link CloseableIteration} with a bigdata
+ * {@link ICloseableIterator}.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id: Bigdata2SesameIteration.java 2265 2009-10-26 12:51:06Z
+ * thompsonbry $
+ * @param <T>
+ * The generic type of the visited elements.
+ * @param <E>
+ * The generic type of the exceptions thrown by the Sesame 2
+ * {@link CloseableIteration}.
+ */
+/*
+ * Note: This is a clone of the same-named class in the bigdata-rdf module. The
+ * clone exists to have it under the Apache 2 license without going through a
+ * large relayering of the dependencies.
+ */
+class Sesame2BigdataIterator<T, E extends Exception> implements
+ ICloseableIterator<T> {
+
+ private final CloseableIteration<? extends T,E> src;
+
+ private volatile boolean open = true;
+
+ public Sesame2BigdataIterator(final CloseableIteration<? extends T,E> src) {
+
+ if (src == null)
+ throw new IllegalArgumentException();
+
+ this.src = src;
+
+ }
+
+ public void close() {
+
+ if (open) {
+ open = false;
+ try {
+ src.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ public boolean hasNext() {
+
+ try {
+
+ if (open && src.hasNext())
+ return true;
+
+ close();
+
+ return false;
+
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public T next() {
+
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ try {
+ return src.next();
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void remove() {
+
+ if(!open)
+ throw new IllegalStateException();
+
+ try {
+ src.remove();
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+}
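For reference, a minimal sketch of the adapter in use (not part of the commit). It assumes Sesame 2's info.aduna.iteration.CloseableIteratorIteration is on the classpath; since the adapter is package-private, the example lives in the same package:

    package com.bigdata.rdf.graph.impl.sail;

    import java.util.Arrays;

    import info.aduna.iteration.CloseableIteration;
    import info.aduna.iteration.CloseableIteratorIteration;

    import cutthecrap.utils.striterators.ICloseableIterator;

    /** Illustrative only: adapts a Sesame 2 iteration to an ICloseableIterator. */
    public class Sesame2BigdataIteratorExample {

        public static void main(final String[] args) {

            // A Sesame 2 CloseableIteration over a few strings.
            final CloseableIteration<String, RuntimeException> src =
                    new CloseableIteratorIteration<String, RuntimeException>(
                            Arrays.asList("a", "b", "c").iterator());

            // Adapt it to the striterators ICloseableIterator API.
            final ICloseableIterator<String> itr =
                    new Sesame2BigdataIterator<String, RuntimeException>(src);

            try {
                while (itr.hasNext()) {
                    System.out.println(itr.next());
                }
            } finally {
                itr.close();
            }
        }
    }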
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/STScheduler.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -33,8 +33,8 @@
/**
* The scheduled vertices.
- */// Note: package private. Exposed to TLScheduler.
- /*private*/ final Set<Value> vertices;
+ */
+ private final Set<Value> vertices;
private final boolean sortFrontier;
public STScheduler(final GASEngine gasEngine) {
@@ -44,6 +44,24 @@
}
+ /**
+ * The #of vertices in the frontier.
+ */
+ public int size() {
+
+ return vertices.size();
+
+ }
+
+ /**
+ * The backing collection.
+ */
+ public Set<Value> getVertices() {
+
+ return vertices;
+
+ }
+
@Override
public void schedule(final Value v) {
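For reference, a minimal sketch of why the accessors were added (not part of the commit): with the vertices field private again, code outside this package, such as the TLScheduler classes relocated into bigdata-rdf below, reads the frontier through size() and getVertices(). The helper class and method here are hypothetical:

    package com.bigdata.rdf.graph.impl.bd;

    import org.openrdf.model.Value;

    import com.bigdata.rdf.graph.impl.scheduler.STScheduler;

    /** Illustrative only: reads the frontier via the new public accessors. */
    public class STSchedulerAccessExample {

        /** Summarize the frontier without touching the private vertices field. */
        public static String describe(final STScheduler scheduler, final Value v) {

            final int size = scheduler.size(); // #of scheduled vertices.

            final boolean scheduled = scheduler.getVertices().contains(v);

            return "frontierSize=" + size + ", scheduled(" + v + ")=" + scheduled;
        }
    }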
Deleted: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -1,275 +0,0 @@
-/**
- Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-package com.bigdata.rdf.graph.impl.scheduler;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.openrdf.model.Value;
-
-import com.bigdata.rdf.graph.IGASScheduler;
-import com.bigdata.rdf.graph.IGASSchedulerImpl;
-import com.bigdata.rdf.graph.IStaticFrontier;
-import com.bigdata.rdf.graph.impl.GASEngine;
-import com.bigdata.rdf.graph.impl.bd.MergeSortIterator;
-import com.bigdata.rdf.graph.impl.util.GASImplUtil;
-import com.bigdata.rdf.graph.impl.util.IArraySlice;
-import com.bigdata.rdf.graph.impl.util.ManagedArray;
-import com.bigdata.rdf.graph.util.GASUtil;
-
-/**
- * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the
- * distinct vertices scheduled by each execution thread. After the computation
- * round, those per-thread segments of the frontier are combined into a single
- * global, compact, and ordered frontier. To maximize the parallel activity, the
- * per-thread frontiers are sorted using N threads (one per segment). Finally,
- * the frontier segments are combined using a {@link MergeSortIterator} - this
- * is a sequential step with a linear cost in the size of the frontier.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- *
- * TODO Discard if dominated by {@link TLScheduler2}.
- */
-public class TLScheduler implements IGASSchedulerImpl {
-
- /**
- * Class bundles a reusable, extensible array for sorting the thread-local
- * frontier.
- *
- * @author <a href="mailto:tho...@us...">Bryan
- * Thompson</a>
- */
- private static class MySTScheduler extends STScheduler {
-
- /**
- * This is used to sort the thread-local frontier (that is, the frontier
- * for a single thread). The backing array will grow as necessary and is
- * reused in each round.
- * <P>
- * Note: The schedule (for each thread) is using a set - see the
- * {@link STScheduler} base class. This means that the schedule (for
- * each thread) is compact, but not ordered. We need to use (and re-use)
- * an array to order that compact per-thread schedule. The compact
- * per-thread schedules are then combined into a single compact frontier
- * for the new round.
- */
- private final ManagedArray<Value> tmp;
-
- public MySTScheduler(final GASEngine gasEngine) {
-
- super(gasEngine);
-
- tmp = new ManagedArray<Value>(Value.class, 64);
-
- }
-
- } // class MySTScheduler
-
- private final GASEngine gasEngine;
- private final int nthreads;
- private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map;
-
- public TLScheduler(final GASEngine gasEngine) {
-
- this.gasEngine = gasEngine;
-
- this.nthreads = gasEngine.getNThreads();
-
- this.map = new ConcurrentHashMap<Long, MySTScheduler>(
- nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads);
-
- }
-
- private IGASScheduler threadLocalScheduler() {
-
- final Long id = Thread.currentThread().getId();
-
- MySTScheduler s = map.get(id);
-
- if (s == null) {
-
- final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler(
- gasEngine));
-
- if (old != null) {
-
- /*
- * We should not have a key collision since this is based on the
- * threadId.
- */
-
- throw new AssertionError();
-
- }
-
- }
-
- return s;
-
- }
-
- @Override
- public void schedule(final Value v) {
-
- threadLocalScheduler().schedule(v);
-
- }
-
- @Override
- public void clear() {
-
- /*
- * Clear the per-thread maps, but do not discard. They will be reused in
- * the next round.
- *
- * Note: This is a big cost. Simply clearing [map] results in much less
- * time and less GC.
- */
-// for (STScheduler s : map.values()) {
-//
-// s.clear();
-//
-// }
- map.clear();
- }
-
- @Override
- public void compactFrontier(final IStaticFrontier frontier) {
-
- /*
- * Extract a sorted, compact frontier from each thread local frontier.
- */
- @SuppressWarnings("unchecked")
- final IArraySlice<Value>[] frontiers = new IArraySlice[nthreads];
-
- int nsources = 0;
- int nvertices = 0;
- {
- final List<Callable<IArraySlice<Value>>> tasks = new ArrayList<Callable<IArraySlice<Value>>>(
- nthreads);
-
- for (MySTScheduler s : map.values()) {
- final MySTScheduler t = s;
- tasks.add(new Callable<IArraySlice<Value>>() {
- @Override
- public IArraySlice<Value> call() throws Exception {
- return GASImplUtil.compactAndSort(t.vertices, t.tmp);
- }
- });
-
- }
- // invokeAll() - futures will be done() before it returns.
- final List<Future<IArraySlice<Value>>> futures;
- try {
- futures = gasEngine.getGASThreadPool().invokeAll(tasks);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- for (Future<IArraySlice<Value>> f : futures) {
-
- try {
- final IArraySlice<Value> b = frontiers[nsources] = f.get();
- nvertices += b.len();
- nsources++;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
-
- }
- }
-
- if (nvertices == 0) {
-
- /*
- * The new frontier is empty.
- */
-
- frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */,
- GASUtil.EMPTY_VERTICES_ITERATOR);
-
- return;
-
- }
-
- if (nsources > nthreads) {
-
- /*
- * nsources could be LT nthreads if we have a very small frontier,
- * but it should never be GTE nthreads.
- */
-
- throw new AssertionError("nsources=" + nsources + ", nthreads="
- + nthreads);
-
- }
-
- /*
- * Now merge sort those arrays and populate the new frontier.
- */
- mergeSortSourcesAndSetFrontier(nsources, nvertices, frontiers, frontier);
-
- }
-
- /**
- * Now merge sort the ordered frontier segments and populate the new
- * frontier.
- *
- * @param nsources
- * The #of frontier segments.
- * @param nvertices
- * The total #of vertice across those segments (may double-count
- * across segments).
- * @param frontiers
- * The ordered, compact frontier segments
- * @param frontier
- * The new frontier to be populated.
- */
- private void mergeSortSourcesAndSetFrontier(final int nsources,
- final int nvertices, final IArraySlice<Value>[] frontiers,
- final IStaticFrontier frontier) {
-
- // wrap Values[] as Iterators.
- @SuppressWarnings("unchecked")
- final Iterator<Value>[] itrs = new Iterator[nsources];
-
- for (int i = 0; i < nsources; i++) {
-
- itrs[i] = frontiers[i].iterator();
-
- }
-
- // merge sort of those iterators.
- final Iterator<Value> itr = new MergeSortIterator(itrs);
-
- /*
- * Note: The merge iterator visits the vertices in the natural order and
- * does not need to be sorted.
- */
- frontier.resetFrontier(nvertices/* minCapacity */,
- false/* sortFrontier */, itr);
-
- }
-
-}
\ No newline at end of file
Deleted: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -1,306 +0,0 @@
-/**
- Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-package com.bigdata.rdf.graph.impl.scheduler;
-
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.log4j.Logger;
-import org.openrdf.model.Value;
-
-import com.bigdata.rdf.graph.IGASScheduler;
-import com.bigdata.rdf.graph.IGASSchedulerImpl;
-import com.bigdata.rdf.graph.IStaticFrontier;
-import com.bigdata.rdf.graph.impl.GASEngine;
-import com.bigdata.rdf.graph.impl.bd.MergeSortIterator;
-import com.bigdata.rdf.graph.impl.frontier.StaticFrontier2;
-import com.bigdata.rdf.graph.impl.util.GASImplUtil;
-import com.bigdata.rdf.graph.impl.util.IArraySlice;
-import com.bigdata.rdf.graph.impl.util.ManagedArray;
-import com.bigdata.rdf.graph.util.GASUtil;
-
-/**
- * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the
- * distinct vertices scheduled by each execution thread. After the computation
- * round, those per-thread segments of the frontier are combined into a single
- * global, compact, and ordered frontier. To maximize the parallel activity, the
- * per-thread frontiers are sorted using N threads (one per segment). Finally,
- * the frontier segments are combined using a {@link MergeSortIterator} - this
- * is a sequential step with a linear cost in the size of the frontier.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- */
-public class TLScheduler2 implements IGASSchedulerImpl {
-
- private static final Logger log = Logger.getLogger(TLScheduler2.class);
-
- /**
- * Class bundles a reusable, extensible array for sorting the thread-local
- * frontier.
- *
- * @author <a href="mailto:tho...@us...">Bryan
- * Thompson</a>
- */
- private static class MySTScheduler extends STScheduler {
-
- /**
- * This is used to sort the thread-local frontier (that is, the frontier
- * for a single thread). The backing array will grow as necessary and is
- * reused in each round.
- * <P>
- * Note: The schedule (for each thread) is using a set - see the
- * {@link STScheduler} base class. This means that the schedule (for
- * each thread) is compact, but not ordered. We need to use (and re-use)
- * an array to order that compact per-thread schedule. The compact
- * per-thread schedules are then combined into a single compact frontier
- * for the new round.
- */
- private final ManagedArray<Value> tmp;
-
- public MySTScheduler(final GASEngine gasEngine) {
-
- super(gasEngine);
-
- tmp = new ManagedArray<Value>(Value.class, 64);
-
- }
-
- } // class MySTScheduler
-
- private final GASEngine gasEngine;
- private final int nthreads;
- private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map;
-
- public TLScheduler2(final GASEngine gasEngine) {
-
- this.gasEngine = gasEngine;
-
- this.nthreads = gasEngine.getNThreads();
-
- this.map = new ConcurrentHashMap<Long, MySTScheduler>(
- nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads);
-
- }
-
- private IGASScheduler threadLocalScheduler() {
-
- final Long id = Thread.currentThread().getId();
-
- MySTScheduler s = map.get(id);
-
- if (s == null) {
-
- final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler(
- gasEngine));
-
- if (old != null) {
-
- /*
- * We should not have a key collision since this is based on the
- * threadId.
- */
-
- throw new AssertionError();
-
- }
-
- }
-
- return s;
-
- }
-
- @Override
- public void schedule(final Value v) {
-
- threadLocalScheduler().schedule(v);
-
- }
-
- @Override
- public void clear() {
-
- /*
- * Clear the per-thread maps, but do not discard. They will be reused in
- * the next round.
- *
- * Note: This is a big cost. Simply clearing [map] results in much less
- * time and less GC.
- */
-// for (STScheduler s : map.values()) {
-//
-// s.clear();
-//
-// }
- map.clear();
- }
-
- @Override
- public void compactFrontier(final IStaticFrontier frontier) {
-
- /*
- * Figure out the #of sources and the #of vertices across those sources.
- *
- * This also computes the cumulative offsets into the new frontier for
- * the different per-thread segments.
- */
- final int[] off = new int[nthreads]; // zero for 1st thread.
- final int nsources;
- final int nvertices;
- {
- int ns = 0, nv = 0;
- for (MySTScheduler s : map.values()) {
- final MySTScheduler t = s;
- final int sz = t.vertices.size();
- off[ns] = nv; // starting index.
- ns++;
- nv += sz;
- }
- nsources = ns;
- nvertices = nv;
- }
-
- if (nsources > nthreads) {
-
- /*
- * nsources could be LT nthreads if we have a very small frontier,
- * but it should never be GTE nthreads.
- */
-
- throw new AssertionError("nsources=" + nsources + ", nthreads="
- + nthreads);
-
- }
-
- if (nvertices == 0) {
-
- /*
- * The new frontier is empty.
- */
-
- frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */,
- GASUtil.EMPTY_VERTICES_ITERATOR);
-
- return;
-
- }
-
- /*
- * Parallel copy of the per-thread frontiers onto the new frontier.
- *
- * Note: This DOES NOT produce a compact frontier! The code that maps
- * the gather/reduce operations over the frontier will eliminate
- * duplicate work.
- */
-
- // TODO Requires a specific class to work! API!
- final StaticFrontier2 f2 = (StaticFrontier2) frontier;
- {
-
- // ensure sufficient capacity!
- f2.resetAndEnsureCapacity(nvertices);
- f2.setCompact(false); // NOT COMPACT!
-
- final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(
- nsources);
-
- int i = 0;
- for (MySTScheduler s : map.values()) { // TODO Paranoia suggests to put these into an [] so we know that we have the same traversal order as above. That might not be guaranteed.
- final MySTScheduler t = s;
- final int index = i++;
- tasks.add(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- final IArraySlice<Value> orderedSegment = GASImplUtil
- .compactAndSort(t.vertices, t.tmp);
- f2.copyIntoResetFrontier(off[index], orderedSegment);
- return (Void) null;
- }
- });
- }
-
- // invokeAll() - futures will be done() before it returns.
- final List<Future<Void>> futures;
- try {
- futures = gasEngine.getGASThreadPool().invokeAll(tasks);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- for (Future<Void> f : futures) {
-
- try {
- f.get();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
-
- }
- }
- if (log.isInfoEnabled())
- log.info("Done: " + this.getClass().getCanonicalName()
- + ",frontier=" + frontier);
-// /*
-// * Now merge sort those arrays and populate the new frontier.
-// */
-// mergeSortSourcesAndSetFrontier(nsources, nvertices, frontiers, frontier);
-
- }
-
-// /**
-// * Now merge sort the ordered frontier segments and populate the new
-// * frontier.
-// *
-// * @param nsources
-// * The #of frontier segments.
-// * @param nvertices
-// * The total #of vertice across those segments (may double-count
-// * across segments).
-// * @param frontiers
-// * The ordered, compact frontier segments
-// * @param frontier
-// * The new frontier to be populated.
-// */
-// private void mergeSortSourcesAndSetFrontier(final int nsources,
-// final int nvertices, final IArraySlice<Value>[] frontiers,
-// final IStaticFrontier frontier) {
-//
-// // wrap Values[] as Iterators.
-// @SuppressWarnings("unchecked")
-// final Iterator<Value>[] itrs = new Iterator[nsources];
-//
-// for (int i = 0; i < nsources; i++) {
-//
-// itrs[i] = frontiers[i].iterator();
-//
-// }
-//
-// // merge sort of those iterators.
-// final Iterator<Value> itr = new MergeSortIterator(itrs);
-//
-// frontier.resetFrontier(nvertices/* minCapacity */, true/* ordered */,
-// itr);
-//
-// }
-
-}
\ No newline at end of file
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedArray.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -19,8 +19,6 @@
import org.apache.log4j.Logger;
-import com.bigdata.io.ByteArrayBuffer;
-
import cutthecrap.utils.striterators.ArrayIterator;
/**
@@ -337,8 +335,8 @@
}
/**
- * A slice of the outer {@link ByteArrayBuffer}. The slice will always
- * reflect the backing {@link #array()} for the instance of the outer class.
+ * A slice of the outer {@link ManagedArray}. The slice will always reflect
+ * the backing {@link #array()} for the instance of the outer class.
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/ManagedIntArray.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -17,8 +17,6 @@
import org.apache.log4j.Logger;
-import com.bigdata.io.ByteArrayBuffer;
-
/**
* A view on a mutable int[] that may be extended.
* <p>
@@ -283,8 +281,8 @@
}
/**
- * A slice of the outer {@link ByteArrayBuffer}. The slice will always
- * reflect the backing {@link #array()} for the instance of the outer class.
+ * A slice of the outer {@link ManagedArray}. The slice will always reflect
+ * the backing {@link #array()} for the instance of the outer class.
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/util/GASUtil.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -18,6 +18,7 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -228,8 +229,8 @@
} else {
- throw new IOException("Could not locate resource: "
- + resource);
+ throw new FileNotFoundException(
+ "Could not locate resource: " + resource);
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java 2013-09-12 15:58:40 UTC (rev 7397)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -15,6 +15,8 @@
*/
package com.bigdata.rdf.graph.impl.sail;
+import java.io.FileNotFoundException;
+
import org.openrdf.model.URI;
import org.openrdf.model.vocabulary.RDF;
import org.openrdf.sail.Sail;
@@ -69,13 +71,20 @@
/**
* The data file.
*/
- static private final String smallGraph = "bigdata-gas/src/test/com/bigdata/rdf/graph/data/smallGraph.ttl";
+ static private final String smallGraph1 = "bigdata-gas/src/test/com/bigdata/rdf/graph/data/smallGraph.ttl";
+ static private final String smallGraph2 = "src/test/com/bigdata/rdf/graph/data/smallGraph.ttl";
private final URI rdfType, foafKnows, foafPerson, mike, bryan, martyn;
public SmallGraphProblem() throws Exception {
- getGraphFixture().loadGraph(smallGraph);
+ try {
+ // in eclipse with bigdata as the root dir.
+ getGraphFixture().loadGraph(smallGraph1);
+ } catch (FileNotFoundException ex) {
+ // from the ant build file with bigdata-gas as the root dir.
+ getGraphFixture().loadGraph(smallGraph2);
+ }
final Sail sail = getGraphFixture().getSail();
Copied: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java (from rev 7382, branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler.java)
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -0,0 +1,275 @@
+/**
+ Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.bigdata.rdf.graph.impl.bd;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.openrdf.model.Value;
+
+import com.bigdata.rdf.graph.IGASScheduler;
+import com.bigdata.rdf.graph.IGASSchedulerImpl;
+import com.bigdata.rdf.graph.IStaticFrontier;
+import com.bigdata.rdf.graph.impl.GASEngine;
+import com.bigdata.rdf.graph.impl.scheduler.STScheduler;
+import com.bigdata.rdf.graph.impl.util.GASImplUtil;
+import com.bigdata.rdf.graph.impl.util.IArraySlice;
+import com.bigdata.rdf.graph.impl.util.ManagedArray;
+import com.bigdata.rdf.graph.util.GASUtil;
+
+/**
+ * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the
+ * distinct vertices scheduled by each execution thread. After the computation
+ * round, those per-thread segments of the frontier are combined into a single
+ * global, compact, and ordered frontier. To maximize the parallel activity, the
+ * per-thread frontiers are sorted using N threads (one per segment). Finally,
+ * the frontier segments are combined using a {@link MergeSortIterator} - this
+ * is a sequential step with a linear cost in the size of the frontier.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ *
+ * TODO Discard if dominated by {@link TLScheduler2}.
+ */
+public class TLScheduler implements IGASSchedulerImpl {
+
+ /**
+ * Class bundles a reusable, extensible array for sorting the thread-local
+ * frontier.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ */
+ private static class MySTScheduler extends STScheduler {
+
+ /**
+ * This is used to sort the thread-local frontier (that is, the frontier
+ * for a single thread). The backing array will grow as necessary and is
+ * reused in each round.
+ * <P>
+ * Note: The schedule (for each thread) is using a set - see the
+ * {@link STScheduler} base class. This means that the schedule (for
+ * each thread) is compact, but not ordered. We need to use (and re-use)
+ * an array to order that compact per-thread schedule. The compact
+ * per-thread schedules are then combined into a single compact frontier
+ * for the new round.
+ */
+ private final ManagedArray<Value> tmp;
+
+ public MySTScheduler(final GASEngine gasEngine) {
+
+ super(gasEngine);
+
+ tmp = new ManagedArray<Value>(Value.class, 64);
+
+ }
+
+ } // class MySTScheduler
+
+ private final GASEngine gasEngine;
+ private final int nthreads;
+ private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map;
+
+ public TLScheduler(final GASEngine gasEngine) {
+
+ this.gasEngine = gasEngine;
+
+ this.nthreads = gasEngine.getNThreads();
+
+ this.map = new ConcurrentHashMap<Long, MySTScheduler>(
+ nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads);
+
+ }
+
+ private IGASScheduler threadLocalScheduler() {
+
+ final Long id = Thread.currentThread().getId();
+
+ MySTScheduler s = map.get(id);
+
+ if (s == null) {
+
+ final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler(
+ gasEngine));
+
+ if (old != null) {
+
+ /*
+ * We should not have a key collision since this is based on the
+ * threadId.
+ */
+
+ throw new AssertionError();
+
+ }
+
+ }
+
+ return s;
+
+ }
+
+ @Override
+ public void schedule(final Value v) {
+
+ threadLocalScheduler().schedule(v);
+
+ }
+
+ @Override
+ public void clear() {
+
+ /*
+ * Clear the per-thread maps, but do not discard. They will be reused in
+ * the next round.
+ *
+ * Note: This is a big cost. Simply clearing [map] results in much less
+ * time and less GC.
+ */
+// for (STScheduler s : map.values()) {
+//
+// s.clear();
+//
+// }
+ map.clear();
+ }
+
+ @Override
+ public void compactFrontier(final IStaticFrontier frontier) {
+
+ /*
+ * Extract a sorted, compact frontier from each thread local frontier.
+ */
+ @SuppressWarnings("unchecked")
+ final IArraySlice<Value>[] frontiers = new IArraySlice[nthreads];
+
+ int nsources = 0;
+ int nvertices = 0;
+ {
+ final List<Callable<IArraySlice<Value>>> tasks = new ArrayList<Callable<IArraySlice<Value>>>(
+ nthreads);
+
+ for (MySTScheduler s : map.values()) {
+ final MySTScheduler t = s;
+ tasks.add(new Callable<IArraySlice<Value>>() {
+ @Override
+ public IArraySlice<Value> call() throws Exception {
+ return GASImplUtil.compactAndSort(t.getVertices(), t.tmp);
+ }
+ });
+
+ }
+ // invokeAll() - futures will be done() before it returns.
+ final List<Future<IArraySlice<Value>>> futures;
+ try {
+ futures = gasEngine.getGASThreadPool().invokeAll(tasks);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ for (Future<IArraySlice<Value>> f : futures) {
+
+ try {
+ final IArraySlice<Value> b = frontiers[nsources] = f.get();
+ nvertices += b.len();
+ nsources++;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+
+ if (nvertices == 0) {
+
+ /*
+ * The new frontier is empty.
+ */
+
+ frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */,
+ GASUtil.EMPTY_VERTICES_ITERATOR);
+
+ return;
+
+ }
+
+ if (nsources > nthreads) {
+
+ /*
+ * nsources could be LT nthreads if we have a very small frontier,
+ * but it should never be GTE nthreads.
+ */
+
+ throw new AssertionError("nsources=" + nsources + ", nthreads="
+ + nthreads);
+
+ }
+
+ /*
+ * Now merge sort those arrays and populate the new frontier.
+ */
+ mergeSortSourcesAndSetFrontier(nsources, nvertices, frontiers, frontier);
+
+ }
+
+ /**
+ * Now merge sort the ordered frontier segments and populate the new
+ * frontier.
+ *
+ * @param nsources
+ * The #of frontier segments.
+ * @param nvertices
+ * The total #of vertice across those segments (may double-count
+ * across segments).
+ * @param frontiers
+ * The ordered, compact frontier segments
+ * @param frontier
+ * The new frontier to be populated.
+ */
+ private void mergeSortSourcesAndSetFrontier(final int nsources,
+ final int nvertices, final IArraySlice<Value>[] frontiers,
+ final IStaticFrontier frontier) {
+
+ // wrap Values[] as Iterators.
+ @SuppressWarnings("unchecked")
+ final Iterator<Value>[] itrs = new Iterator[nsources];
+
+ for (int i = 0; i < nsources; i++) {
+
+ itrs[i] = frontiers[i].iterator();
+
+ }
+
+ // merge sort of those iterators.
+ final Iterator<Value> itr = new MergeSortIterator(itrs);
+
+ /*
+ * Note: The merge iterator visits the vertices in the natural order and
+ * does not need to be sorted.
+ */
+ frontier.resetFrontier(nvertices/* minCapacity */,
+ false/* sortFrontier */, itr);
+
+ }
+
+}
\ No newline at end of file
Copied: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java (from rev 7382, branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/scheduler/TLScheduler2.java)
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/TLScheduler2.java 2013-09-12 16:02:04 UTC (rev 7398)
@@ -0,0 +1,306 @@
+/**
+ Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.bigdata.rdf.graph.impl.bd;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.log4j.Logger;
+import org.openrdf.model.Value;
+
+import com.bigdata.rdf.graph.IGASScheduler;
+import com.bigdata.rdf.graph.IGASSchedulerImpl;
+import com.bigdata.rdf.graph.IStaticFrontier;
+import com.bigdata.rdf.graph.impl.GASEngine;
+import com.bigdata.rdf.graph.impl.frontier.StaticFrontier2;
+import com.bigdata.rdf.graph.impl.scheduler.STScheduler;
+import com.bigdata.rdf.graph.impl.util.GASImplUtil;
+import com.bigdata.rdf.graph.impl.util.IArraySlice;
+import com.bigdata.rdf.graph.impl.util.ManagedArray;
+import com.bigdata.rdf.graph.util.GASUtil;
+
+/**
+ * This scheduler uses thread-local buffers ({@link LinkedHashSet}) to track the
+ * distinct vertices scheduled by each execution thread. After the computation
+ * round, those per-thread segments of the frontier are combined into a single
+ * global, compact, and ordered frontier. To maximize the parallel activity, the
+ * per-thread frontiers are sorted using N threads (one per segment). Finally,
+ * the frontier segments are combined using a {@link MergeSortIterator} - this
+ * is a sequential step with a linear cost in the size of the frontier.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public class TLScheduler2 implements IGASSchedulerImpl {
+
+ private static final Logger log = Logger.getLogger(TLScheduler2.class);
+
+ /**
+ * Class bundles a reusable, extensible array for sorting the thread-local
+ * frontier.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ */
+ private static class MySTScheduler extends STScheduler {
+
+ /**
+ * This is used to sort the thread-local frontier (that is, the frontier
+ * for a single thread). The backing array will grow as necessary and is
+ * reused in each round.
+ * <P>
+ * Note: The schedule (for each thread) is using a set - see the
+ * {@link STScheduler} base class. This means that the schedule (for
+ * each thread) is compact, but not ordered. We need to use (and re-use)
+ * an array to order that compact per-thread schedule. The compact
+ * per-thread schedules are then combined into a single compact frontier
+ * for the new round.
+ */
+ private final ManagedArray<Value> tmp;
+
+ public MySTScheduler(final GASEngine gasEngine) {
+
+ super(gasEngine);
+
+ tmp = new ManagedArray<Value>(Value.class, 64);
+
+ }
+
+ } // class MySTScheduler
+
+ private final GASEngine gasEngine;
+ private final int nthreads;
+ private final ConcurrentHashMap<Long/* threadId */, MySTScheduler> map;
+
+ public TLScheduler2(final GASEngine gasEngine) {
+
+ this.gasEngine = gasEngine;
+
+ this.nthreads = gasEngine.getNThreads();
+
+ this.map = new ConcurrentHashMap<Long, MySTScheduler>(
+ nthreads/* initialCapacity */, .75f/* loadFactor */, nthreads);
+
+ }
+
+ private IGASScheduler threadLocalScheduler() {
+
+ final Long id = Thread.currentThread().getId();
+
+ MySTScheduler s = map.get(id);
+
+ if (s == null) {
+
+ final IGASScheduler old = map.putIfAbsent(id, s = new MySTScheduler(
+ gasEngine));
+
+ if (old != null) {
+
+ /*
+ * We should not have a key collision since this is based on the
+ * threadId.
+ */
+
+ throw new AssertionError();
+
+ }
+
+ }
+
+ return s;
+
+ }
+
+ @Override
+ public void schedule(final Value v) {
+
+ threadLocalScheduler().schedule(v);
+
+ }
+
+ @Override
+ public void clear() {
+
+ /*
+ * Clear the per-thread maps, but do not discard. They will be reused in
+ * the next round.
+ *
+ * Note: This is a big cost. Simply clearing [map] results in much less
+ * time and less GC.
+ */
+// for (STScheduler s : map.values()) {
+//
+// s.clear();
+//
+// }
+ map.clear();
+ }
+
+ @Override
+ public void compactFrontier(final IStaticFrontier frontier) {
+
+ /*
+ * Figure out the #of sources and the #of vertices across those sources.
+ *
+ * This also computes the cumulative offsets into the new frontier for
+ * the different per-thread segments.
+ */
+ final int[] off = new int[nthreads]; // zero for 1st thread.
+ final int nsources;
+ final int nvertices;
+ {
+ int ns = 0, nv = 0;
+ for (MySTScheduler s : map.values()) {
+ final MySTScheduler t = s;
+ final int sz = t.size();
+ off[ns] = nv; // starting index.
+ ns++;
+ nv += sz;
+ }
+ nsources = ns;
+ nvertices = nv;
+ }
+
+ if (nsources > nthreads) {
+
+ /*
+ * nsources could be LT nthreads if we have a very small frontier,
+ * but it should never be GTE nthreads.
+ */
+
+ throw new AssertionError("nsources=" + nsources + ", nthreads="
+ + nthreads);
+
+ }
+
+ if (nvertices == 0) {
+
+ /*
+ * The new frontier is empty.
+ */
+
+ frontier.resetFrontier(0/* minCapacity */, false/* sortFrontier */,
+ GASUtil.EMPTY_VERTICES_ITERATOR);
+
+ return;
+
+ }
+
+ /*
+ * Parallel copy of the per-thread frontiers onto the new frontier.
+ *
+ * Note: This DOES NOT produce a compact frontier! The code that maps
+ * the gather/reduce operations over the frontier will eliminate
+ * duplicate work.
+ */
+
+ // TODO Requires a specific class to work! API!
+ final StaticFrontier2 f2 = (StaticFrontier2) frontier;
+ {
+
+ // ensure sufficient capacity!
+ f2.resetAndEnsureCapacity(nvertices);
+ f2.setCompact(false); // NOT COMPACT!
+
+ final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(
+ nsources);
+
+ int i = 0;
+ for (MySTScheduler s : map.values()) { // TODO Paranoia suggests to put these into an [] so we know that we have the same trav...
[truncated message content] |