|
From: <tho...@us...> - 2013-09-10 15:31:13
|
Revision: 7393
http://bigdata.svn.sourceforge.net/bigdata/?rev=7393&view=rev
Author: thompsonbry
Date: 2013-09-10 15:31:04 +0000 (Tue, 10 Sep 2013)
Log Message:
-----------
Refactored the GASRunner into a base class and derived a BigdataGASRunner and a SAILGASRunner from it.
Initial performance with the memory sail does not look very good: about 1/2 of the throughput that we observe with bigdata. I am going to try this on a machine with more RAM to see whether that helps.
See #629 (Graph mining API)
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java
Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java
Removed Paths:
-------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-09-10 13:55:55 UTC (rev 7392)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-09-10 15:31:04 UTC (rev 7393)
@@ -32,6 +32,7 @@
* the generic type for the per-edge state, but that is not always
* true. The SUM type is scoped to the GATHER + SUM operation (NOT
* the computation).
+ *
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*/
public interface IGASProgram<VS, ES, ST> extends IGASOptions<VS, ES, ST> {
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java 2013-09-10 15:31:04 UTC (rev 7393)
@@ -0,0 +1,181 @@
+package com.bigdata.rdf.graph.impl.sail;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.openrdf.model.Resource;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+import org.openrdf.sail.memory.MemoryStore;
+
+import com.bigdata.rdf.graph.IGASEngine;
+import com.bigdata.rdf.graph.IGraphAccessor;
+import com.bigdata.rdf.graph.impl.sail.SAILGASEngine.SAILGraphAccessor;
+import com.bigdata.rdf.graph.impl.util.GASRunnerBase;
+import com.bigdata.rdf.graph.util.GASUtil;
+
+/**
+ * Class for running GAS performance tests against the SAIL.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public class SAILGASRunner<VS, ES, ST> extends GASRunnerBase<VS, ES, ST> {
+
+    private static final Logger log = Logger.getLogger(SAILGASRunner.class);
+
+    public SAILGASRunner(String[] args) throws ClassNotFoundException {
+        super(args);
+    }
+
+    /**
+     * Options for the SAIL runner. The backend is a {@link MemoryStore}
+     * which is created and connected by {@link #init()} and released by
+     * {@link #shutdown()}.
+     */
+    protected class SAILOptionData extends GASRunnerBase<VS, ES, ST>.OptionData {
+
+        private Sail sail = null;
+
+        private SailConnection cxn = null;
+
+        @Override
+        public void init() throws Exception {
+
+            super.init();
+
+            sail = new MemoryStore();
+
+            sail.initialize();
+
+            cxn = sail.getConnection();
+
+        }
+
+        @Override
+        public void shutdown() {
+
+            if (cxn != null) {
+
+                try {
+
+                    cxn.close();
+
+                } catch (SailException e) {
+
+                    log.error(e, e);
+
+                } finally {
+
+                    cxn = null;
+
+                }
+
+            }
+
+            if (sail != null) {
+
+                try {
+
+                    sail.shutDown();
+
+                } catch (SailException e) {
+
+                    log.error(e,e);
+
+                } finally {
+
+                    sail = null;
+
+                }
+
+            }
+
+        }
+        @Override
+        public boolean handleArg(final AtomicInteger i, final String[] args) {
+            if (super.handleArg(i, args)) {
+                return true;
+            }
+            // No SAIL specific options are defined, so anything that is not
+            // handled by the base class is not recognized.
+            return false;
+        }
+
+        @Override
+        public void report(final StringBuilder sb) {
+            // NOP
+        }
+
+    } // class SAILOptionData
+
+    @Override
+    protected SAILOptionData newOptionData() {
+
+        return new SAILOptionData();
+
+    }
+
+    @Override
+    protected IGASEngine newGASEngine() {
+
+        return new SAILGASEngine(getOptionData().nthreads);
+
+    }
+
+    @Override
+    protected void loadFiles() throws Exception {
+
+        final SAILOptionData opt = getOptionData();
+        final String[] resources = opt.loadSet.toArray(new String[0]);
+
+        // Note: This is the connection owned by the SAILOptionData. It is
+        // closed by SAILOptionData#shutdown(), not here.
+        final SailConnection cxn = opt.cxn;
+        boolean ok = false;
+        try {
+            new GASUtil().loadGraph(cxn, null/* fallback */, resources);
+            cxn.commit();
+            ok = true;
+        } finally {
+            if (!ok && cxn != null) {
+                try {
+                    cxn.rollback();
+                } catch (SailException ex) {
+                    // Log rather than mask the exception that caused the rollback.
+                    log.error(ex, ex);
+                }
+            }
+        }
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected SAILOptionData getOptionData() {
+
+        return (SAILOptionData) super.getOptionData();
+
+    }
+
+    @Override
+    protected IGraphAccessor newGraphAccessor() {
+
+        return new SAILGraphAccessor(getOptionData().cxn,
+                false/* includeInferred */, new Resource[0]/* defaultContext */);
+
+    }
+
+    /**
+     * Performance testing harness.
+     *
+     * @see #SAILGASRunner(String[])
+     */
+    @SuppressWarnings("rawtypes")
+    public static void main(final String[] args) throws Exception {
+
+        new SAILGASRunner(args).call();
+
+    }
+
+}
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java 2013-09-10 15:31:04 UTC (rev 7393)
@@ -0,0 +1,464 @@
+package com.bigdata.rdf.graph.impl.util;
+
+import java.lang.reflect.Constructor;
+import java.util.LinkedHashSet;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.openrdf.model.Value;
+
+import com.bigdata.rdf.graph.IGASContext;
+import com.bigdata.rdf.graph.IGASEngine;
+import com.bigdata.rdf.graph.IGASProgram;
+import com.bigdata.rdf.graph.IGASScheduler;
+import com.bigdata.rdf.graph.IGASSchedulerImpl;
+import com.bigdata.rdf.graph.IGASState;
+import com.bigdata.rdf.graph.IGASStats;
+import com.bigdata.rdf.graph.IGraphAccessor;
+import com.bigdata.rdf.graph.impl.GASEngine;
+import com.bigdata.rdf.graph.impl.GASState;
+import com.bigdata.rdf.graph.impl.GASStats;
+
+/**
+ * Base class for running performance tests.
+ *
+ * @param <VS>
+ * The generic type for the per-vertex state. This is scoped to the
+ * computation of the {@link IGASProgram}.
+ * @param <ES>
+ * The generic type for the per-edge state. This is scoped to the
+ * computation of the {@link IGASProgram}.
+ * @param <ST>
+ * The generic type for the SUM. This is often directly related to
+ * the generic type for the per-edge state, but that is not always
+ * true. The SUM type is scoped to the GATHER + SUM operation (NOT
+ * the computation).
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ *
+ * TODO Do we need a different driver if the algorithm always visits all
+ * vertices? For such algorithms, we just run them once per graph
+ * (unless the graph is dynamic).
+ */
+//* @param <GE>
+//* The generic type for the {@link IGASEngine}.
+//* @param <BE>
+//* The generic type for the backend implementation.
+
+public abstract class GASRunnerBase<VS, ES, ST> implements
+ Callable<IGASStats> {
+
+ private static final Logger log = Logger.getLogger(GASRunnerBase.class);
+
+ /**
+ * Configured options for the {@link GASRunner}.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+ protected class OptionData {
+ /**
+ * The seed used for the random number generator (default {@value #seed}
+ * ).
+ */
+ public long seed = 217L;
+ /**
+ * Random number generated used for sampling the starting vertices. Set
+ * by #init().
+ */
+ public Random r = null;
+ /**
+ * The #of random starting vertices to use.
+ */
+ public int nsamples = 100;
+ /**
+ * The #of threads to use for GATHER and SCATTER operators.
+ */
+ public int nthreads = 4;
+ /**
+ * The analytic class to be executed.
+ */
+ public Class<IGASProgram<VS, ES, ST>> analyticClass;
+ /**
+ * The {@link IGASSchedulerImpl} class to use.
+ *
+ * TODO Override or always? If always, then where to get the default?
+ */
+ public Class<IGASSchedulerImpl> schedulerClassOverride;
+
+ /** Set of files to load (may be empty). */
+ public final LinkedHashSet<String> loadSet = new LinkedHashSet<String>();
+
+ /** The name of the implementation specific configuration file. */
+ public String propertyFile;
+
+ protected OptionData() {
+
+ }
+
+ /**
+ * Initialize any resources, including the connection to the backend.
+ */
+ public void init() throws Exception {
+
+ // Setup the random number generator.
+ this.r = new Random(seed);
+
+ r = new Random(seed);
+
+ }
+
+ /**
+ * Shutdown any resources, including the connection to the backend.
+ * <p>
+ * Note: This method must be safe. It may be called if {@link #init()}
+ * fails. It may be called more than once.
+ */
+ public void shutdown() {
+
+ }
+
+ /**
+ * Return <code>true</code>iff one or more arguments can be parsed
+ * starting at the specified index.
+ *
+ * @param i
+ * The index into the arguments.
+ * @param args
+ * The arguments.
+ * @return <code>true</code> iff any arguments were recognized.
+ */
+ public boolean handleArg(final AtomicInteger i, final String[] args) {
+
+ return false;
+
+ }
+
+ /**
+ * Print the optional message on stderr, print the usage information on
+ * stderr, and then force the program to exit with the given status code.
+ *
+ * @param status
+ * The status code.
+ * @param msg
+ * The optional message
+ */
+ public void usage(final int status, final String msg) {
+
+ if (msg != null) {
+
+ System.err.println(msg);
+
+ }
+
+ System.err.println("[options] analyticClass propertyFile");
+
+ System.exit(status);
+
+ }
+
+ /**
+ * Extension hook for reporting at the end of the test run.
+ *
+ * @param sb A buffer into which more information may be appended.
+ */
+ public void report(final StringBuilder sb) {
+
+ // NOP
+
+ }
+
+ } // class OptionData
+
+ /**
+ * The configuration metadata for the run.
+ */
+ private final OptionData opt;
+
+ /**
+ * Factory for the {@link OptionData}.
+ */
+ abstract protected OptionData newOptionData();
+
+ /**
+ * The {@link OptionData} for the run.
+ */
+ protected OptionData getOptionData() {
+
+ return opt;
+
+ }
+
+ /**
+ * Factory for the {@link IGASEngine}.
+ */
+ abstract protected IGASEngine newGASEngine();
+
+ /**
+ * Load files into the backend if they can not be assumed to already exist
+ * (a typical pattern is that files are loaded into an empty KB instance,
+ * but not loaded into a pre-existing one).
+ *
+ * @throws Exception
+ */
+ abstract protected void loadFiles() throws Exception;
+
+ /**
+ * Run a GAS analytic against some data set.
+ *
+ * @param args
+ * USAGE:<br/>
+ * <code>(options) analyticClass propertyFile</code>
+ * <p>
+ * <i>Where:</i>
+ * <dl>
+ * <dt>propertyFile</dt>
+ * <dd>The implementation specific property file or other type of
+ * configuration file.</dd>
+ * </dl>
+ * and <i>options</i> are any of:
+ * <dl>
+ * <dt>-nthreads</dt>
+ * <dd>The #of threads which will be used for GATHER and SCATTER
+ * operations.</dd>
+ * <dt>-nsamples</dt>
+ * <dd>The #of random sample starting vertices that will be
+ * selected. The algorithm will be run ONCE for EACH sampled
+ * vertex.</dd>
+ * <dt>-seed</dt>
+ * <dd>The seed for the random number generator (default is
+ * <code>217L</code>).</dd>
+ * <dt>-schedulerClass</dt>
+ * <dd>Override the default {@link IGASScheduler}. Class must
+ * implement {@link IGASSchedulerImpl}.</dd>
+ * <dt>-load</dt>
+ * <dd>Loads the named resource IFF the KB is empty (or does not
+ * exist) at the time this utility is executed. This option may
+ * appear multiple times. The resources will be searched for as
+ * URLs, on the CLASSPATH, and in the file system.</dd>
+ * </p>
+ * @throws ClassNotFoundException
+ */
+ public GASRunnerBase(final String[] args) throws ClassNotFoundException {
+
+ final OptionData opt = newOptionData();
+
+ /*
+ * Handle all arguments starting with "-". These should appear before
+ * any non-option arguments to the program.
+ */
+ final AtomicInteger i = new AtomicInteger(0);
+ while (i.get() < args.length) {
+ final String arg = args[i.get()];
+ if (arg.startsWith("-")) {
+ if (arg.equals("-seed")) {
+ opt.seed = Long.valueOf(args[i.incrementAndGet()]);
+ } else if (arg.equals("-nsamples")) {
+ final String s = args[i.incrementAndGet()];
+ opt.nsamples = Integer.valueOf(s);
+ if (opt.nsamples <= 0) {
+ opt.usage(1/* status */,
+ "-nsamples must be positive, not: " + s);
+ }
+ } else if (arg.equals("-nthreads")) {
+ final String s = args[i.incrementAndGet()];
+ opt.nthreads = Integer.valueOf(s);
+ if (opt.nthreads < 0) {
+ opt.usage(1/* status */,
+ "-nthreads must be non-negative, not: " + s);
+ }
+ } else if (arg.equals("-schedulerClass")) {
+ final String s = args[i.incrementAndGet()];
+ opt.schedulerClassOverride = (Class<IGASSchedulerImpl>) Class.forName(s);
+ } else if (arg.equals("-load")) {
+ final String s = args[i.incrementAndGet()];
+ opt.loadSet.add(s);
+ } else {
+ if (!opt.handleArg(i, args)) {
+ opt.usage(1/* status */, "Unknown argument: " + arg);
+ }
+ }
+ } else {
+ break;
+ }
+ i.incrementAndGet();
+ }
+
+ /*
+ * Check for the remaining (required) argument(s).
+ */
+ final int nremaining = args.length - i.get();
+ if (nremaining != 2) {
+ /*
+ * There are either too many or too few arguments remaining.
+ */
+ opt.usage(1/* status */, nremaining < 1 ? "Too few arguments."
+ : "Too many arguments");
+ }
+
+ /*
+ * The analytic to be executed.
+ */
+ {
+
+ final String s = args[i.getAndIncrement()];
+
+ opt.analyticClass = (Class<IGASProgram<VS, ES, ST>>) Class
+ .forName(s);
+
+ }
+
+ /*
+ * Property file.
+ */
+ opt.propertyFile = args[i.getAndIncrement()];
+
+ this.opt = opt; // assign options.
+
+ }
+
+ /**
+ * Return the object used to access the as-configured graph.
+ */
+ abstract protected IGraphAccessor newGraphAccessor();
+
+ /**
+ * Return an instance of the {@link IGASProgram} to be evaluated.
+ */
+ protected IGASProgram<VS, ES, ST> newGASProgram() {
+
+ final Class<IGASProgram<VS, ES, ST>> cls = (Class<IGASProgram<VS, ES, ST>>)opt.analyticClass;
+
+ try {
+
+ final Constructor<IGASProgram<VS, ES, ST>> ctor = cls
+ .getConstructor(new Class[] {});
+
+ final IGASProgram<VS, ES, ST> gasProgram = ctor
+ .newInstance(new Object[] {});
+
+ return gasProgram;
+
+ } catch (Exception e) {
+
+ throw new RuntimeException(e);
+
+ }
+
+ }
+
+ /**
+ * Run the test.
+ * <p>
+ * This provides a safe pattern for either loading data into a temporary
+ * journal, which is then destroyed, or using an exiting journal and
+ * optionally loading in some data set. When we load the data the journal is
+ * destroyed afterwards and when the journal is pre-existing and we neither
+ * load the data nor destroy the journal. This has to do with the effective
+ * BufferMode (if transient) and whether the file is specified and whether a
+ * temporary file is created (CREATE_TEMP_FILE). If we do our own file
+ * create if the effective buffer mode is non-transient, then we can get all
+ * this information.
+ */
+ @Override
+ final public IGASStats call() throws Exception {
+
+ try {
+
+ // initialize backend / connection to backend.
+ opt.init();
+
+ // Load data sets
+ loadFiles();
+
+ // Run GAS program.
+ return runAnalytic();
+
+ } finally {
+
+ // Shutdown backend / connection to backend.
+ opt.shutdown();
+
+ }
+
+ }
+
+ /**
+ * Run the analytic.
+ *
+ * @return The performance statistics for the run.
+ *
+ * @throws Exception
+ */
+ final protected IGASStats runAnalytic() throws Exception {
+
+ final IGASEngine gasEngine = newGASEngine();
+
+ try {
+
+ if (opt.schedulerClassOverride != null) {
+
+ ((GASEngine) gasEngine)
+ .setSchedulerClass(opt.schedulerClassOverride);
+
+ }
+
+ final IGASProgram<VS, ES, ST> gasProgram = newGASProgram();
+
+ final IGraphAccessor graphAccessor = newGraphAccessor();
+
+ final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext(
+ graphAccessor, gasProgram);
+
+ final IGASState<VS, ES, ST> gasState = gasContext.getGASState();
+
+ final VertexDistribution dist = graphAccessor.getDistribution(opt.r);
+
+ final Value[] samples = dist.getWeightedSample(opt.nsamples);
+
+ final IGASStats total = new GASStats();
+
+ for (int i = 0; i < samples.length; i++) {
+
+ final Value startingVertex = samples[i];
+
+ gasState.init(startingVertex);
+
+ final IGASStats stats = (IGASStats) gasContext.call();
+
+ total.add(stats);
+
+ if (log.isInfoEnabled()) {
+ log.info("Run complete: vertex[" + i + "] of "
+ + samples.length + " : startingVertex="
+ + startingVertex + ", stats(sample)=" + stats);
+ }
+
+ }
+
+ // Total over all sampled vertices.
+ final StringBuilder sb = new StringBuilder();
+ sb.append("TOTAL");
+ sb.append(": analytic=" + gasProgram.getClass().getSimpleName());
+ sb.append(", nseed=" + opt.seed);
+ sb.append(", nsamples=" + opt.nsamples);
+ sb.append(", nthreads=" + opt.nthreads);
+ sb.append(", scheduler=" + ((GASState<VS, ES, ST>)gasState).getScheduler().getClass().getSimpleName());
+ sb.append(", gasEngine=" + gasEngine.getClass().getSimpleName());
+ opt.report(sb); // extension hook.
+ // performance results.
+ sb.append(", stats(total)=" + total);
+ System.out.println(sb);
+
+ return total;
+
+ } finally {
+
+ gasEngine.shutdownNow();
+
+ }
+
+ }
+
+}
Copied: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java (from rev 7382, branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java)
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java 2013-09-10 15:31:04 UTC (rev 7393)
@@ -0,0 +1,544 @@
+package com.bigdata.rdf.graph.impl.bd;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.reflect.Constructor;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.openrdf.sail.SailConnection;
+
+import com.bigdata.Banner;
+import com.bigdata.journal.BufferMode;
+import com.bigdata.journal.ITx;
+import com.bigdata.journal.Journal;
+import com.bigdata.rdf.graph.IGASProgram;
+import com.bigdata.rdf.graph.IGraphAccessor;
+import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor;
+import com.bigdata.rdf.graph.impl.util.GASRunnerBase;
+import com.bigdata.rdf.graph.util.GASUtil;
+import com.bigdata.rdf.sail.BigdataSail;
+import com.bigdata.rdf.store.AbstractTripleStore;
+
+/**
+ * Base class for running performance tests against the bigdata backend.
+ * 
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public class BigdataGASRunner<VS, ES, ST> extends GASRunnerBase<VS, ES, ST> {
+
+    private static final Logger log = Logger.getLogger(BigdataGASRunner.class);
+
+    /**
+     * Configured options for the {@link BigdataGASRunner}.
+     * 
+     * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+     */
+    protected class BigdataOptionData extends
+            GASRunnerBase<VS, ES, ST>.OptionData {
+
+        /**
+         * The {@link BufferMode} to use.
+         */
+        private BufferMode bufferModeOverride = null; // override only.
+
+        /**
+         * The namespace of the bigdata KB instance (set by -namespace).
+         */
+        private String namespaceOverride = null; // override only.
+
+        /**
+         * The as-configured {@link Properties} for the {@link Journal}.
+         */
+        private Properties properties;
+
+        /**
+         * The effective KB name. This is set by consulting
+         * {@link #namespaceOverride} and the as configured {@link #properties}.
+         */
+        private String namespace;
+
+        /**
+         * The backend.
+         * 
+         * TODO Could start NSS and use SPARQL UPDATE "LOAD" to load the data.
+         * That exposes the SPARQL end point for other purposes during the test.
+         * Is this useful? It could also let us run the GASEngine on a remote
+         * service (submit a callable to an HA server or define a REST API for
+         * submitting these GAS algorithms).
+         */
+        private Journal jnl;
+
+        /**
+         * <code>true</code> iff the backend is temporary (created on a
+         * temporary backing file). Temporary backends are destroyed in
+         * {@link #shutdown()}.
+         */
+        private boolean isTemporary;
+
+        /**
+         * Set to <code>true</code> iff we determine that the data needs to be
+         * loaded (e.g., the KB was empty, so we have to load the data sets).
+         * 
+         * TODO Rename for clearer semantics. Basically, do we have to load the
+         * data files or can we assume that the data are already loaded. Lift
+         * into base class?
+         */
+        private boolean newKB = false;
+
+        /**
+         * The #of edges in the KB instance and <code>-1</code> until set by
+         * {@link BigdataGASRunner#loadFiles()}.
+         */
+        private long nedges = -1;
+
+        protected BigdataOptionData() {
+
+            super();
+
+        }
+
+        private Properties getProperties(final String resource) throws IOException {
+
+            if (log.isInfoEnabled())
+                log.info("Reading properties: " + resource);
+
+            InputStream is = null;
+            try {
+
+                // try the classpath
+                is = getClass().getResourceAsStream(resource);
+
+                if (is == null) {
+
+                    // try file system.
+                    final File file = new File(resource);
+
+                    if (!file.exists()) {
+
+                        throw new IOException("Could not locate resource: "
+                                + resource);
+
+                    }
+
+                    is = new FileInputStream(file);
+
+                }
+
+                /*
+                 * Obtain a buffered reader on the input stream.
+                 */
+
+                final Properties properties = new Properties();
+
+                final Reader reader = new BufferedReader(new InputStreamReader(is));
+
+                try {
+
+                    properties.load(reader);
+
+                } finally {
+
+                    try {
+
+                        reader.close();
+
+                    } catch (Throwable t) {
+
+                        log.error(t, t);
+
+                    }
+
+                }
+
+                /*
+                 * Allow override of select options from the command line.
+                 */
+                {
+                    final String[] overrides = new String[] {
+                            // Journal options.
+                            com.bigdata.journal.Options.FILE,
+//                            // RDFParserOptions.
+//                            RDFParserOptions.Options.DATATYPE_HANDLING,
+//                            RDFParserOptions.Options.PRESERVE_BNODE_IDS,
+//                            RDFParserOptions.Options.STOP_AT_FIRST_ERROR,
+//                            RDFParserOptions.Options.VERIFY_DATA,
+//                            // DataLoader options.
+//                            DataLoader.Options.BUFFER_CAPACITY,
+//                            DataLoader.Options.CLOSURE,
+//                            DataLoader.Options.COMMIT,
+//                            DataLoader.Options.FLUSH,
+                    };
+                    for (String s : overrides) {
+                        if (System.getProperty(s) != null) {
+                            // Override/set from the environment.
+                            final String v = System.getProperty(s);
+                            if (log.isInfoEnabled())
+                                log.info("OVERRIDE:: Using: " + s + "=" + v);
+                            properties.setProperty(s, v);
+                        }
+                    }
+                }
+
+                return properties;
+
+            } finally {
+
+                if (is != null) {
+
+                    try {
+
+                        is.close();
+
+                    } catch (Throwable t) {
+
+                        log.error(t, t);
+
+                    }
+
+                }
+
+            }
+
+        }
+
+        /**
+         * Initialization after all arguments have been set.
+         */
+        @Override
+        public void init() throws Exception {
+
+            super.init();
+
+            properties = getProperties(propertyFile);
+
+            /*
+             * Note: Allows override through the command line argument. The default
+             * is otherwise the default and the value in the properties file (if
+             * any) will be used unless it is overridden.
+             */
+            final BufferMode bufferMode = bufferModeOverride == null ? BufferMode
+                    .valueOf(properties.getProperty(Journal.Options.BUFFER_MODE,
+                            Journal.Options.DEFAULT_BUFFER_MODE)) : bufferModeOverride;
+
+            properties.setProperty(Journal.Options.BUFFER_MODE, bufferMode.name());
+
+            final boolean isTransient = !bufferMode.isStable();
+
+            if (isTransient) {
+
+                isTemporary = true;
+
+            } else {
+
+                final String fileStr = properties.getProperty(Journal.Options.FILE);
+
+                if (fileStr == null) {
+
+                    /*
+                     * We will use a temporary file that we create here. The journal
+                     * will be destroyed below.
+                     */
+                    isTemporary = true;
+
+                    final File tmpFile = File.createTempFile(
+                            BigdataGASRunner.class.getSimpleName(),
+                            Journal.Options.JNL);
+
+                    // Set this on the Properties so it will be used by the jnl.
+                    properties.setProperty(Journal.Options.FILE,
+                            tmpFile.getAbsolutePath());
+
+                } else {
+
+                    // real file is named.
+                    isTemporary = false;
+
+                }
+
+            }
+
+            // The effective KB name.
+            namespace = namespaceOverride == null ? properties
+                    .getProperty(BigdataSail.Options.NAMESPACE,
+                            BigdataSail.Options.DEFAULT_NAMESPACE) : namespaceOverride;
+
+            properties.setProperty(BigdataSail.Options.NAMESPACE, namespace);
+
+            // Open Journal.
+            jnl = new Journal(properties);
+
+            // Locate/create KB.
+            {
+                final AbstractTripleStore kb;
+                if (isTemporary) {
+
+                    kb = BigdataSail.createLTS(jnl, properties);
+                    newKB = true;
+
+                } else {
+
+                    final AbstractTripleStore tmp = (AbstractTripleStore) jnl
+                            .getResourceLocator().locate(namespace,
+                                    ITx.UNISOLATED);
+
+                    if (tmp == null) {
+
+                        // create.
+                        kb = BigdataSail.createLTS(jnl, properties);
+                        newKB = true;
+
+                    } else {
+
+                        kb = tmp;
+                        newKB = kb.getStatementCount() == 0L;
+
+                    }
+
+                }
+            }
+        }
+
+        @Override
+        public void shutdown() {
+
+            if (jnl != null) {
+
+                try {
+                    if (isTemporary) {
+                        log.warn("Destroying temporary journal.");
+                        jnl.destroy();
+                    } else {
+                        jnl.close();
+                    }
+                } finally {
+                    // Clear the reference so shutdown() is safe to call
+                    // more than once.
+                    jnl = null;
+                }
+
+            }
+
+            super.shutdown();
+
+        }
+
+        /**
+         * Return <code>true</code> iff one or more arguments can be parsed
+         * starting at the specified index.
+         * 
+         * @param i
+         *            The index into the arguments.
+         * @param args
+         *            The arguments.
+         * @return <code>true</code> iff any arguments were recognized.
+         */
+        @Override
+        public boolean handleArg(final AtomicInteger i, final String[] args) {
+            if (super.handleArg(i, args)) {
+                return true;
+            }
+            final String arg = args[i.get()];
+            if (arg.equals("-bufferMode")) {
+                final String s = args[i.incrementAndGet()];
+                bufferModeOverride = BufferMode.valueOf(s);
+            } else if (arg.equals("-namespace")) {
+                final String s = args[i.incrementAndGet()];
+                namespaceOverride = s;
+            } else {
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * {@inheritDoc}
+         * 
+         * TODO report #of vertices (DISTINCT UNION (?s, ?o)
+         * 
+         * TODO What happened to the predicate summary/histogram/distribution
+         * code?
+         */
+        @Override
+        public void report(final StringBuilder sb) {
+            sb.append(", edges(kb)=" + nedges);
+            sb.append(", namespace=" + namespace);//
+            sb.append(", bufferMode=" + jnl.getBufferStrategy().getBufferMode());
+        }
+    }
+
+    /**
+     * Factory for the {@link OptionData}.
+     */
+    @Override
+    protected OptionData newOptionData() {
+
+        return new BigdataOptionData();
+
+    }
+
+    /**
+     * Create a {@link BigdataGASEngine} backed by the journal and using the
+     * configured #of threads.
+     */
+    @Override
+    protected BigdataGASEngine newGASEngine() {
+
+        final BigdataOptionData opt = getOptionData();
+
+        return new BigdataGASEngine(opt.jnl, opt.nthreads);
+
+    }
+
+    /**
+     * Return an accessor for the KB as of the last commit point on the
+     * journal.
+     */
+    @Override
+    protected IGraphAccessor newGraphAccessor() {
+
+        final BigdataOptionData opt = getOptionData();
+
+        /*
+         * Use a read-only view (sampling depends on access to the BTree rather
+         * than the ReadCommittedIndex).
+         */
+        final BigdataGraphAccessor graphAccessor = new BigdataGraphAccessor(
+                opt.jnl, opt.namespace, opt.jnl.getLastCommitTime());
+
+        return graphAccessor;
+
+    }
+
+    /**
+     * The {@link BigdataOptionData} for the run.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected BigdataOptionData getOptionData() {
+
+        return (BigdataOptionData) super.getOptionData();
+
+    }
+
+    /**
+     * Run a GAS analytic against some data set.
+     * 
+     * @param args
+     *            USAGE:<br/>
+     *            <code>(options) analyticClass propertyFile</code>
+     *            <p>
+     *            <i>Where:</i>
+     *            <dl>
+     *            <dt>propertyFile</dt>
+     *            <dd>A java properties file for a standalone {@link Journal}.</dd>
+     *            </dl>
+     *            and <i>options</i> are any of the options defined for the
+     *            {@link GASRunnerBase} PLUS any of:
+     *            <dl>
+     *            <dt>-bufferMode</dt>
+     *            <dd>Overrides the {@link BufferMode} (if any) specified in the
+     *            <code>propertyFile</code>.</dd>
+     *            <dt>-namespace</dt>
+     *            <dd>The namespace of the default SPARQL endpoint (the
+     *            namespace will be <code>kb</code> if none was specified when
+     *            the triple/quad store was created).</dd>
+     *            </p>
+     * @throws ClassNotFoundException
+     */
+    public BigdataGASRunner(final String[] args) throws ClassNotFoundException {
+
+        super(args);
+
+        Banner.banner();
+
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The data sets are only loaded when the KB was newly created (or was
+     * empty). In all cases, the #of statements (edges) in the KB is recorded.
+     */
+    @Override
+    public void loadFiles() throws IOException {
+
+        final BigdataOptionData opt = getOptionData();
+
+        final Journal jnl = opt.jnl;
+        final String namespace = opt.namespace;
+        final String[] loadSet = opt.loadSet.toArray(new String[0]);
+
+        // Load data using the unisolated view.
+        final AbstractTripleStore kb = (AbstractTripleStore) jnl
+                .getResourceLocator().locate(namespace, ITx.UNISOLATED);
+        if (kb == null)
+            throw new IllegalStateException("KB not found: " + namespace);
+
+        if (opt.newKB && loadSet.length > 0) {
+
+            final BigdataSail sail = new BigdataSail(kb);
+            try {
+                try {
+                    sail.initialize();
+                    loadFiles(sail, loadSet);
+                } finally {
+                    if (sail.isOpen())
+                        sail.shutDown();
+                }
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+
+        // total #of edges in that graph.
+        opt.nedges = kb.getStatementCount();
+
+    }
+
+    /**
+     * Load each resource using the unisolated connection and commit. On
+     * failure the connection is rolled back. The connection is always
+     * closed, even if the rollback itself fails.
+     */
+    private void loadFiles(final BigdataSail sail, final String[] loadSet)
+            throws Exception {
+        boolean ok = false;
+        final SailConnection cxn = sail.getUnisolatedConnection();
+        try {
+            for (String f : loadSet) {
+                new GASUtil()
+                        .loadGraph(cxn, null/* fallback */, f/* resource */);
+            }
+            cxn.commit();
+            ok = true;
+        } finally {
+            try {
+                if (!ok)
+                    cxn.rollback();
+            } finally {
+                // Always release the unisolated connection.
+                cxn.close();
+            }
+        }
+    }
+
+    /**
+     * Performance testing harness.
+     * 
+     * @see #BigdataGASRunner(String[])
+     */
+    @SuppressWarnings("rawtypes")
+    public static void main(final String[] args) throws Exception {
+
+        new BigdataGASRunner(args).call();
+
+    }
+
+}
Deleted: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java 2013-09-10 13:55:55 UTC (rev 7392)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java 2013-09-10 15:31:04 UTC (rev 7393)
@@ -1,737 +0,0 @@
-package com.bigdata.rdf.graph.impl.bd;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.lang.reflect.Constructor;
-import java.util.LinkedHashSet;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Callable;
-
-import org.apache.log4j.Logger;
-import org.openrdf.model.Value;
-import org.openrdf.sail.SailConnection;
-
-import com.bigdata.Banner;
-import com.bigdata.journal.BufferMode;
-import com.bigdata.journal.ITx;
-import com.bigdata.journal.Journal;
-import com.bigdata.rdf.graph.IGASContext;
-import com.bigdata.rdf.graph.IGASEngine;
-import com.bigdata.rdf.graph.IGASProgram;
-import com.bigdata.rdf.graph.IGASScheduler;
-import com.bigdata.rdf.graph.IGASSchedulerImpl;
-import com.bigdata.rdf.graph.IGASState;
-import com.bigdata.rdf.graph.IGASStats;
-import com.bigdata.rdf.graph.impl.GASEngine;
-import com.bigdata.rdf.graph.impl.GASState;
-import com.bigdata.rdf.graph.impl.GASStats;
-import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor;
-import com.bigdata.rdf.graph.impl.util.VertexDistribution;
-import com.bigdata.rdf.graph.util.GASUtil;
-import com.bigdata.rdf.sail.BigdataSail;
-import com.bigdata.rdf.store.AbstractTripleStore;
-
-/**
- * Base class for running performance tests.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- *
- * TODO Do we need a different driver if the algorithm always visits all
- * vertices? For such algorithms, we just run them once per graph
- * (unless the graph is dynamic).
- *
- * FIXME API: GASRunner needs to use a factory pattern for different
- * backends. It should then be moved to the util package and the
- * bigdata-gas module.
- */
-public class GASRunner<VS, ES, ST> implements Callable<IGASStats> {
-
- private static final Logger log = Logger.getLogger(GASRunner.class);
-
- /**
- * The seed used for the random number generator.
- */
- private final long seed;
-
- /**
- * Random number generated used for sampling the starting vertices.
- */
- private final Random r;
-
- /**
- * The #of random starting vertices to use.
- */
- private final int nsamples;
-
- /**
- * The #of threads to use for GATHER and SCATTER operators.
- */
- private final int nthreads;
-
- /**
- * The analytic class to be executed.
- */
- private final Class<IGASProgram<VS, ES, ST>> analyticClass;
-
- /**
- * The property file
- */
- private final String propertyFile;
-
- /**
- * When non-<code>null</code>, this overrides the current buffer mode.
- */
- private final BufferMode bufferModeOverride;
-
- /**
- * When non-<code>null</code>, this overrides the KB namespace.
- */
- private final String namespaceOverride;
-
- /**
- * The {@link IGASSchedulerImpl} class to use.
- */
- private final Class<IGASSchedulerImpl> schedulerClassOverride;
-
- /**
- * When non-<code>null</code>, a list of zero or more resources to be
- * loaded. The resources will be searched for as URLs, on the CLASSPATH, and
- * in the file system.
- */
- private final String[] loadSet;
-
- /**
- * Print the optional message on stderr, print the usage information on
- * stderr, and then force the program to exit with the given status code.
- *
- * @param status
- * The status code.
- * @param msg
- * The optional message
- */
- private static void usage(final int status, final String msg) {
-
- if (msg != null) {
-
- System.err.println(msg);
-
- }
-
- System.err.println("[options] analyticClass propertyFile");
-
- System.exit(status);
-
- }
-
- /**
- * Run a GAS analytic against some data set.
- *
- * @param args
- * USAGE:<br/>
- * <code>(options) analyticClass propertyFile</code>
- * <p>
- * <i>Where:</i>
- * <dl>
- * <dt>propertyFile</dt>
- * <dd>A java properties file for a standalone {@link Journal}.</dd>
- * </dl>
- * and <i>options</i> are any of:
- * <dl>
- * <dt>-nthreads</dt>
- * <dd>The #of threads which will be used for GATHER and SCATTER
- * operations.</dd>
- * <dt>-nsamples</dt>
- * <dd>The #of random sample starting vertices that will be
- * selected. The algorithm will be run ONCE for EACH sampled
- * vertex.</dd>
- * <dt>-seed</dt>
- * <dd>The seed for the random number generator (default is
- * <code>217L</code>).</dd>
- * <dt>-bufferMode</dt>
- * <dd>Overrides the {@link BufferMode} (if any) specified in the
- * <code>propertyFile</code>.</dd>
- * <dt>-schedulerClass</dt>
- * <dd>Override the default {@link IGASScheduler}. Class must
- * implement {@link IGASSchedulerImpl}.</dd>
- * <dt>-namespace</dt>
- * <dd>The namespace of the default SPARQL endpoint (the
- * namespace will be <code>kb</code> if none was specified when
- * the triple/quad store was created).</dd>
- * <dt>-load</dt>
- * <dd>Loads the named resource IFF the KB is empty (or does not
- * exist) at the time this utility is executed. This option may
- * appear multiple times. The resources will be searched for as
- * URLs, on the CLASSPATH, and in the file system.</dd>
- * </p>
- * @throws ClassNotFoundException
- */
- public GASRunner(final String[] args) throws ClassNotFoundException {
-
- Banner.banner();
-
- long seed = 217L;
- int nsamples = 100;
- int nthreads = 4;
- BufferMode bufferMode = null; // override only.
- Class<IGASSchedulerImpl> schedulerClass = null; // override only.
- String namespace = "kb";
- // Set of files to load (optional).
- LinkedHashSet<String> loadSet = new LinkedHashSet<String>();
-
- /*
- * Handle all arguments starting with "-". These should appear before
- * any non-option arguments to the program.
- */
- int i = 0;
- while (i < args.length) {
- final String arg = args[i];
- if (arg.startsWith("-")) {
- if (arg.equals("-seed")) {
- seed = Long.valueOf(args[++i]);
- } else if (arg.equals("-nsamples")) {
- final String s = args[++i];
- nsamples = Integer.valueOf(s);
- if (nsamples <= 0) {
- usage(1/* status */,
- "-nsamples must be positive, not: " + s);
- }
- } else if (arg.equals("-nthreads")) {
- final String s = args[++i];
- nthreads = Integer.valueOf(s);
- if (nthreads < 0) {
- usage(1/* status */,
- "-nthreads must be non-negative, not: " + s);
- }
- } else if (arg.equals("-bufferMode")) {
- final String s = args[++i];
- bufferMode = BufferMode.valueOf(s);
- } else if (arg.equals("-schedulerClass")) {
- final String s = args[++i];
- schedulerClass = (Class<IGASSchedulerImpl>) Class.forName(s);
- } else if (arg.equals("-namespace")) {
- final String s = args[++i];
- namespace = s;
- } else if (arg.equals("-load")) {
- final String s = args[++i];
- loadSet.add(s);
- } else {
- usage(1/* status */, "Unknown argument: " + arg);
- }
- } else {
- break;
- }
- i++;
- }
-
- /*
- * Check for the remaining (required) argument(s).
- */
- final int nremaining = args.length - i;
- if (nremaining != 2) {
- /*
- * There are either too many or too few arguments remaining.
- */
- usage(1/* status */, nremaining < 1 ? "Too few arguments."
- : "Too many arguments");
- }
-
- /*
- * The analytic to be executed.
- */
- {
-
- final String s = args[i++];
-
- this.analyticClass = (Class<IGASProgram<VS, ES, ST>>) Class
- .forName(s);
-
- }
-
- /*
- * Property file.
- */
- this.propertyFile = args[i++];
-
- /*
- * Assign parsed values.
- */
- this.seed = seed;
- this.nsamples = nsamples;
- this.nthreads = nthreads;
- this.namespaceOverride = namespace;
- this.bufferModeOverride = bufferMode;
- this.schedulerClassOverride = schedulerClass;
- this.loadSet = loadSet.isEmpty() ? null : loadSet
- .toArray(new String[loadSet.size()]);
-
- // Setup the random number generator.
- this.r = new Random(seed);
-
- }
-
- /**
- * Return an instance of the {@link IGASProgram} to be evaluated.
- */
- protected IGASProgram<VS, ES, ST> newGASProgram() {
-
- final Class<IGASProgram<VS, ES, ST>> cls = analyticClass;
-
- try {
-
- final Constructor<IGASProgram<VS, ES, ST>> ctor = cls
- .getConstructor(new Class[] {});
-
- final IGASProgram<VS, ES, ST> gasProgram = ctor
- .newInstance(new Object[] {});
-
- return gasProgram;
-
- } catch (Exception e) {
-
- throw new RuntimeException(e);
-
- }
-
- }
-
- private Properties getProperties(final String resource) throws IOException {
-
- if (log.isInfoEnabled())
- log.info("Reading properties: " + resource);
-
- InputStream is = null;
- try {
-
- // try the classpath
- is = getClass().getResourceAsStream(resource);
-
- if (is != null) {
-
- } else {
-
- // try file system.
- final File file = new File(resource);
-
- if (file.exists()) {
-
- is = new FileInputStream(file);
-
- } else {
-
- throw new IOException("Could not locate resource: "
- + resource);
-
- }
-
- }
-
- /*
- * Obtain a buffered reader on the input stream.
- */
-
- final Properties properties = new Properties();
-
- final Reader reader = new BufferedReader(new InputStreamReader(is));
-
- try {
-
- properties.load(reader);
-
- } finally {
-
- try {
-
- reader.close();
-
- } catch (Throwable t) {
-
- log.error(t);
-
- }
-
- }
-
- /*
- * Allow override of select options from the command line.
- */
- {
- final String[] overrides = new String[] {
- // Journal options.
- com.bigdata.journal.Options.FILE,
-// // RDFParserOptions.
-// RDFParserOptions.Options.DATATYPE_HANDLING,
-// RDFParserOptions.Options.PRESERVE_BNODE_IDS,
-// RDFParserOptions.Options.STOP_AT_FIRST_ERROR,
-// RDFParserOptions.Options.VERIFY_DATA,
-// // DataLoader options.
-// DataLoader.Options.BUFFER_CAPACITY,
-// DataLoader.Options.CLOSURE,
-// DataLoader.Options.COMMIT,
-// DataLoader.Options.FLUSH,
- };
- for (String s : overrides) {
- if (System.getProperty(s) != null) {
- // Override/set from the environment.
- final String v = System.getProperty(s);
- if (log.isInfoEnabled())
- log.info("OVERRIDE:: Using: " + s + "=" + v);
- properties.setProperty(s, v);
- }
- }
- }
-
- return properties;
-
- } finally {
-
- if (is != null) {
-
- try {
-
- is.close();
-
- } catch (Throwable t) {
-
- log.error(t);
-
- }
-
- }
-
- }
-
- }
-
- /**
- * Run the test.
- * <p>
- * This provides a safe pattern for either loading data into a temporary
- * journal, which is then destroyed, or using an exiting journal and
- * optionally loading in some data set. When we load the data the journal is
- * destroyed afterwards and when the journal is pre-existing and we neither
- * load the data nor destroy the journal. This has to do with the effective
- * BufferMode (if transient) and whether the file is specified and whether a
- * temporary file is created (CREATE_TEMP_FILE). If we do our own file
- * create if the effective buffer mode is non-transient, then we can get all
- * this information.
- */
- public IGASStats call() throws Exception {
-
- final Properties properties = getProperties(propertyFile);
-
- /*
- * Note: Allows override through the command line argument. The default
- * is otherwise the default and the value in the properties file (if
- * any) will be used unless it is overridden.
- */
- final BufferMode bufferMode = this.bufferModeOverride == null ? BufferMode
- .valueOf(properties.getProperty(Journal.Options.BUFFER_MODE,
- Journal.Options.DEFAULT_BUFFER_MODE)) : this.bufferModeOverride;
-
- properties.setProperty(Journal.Options.BUFFER_MODE, bufferMode.name());
-
- final boolean isTransient = !bufferMode.isStable();
-
- final boolean isTemporary;
- if (isTransient) {
-
- isTemporary = true;
-
- } else {
-
- final String fileStr = properties.getProperty(Journal.Options.FILE);
-
- if (fileStr == null) {
-
- /*
- * We will use a temporary file that we create here. The journal
- * will be destroyed below.
- */
- isTemporary = true;
-
- final File tmpFile = File.createTempFile(
- GASRunner.class.getSimpleName(), Journal.Options.JNL);
-
- // Set this on the Properties so it will be used by the jnl.
- properties.setProperty(Journal.Options.FILE,
- tmpFile.getAbsolutePath());
-
- } else {
-
- // real file is named.
- isTemporary = false;
-
- }
-
- }
-
- // The effective KB name.
- final String namespace = this.namespaceOverride == null ? properties
- .getProperty(BigdataSail.Options.NAMESPACE,
- BigdataSail.Options.DEFAULT_NAMESPACE) : this.namespaceOverride;
-
- properties.setProperty(BigdataSail.Options.NAMESPACE, namespace);
-
- /*
- * TODO Could start NSS and use SPARQL UPDATE "LOAD" to load the data.
- * That exposes the SPARQL end point for other purposes during the test.
- * Is this useful? It could also let us run the GASEngine on a remote
- * service (submit a callable to an HA server or define a REST API for
- * submitting these GAS algorithms).
- */
- final Journal jnl = new Journal(properties);
-
- try {
-
- // Locate/create KB.
- final boolean newKB;
- {
- final AbstractTripleStore kb;
- if (isTemporary) {
-
- kb = BigdataSail.createLTS(jnl, properties);
- newKB = true;
-
- } else {
-
- final AbstractTripleStore tmp = (AbstractTripleStore) jnl
- .getResourceLocator().locate(namespace,
- ITx.UNISOLATED);
-
- if (tmp == null) {
-
- // create.
- kb = BigdataSail.createLTS(jnl, properties);
- newKB = true;
-
- } else {
-
- kb = tmp;
- newKB = kb.getStatementCount() == 0L;
-
- }
-
- }
- }
-
- /*
- * Load data sets (iff new KB).
- */
- if (newKB && (loadSet != null && loadSet.length > 0)) {
-
- loadFiles(jnl, namespace, loadSet);
-
- }
-
- return runAnalytic(jnl, namespace);
-
- } finally {
-
- if (isTemporary) {
-
- log.warn("Destroying temporary journal.");
-
- jnl.destroy();
-
- } else {
-
- jnl.close();
-
- }
-
- }
-
- }
-
- /**
- * Load files into the journal.
- *
- * @param jnl
- * The journal.
- * @param namespace
- * The KB namespace.
- * @param loadSet
- * The files.
- * @throws IOException
- */
- private void loadFiles(final Journal jnl, final String namespace,
- final String[] loadSet) throws IOException {
-
- // Load data using the unisolated view.
- final AbstractTripleStore kb = (AbstractTripleStore) jnl
- .getResourceLocator().locate(namespace, ITx.UNISOLATED);
- final BigdataSail sail = new BigdataSail(kb);
- try {
- try {
- sail.initialize();
- boolean ok = false;
- final SailConnection cxn = sail.getUnisolatedConnection();
- try {
- for (String f : loadSet) {
- new GASUtil()
- .loadGraph(cxn, null/* fallback */, f/* resource */);
- }
- cxn.commit();
- ok = true;
- } finally {
- if (!ok)
- cxn.rollback();
- cxn.close();
- }
- } finally {
- if (sail.isOpen())
- sail.shutDown();
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
-
- // final String path = "bigdata-rdf/src/resources/data/foaf";
- // final String dataFile[] = new String[] {//
- // path + "/data-0.nq.gz",//
- // path + "/data-1.nq.gz",//
- // path + "/data-2.nq.gz",//
- // path + "/data-3.nq.gz",//
- // };
-// final String baseUrl[] = new String[loadSet.le...
[truncated message content] |