From: <tho...@us...> - 2013-09-10 15:31:13
|
Revision: 7393 http://bigdata.svn.sourceforge.net/bigdata/?rev=7393&view=rev Author: thompsonbry Date: 2013-09-10 15:31:04 +0000 (Tue, 10 Sep 2013) Log Message: ----------- Refactored the GASRunner into a base class and derived a BigdataGASRunner and a SAILGASRunner from it. Initial performance with the memory sail does not look very good: about 1/2 of the throughput that we observe with bigdata. I am going to try this on a machine with more RAM and see how that helps. See #629 (Graph mining API) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java Removed Paths: ------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-09-10 13:55:55 UTC (rev 7392) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-09-10 15:31:04 UTC (rev 7393) @@ -32,6 +32,7 @@ * the generic type for the per-edge state, but that is not always * true. The SUM type is scoped to the GATHER + SUM operation (NOT * the computation).
+ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ public interface IGASProgram<VS, ES, ST> extends IGASOptions<VS, ES, ST> { Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASRunner.java 2013-09-10 15:31:04 UTC (rev 7393) @@ -0,0 +1,181 @@ +package com.bigdata.rdf.graph.impl.sail; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; +import org.openrdf.model.Resource; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; +import org.openrdf.sail.memory.MemoryStore; + +import com.bigdata.rdf.graph.IGASEngine; +import com.bigdata.rdf.graph.IGraphAccessor; +import com.bigdata.rdf.graph.impl.sail.SAILGASEngine.SAILGraphAccessor; +import com.bigdata.rdf.graph.impl.util.GASRunnerBase; +import com.bigdata.rdf.graph.util.GASUtil; + +/** + * Class for running GAS performance tests against the SAIL. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class SAILGASRunner<VS, ES, ST> extends GASRunnerBase<VS, ES, ST> { + + private static final Logger log = Logger.getLogger(SAILGASRunner.class); + + public SAILGASRunner(String[] args) throws ClassNotFoundException { + super(args); + } + + protected class SAILOptionData extends GASRunnerBase<VS, ES, ST>.OptionData { + + private Sail sail = null; + + private SailConnection cxn = null; + + @Override + public void init() throws Exception { + + super.init(); + + sail = new MemoryStore(); + + sail.initialize(); + + cxn = sail.getConnection(); + + } + + @Override + public void shutdown() { + + if (cxn != null) { + + try { + + cxn.close(); + + } catch (SailException e) { + + log.error(e, e); + + } finally { + + cxn = null; + + } + + } + + if (sail != null) { + + try { + + sail.shutDown(); + + } catch (SailException e) { + + log.error(e,e); + + } finally { + + sail = null; + + } + + } + + } + @Override + public boolean handleArg(final AtomicInteger i, final String[] args) { + if (super.handleArg(i, args)) { + return true; + } +// final String arg = args[i.get()]; +// if (arg.equals("-bufferMode")) { +// final String s = args[i.incrementAndGet()]; +// bufferModeOverride = BufferMode.valueOf(s); +// } else if (arg.equals("-namespace")) { +// final String s = args[i.incrementAndGet()]; +// namespaceOverride = s; +// } else { +// return false; +// } + return false; + } + + @Override + public void report(final StringBuilder sb) { + // NOP + } + + } // class SAILOptionData + + @Override + protected SAILOptionData newOptionData() { + + return new SAILOptionData(); + + } + + @Override + protected IGASEngine newGASEngine() { + + return new SAILGASEngine(getOptionData().nthreads); + + } + + @Override + protected void loadFiles() throws Exception { + + final SAILOptionData opt = getOptionData(); + final String[] resources = opt.loadSet.toArray(new String[0]); + + boolean ok = false; + SailConnection cxn = 
null; + try { + cxn = opt.cxn; + new GASUtil().loadGraph(cxn, null/* fallback */, resources); + cxn.commit(); + ok = true; + } finally { + if (cxn != null) { + if (!ok) + cxn.rollback(); + // Note: using the same connection, so don't close here. +// cxn.close(); + } + } + + } + + @SuppressWarnings("unchecked") + @Override + protected SAILOptionData getOptionData() { + + return (SAILOptionData) super.getOptionData(); + + } + + @Override + protected IGraphAccessor newGraphAccessor() { + + return new SAILGraphAccessor(getOptionData().cxn, + false/* includeInferred */, new Resource[0]/* defaultContext */); + + } + + /** + * Performance testing harness. + * + * @see #GASRunner(String[]) + */ + @SuppressWarnings("rawtypes") + public static void main(final String[] args) throws Exception { + + new SAILGASRunner(args).call(); + + } + +} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java 2013-09-10 15:31:04 UTC (rev 7393) @@ -0,0 +1,464 @@ +package com.bigdata.rdf.graph.impl.util; + +import java.lang.reflect.Constructor; +import java.util.LinkedHashSet; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; +import org.openrdf.model.Value; + +import com.bigdata.rdf.graph.IGASContext; +import com.bigdata.rdf.graph.IGASEngine; +import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASSchedulerImpl; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IGASStats; +import com.bigdata.rdf.graph.IGraphAccessor; +import com.bigdata.rdf.graph.impl.GASEngine; +import 
com.bigdata.rdf.graph.impl.GASState; +import com.bigdata.rdf.graph.impl.GASStats; + +/** + * Base class for running performance tests. + * + * @param <VS> + * The generic type for the per-vertex state. This is scoped to the + * computation of the {@link IGASProgram}. + * @param <ES> + * The generic type for the per-edge state. This is scoped to the + * computation of the {@link IGASProgram}. + * @param <ST> + * The generic type for the SUM. This is often directly related to + * the generic type for the per-edge state, but that is not always + * true. The SUM type is scoped to the GATHER + SUM operation (NOT + * the computation). + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * + * TODO Do we need a different driver if the algorithm always visits all + * vertices? For such algorithms, we just run them once per graph + * (unless the graph is dynamic). + */ +//* @param <GE> +//* The generic type for the {@link IGASEngine}. +//* @param <BE> +//* The generic type for the backend implementation. + +public abstract class GASRunnerBase<VS, ES, ST> implements + Callable<IGASStats> { + + private static final Logger log = Logger.getLogger(GASRunnerBase.class); + + /** + * Configured options for the {@link GASRunner}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + protected class OptionData { + /** + * The seed used for the random number generator (default {@value #seed} + * ). + */ + public long seed = 217L; + /** + * Random number generated used for sampling the starting vertices. Set + * by #init(). + */ + public Random r = null; + /** + * The #of random starting vertices to use. + */ + public int nsamples = 100; + /** + * The #of threads to use for GATHER and SCATTER operators. + */ + public int nthreads = 4; + /** + * The analytic class to be executed. + */ + public Class<IGASProgram<VS, ES, ST>> analyticClass; + /** + * The {@link IGASSchedulerImpl} class to use. + * + * TODO Override or always? 
If always, then where to get the default? + */ + public Class<IGASSchedulerImpl> schedulerClassOverride; + + /** Set of files to load (may be empty). */ + public final LinkedHashSet<String> loadSet = new LinkedHashSet<String>(); + + /** The name of the implementation specific configuration file. */ + public String propertyFile; + + protected OptionData() { + + } + + /** + * Initialize any resources, including the connection to the backend. + */ + public void init() throws Exception { + + // Setup the random number generator. + this.r = new Random(seed); + + r = new Random(seed); + + } + + /** + * Shutdown any resources, including the connection to the backend. + * <p> + * Note: This method must be safe. It may be called if {@link #init()} + * fails. It may be called more than once. + */ + public void shutdown() { + + } + + /** + * Return <code>true</code>iff one or more arguments can be parsed + * starting at the specified index. + * + * @param i + * The index into the arguments. + * @param args + * The arguments. + * @return <code>true</code> iff any arguments were recognized. + */ + public boolean handleArg(final AtomicInteger i, final String[] args) { + + return false; + + } + + /** + * Print the optional message on stderr, print the usage information on + * stderr, and then force the program to exit with the given status code. + * + * @param status + * The status code. + * @param msg + * The optional message + */ + public void usage(final int status, final String msg) { + + if (msg != null) { + + System.err.println(msg); + + } + + System.err.println("[options] analyticClass propertyFile"); + + System.exit(status); + + } + + /** + * Extension hook for reporting at the end of the test run. + * + * @param sb A buffer into which more information may be appended. + */ + public void report(final StringBuilder sb) { + + // NOP + + } + + } // class OptionData + + /** + * The configuration metadata for the run. 
+ */ + private final OptionData opt; + + /** + * Factory for the {@link OptionData}. + */ + abstract protected OptionData newOptionData(); + + /** + * The {@link OptionData} for the run. + */ + protected OptionData getOptionData() { + + return opt; + + } + + /** + * Factory for the {@link IGASEngine}. + */ + abstract protected IGASEngine newGASEngine(); + + /** + * Load files into the backend if they can not be assumed to already exist + * (a typical pattern is that files are loaded into an empty KB instance, + * but not loaded into a pre-existing one). + * + * @throws Exception + */ + abstract protected void loadFiles() throws Exception; + + /** + * Run a GAS analytic against some data set. + * + * @param args + * USAGE:<br/> + * <code>(options) analyticClass propertyFile</code> + * <p> + * <i>Where:</i> + * <dl> + * <dt>propertyFile</dt> + * <dd>The implementation specific property file or other type of + * configuration file.</dd> + * </dl> + * and <i>options</i> are any of: + * <dl> + * <dt>-nthreads</dt> + * <dd>The #of threads which will be used for GATHER and SCATTER + * operations.</dd> + * <dt>-nsamples</dt> + * <dd>The #of random sample starting vertices that will be + * selected. The algorithm will be run ONCE for EACH sampled + * vertex.</dd> + * <dt>-seed</dt> + * <dd>The seed for the random number generator (default is + * <code>217L</code>).</dd> + * <dt>-schedulerClass</dt> + * <dd>Override the default {@link IGASScheduler}. Class must + * implement {@link IGASSchedulerImpl}.</dd> + * <dt>-load</dt> + * <dd>Loads the named resource IFF the KB is empty (or does not + * exist) at the time this utility is executed. This option may + * appear multiple times. 
The resources will be searched for as + * URLs, on the CLASSPATH, and in the file system.</dd> + * </p> + * @throws ClassNotFoundException + */ + public GASRunnerBase(final String[] args) throws ClassNotFoundException { + + final OptionData opt = newOptionData(); + + /* + * Handle all arguments starting with "-". These should appear before + * any non-option arguments to the program. + */ + final AtomicInteger i = new AtomicInteger(0); + while (i.get() < args.length) { + final String arg = args[i.get()]; + if (arg.startsWith("-")) { + if (arg.equals("-seed")) { + opt.seed = Long.valueOf(args[i.incrementAndGet()]); + } else if (arg.equals("-nsamples")) { + final String s = args[i.incrementAndGet()]; + opt.nsamples = Integer.valueOf(s); + if (opt.nsamples <= 0) { + opt.usage(1/* status */, + "-nsamples must be positive, not: " + s); + } + } else if (arg.equals("-nthreads")) { + final String s = args[i.incrementAndGet()]; + opt.nthreads = Integer.valueOf(s); + if (opt.nthreads < 0) { + opt.usage(1/* status */, + "-nthreads must be non-negative, not: " + s); + } + } else if (arg.equals("-schedulerClass")) { + final String s = args[i.incrementAndGet()]; + opt.schedulerClassOverride = (Class<IGASSchedulerImpl>) Class.forName(s); + } else if (arg.equals("-load")) { + final String s = args[i.incrementAndGet()]; + opt.loadSet.add(s); + } else { + if (!opt.handleArg(i, args)) { + opt.usage(1/* status */, "Unknown argument: " + arg); + } + } + } else { + break; + } + i.incrementAndGet(); + } + + /* + * Check for the remaining (required) argument(s). + */ + final int nremaining = args.length - i.get(); + if (nremaining != 2) { + /* + * There are either too many or too few arguments remaining. + */ + opt.usage(1/* status */, nremaining < 1 ? "Too few arguments." + : "Too many arguments"); + } + + /* + * The analytic to be executed. 
+ */ + { + + final String s = args[i.getAndIncrement()]; + + opt.analyticClass = (Class<IGASProgram<VS, ES, ST>>) Class + .forName(s); + + } + + /* + * Property file. + */ + opt.propertyFile = args[i.getAndIncrement()]; + + this.opt = opt; // assign options. + + } + + /** + * Return the object used to access the as-configured graph. + */ + abstract protected IGraphAccessor newGraphAccessor(); + + /** + * Return an instance of the {@link IGASProgram} to be evaluated. + */ + protected IGASProgram<VS, ES, ST> newGASProgram() { + + final Class<IGASProgram<VS, ES, ST>> cls = (Class<IGASProgram<VS, ES, ST>>)opt.analyticClass; + + try { + + final Constructor<IGASProgram<VS, ES, ST>> ctor = cls + .getConstructor(new Class[] {}); + + final IGASProgram<VS, ES, ST> gasProgram = ctor + .newInstance(new Object[] {}); + + return gasProgram; + + } catch (Exception e) { + + throw new RuntimeException(e); + + } + + } + + /** + * Run the test. + * <p> + * This provides a safe pattern for either loading data into a temporary + * journal, which is then destroyed, or using an exiting journal and + * optionally loading in some data set. When we load the data the journal is + * destroyed afterwards and when the journal is pre-existing and we neither + * load the data nor destroy the journal. This has to do with the effective + * BufferMode (if transient) and whether the file is specified and whether a + * temporary file is created (CREATE_TEMP_FILE). If we do our own file + * create if the effective buffer mode is non-transient, then we can get all + * this information. + */ + @Override + final public IGASStats call() throws Exception { + + try { + + // initialize backend / connection to backend. + opt.init(); + + // Load data sets + loadFiles(); + + // Run GAS program. + return runAnalytic(); + + } finally { + + // Shutdown backend / connection to backend. + opt.shutdown(); + + } + + } + + /** + * Run the analytic. + * + * @return The performance statistics for the run. 
+ * + * @throws Exception + */ + final protected IGASStats runAnalytic() throws Exception { + + final IGASEngine gasEngine = newGASEngine(); + + try { + + if (opt.schedulerClassOverride != null) { + + ((GASEngine) gasEngine) + .setSchedulerClass(opt.schedulerClassOverride); + + } + + final IGASProgram<VS, ES, ST> gasProgram = newGASProgram(); + + final IGraphAccessor graphAccessor = newGraphAccessor(); + + final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext( + graphAccessor, gasProgram); + + final IGASState<VS, ES, ST> gasState = gasContext.getGASState(); + + final VertexDistribution dist = graphAccessor.getDistribution(opt.r); + + final Value[] samples = dist.getWeightedSample(opt.nsamples); + + final IGASStats total = new GASStats(); + + for (int i = 0; i < samples.length; i++) { + + final Value startingVertex = samples[i]; + + gasState.init(startingVertex); + + final IGASStats stats = (IGASStats) gasContext.call(); + + total.add(stats); + + if (log.isInfoEnabled()) { + log.info("Run complete: vertex[" + i + "] of " + + samples.length + " : startingVertex=" + + startingVertex + ", stats(sample)=" + stats); + } + + } + + // Total over all sampled vertices. + final StringBuilder sb = new StringBuilder(); + sb.append("TOTAL"); + sb.append(": analytic=" + gasProgram.getClass().getSimpleName()); + sb.append(", nseed=" + opt.seed); + sb.append(", nsamples=" + opt.nsamples); + sb.append(", nthreads=" + opt.nthreads); + sb.append(", scheduler=" + ((GASState<VS, ES, ST>)gasState).getScheduler().getClass().getSimpleName()); + sb.append(", gasEngine=" + gasEngine.getClass().getSimpleName()); + opt.report(sb); // extension hook. + // performance results. 
+ sb.append(", stats(total)=" + total); + System.out.println(sb); + + return total; + + } finally { + + gasEngine.shutdownNow(); + + } + + } + +} Copied: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java (from rev 7382, branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java) =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASRunner.java 2013-09-10 15:31:04 UTC (rev 7393) @@ -0,0 +1,544 @@ +package com.bigdata.rdf.graph.impl.bd; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.lang.reflect.Constructor; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; +import org.openrdf.sail.SailConnection; + +import com.bigdata.Banner; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.IGraphAccessor; +import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor; +import com.bigdata.rdf.graph.impl.util.GASRunnerBase; +import com.bigdata.rdf.graph.util.GASUtil; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.store.AbstractTripleStore; + +/** + * Base class for running performance tests against the bigdata backend. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class BigdataGASRunner<VS, ES, ST> extends GASRunnerBase<VS, ES, ST> { + + private static final Logger log = Logger.getLogger(BigdataGASRunner.class); + + /** + * Configured options for the {@link GASRunner}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + protected class BigdataOptionData extends + GASRunnerBase<VS, ES, ST>.OptionData { + + /** + * The {@link BufferMode} to use. + */ + private BufferMode bufferModeOverride = null; // override only. + + /** + * The namespace of the bigdata KB instance. + */ + private String namespaceOverride = "kb"; + + /** + * The as-configured {@link Properties} for the {@link Journal}. + */ + private Properties properties; + + /** + * The effective KB name. This is set by consulting + * {@link #namespaceOverride} and the as configured {@link #properties}. + */ + private String namespace; + + /** + * The backend. + * + * TODO Could start NSS and use SPARQL UPDATE "LOAD" to load the data. + * That exposes the SPARQL end point for other purposes during the test. + * Is this useful? It could also let us run the GASEngine on a remote + * service (submit a callable to an HA server or define a REST API for + * submitting these GAS algorithms). + */ + private Journal jnl; + + /** + * <code>true</code> iff the backend is temporary (created on a + * temporary backing file). Temporary backends are destroyed in + * {@link #shutdown()}. + */ + private boolean isTemporary; + + /** + * Set to <code>true</code> iff we determine that the data needs to be + * loaded (e.g., the KB was empty, so we have to load the data sets). + * + * TODO Rename for clearer semantics. Basically, do we have to load the + * data files or can we assume that the data are already loaded. Lift + * into base class? + */ + private boolean newKB = false; + + /** + * The #of edges in the KB instance and <code>-1</code> until set by + * {@link BigdataGASRunner#loadFiles()}. 
+ */ + private long nedges = -1; + + protected BigdataOptionData() { + + super(); + + } + + private Properties getProperties(final String resource) throws IOException { + + if (log.isInfoEnabled()) + log.info("Reading properties: " + resource); + + InputStream is = null; + try { + + // try the classpath + is = getClass().getResourceAsStream(resource); + + if (is != null) { + + } else { + + // try file system. + final File file = new File(resource); + + if (file.exists()) { + + is = new FileInputStream(file); + + } else { + + throw new IOException("Could not locate resource: " + + resource); + + } + + } + + /* + * Obtain a buffered reader on the input stream. + */ + + final Properties properties = new Properties(); + + final Reader reader = new BufferedReader(new InputStreamReader(is)); + + try { + + properties.load(reader); + + } finally { + + try { + + reader.close(); + + } catch (Throwable t) { + + log.error(t); + + } + + } + + /* + * Allow override of select options from the command line. + */ + { + final String[] overrides = new String[] { + // Journal options. + com.bigdata.journal.Options.FILE, +// // RDFParserOptions. +// RDFParserOptions.Options.DATATYPE_HANDLING, +// RDFParserOptions.Options.PRESERVE_BNODE_IDS, +// RDFParserOptions.Options.STOP_AT_FIRST_ERROR, +// RDFParserOptions.Options.VERIFY_DATA, +// // DataLoader options. +// DataLoader.Options.BUFFER_CAPACITY, +// DataLoader.Options.CLOSURE, +// DataLoader.Options.COMMIT, +// DataLoader.Options.FLUSH, + }; + for (String s : overrides) { + if (System.getProperty(s) != null) { + // Override/set from the environment. + final String v = System.getProperty(s); + if (log.isInfoEnabled()) + log.info("OVERRIDE:: Using: " + s + "=" + v); + properties.setProperty(s, v); + } + } + } + + return properties; + + } finally { + + if (is != null) { + + try { + + is.close(); + + } catch (Throwable t) { + + log.error(t); + + } + + } + + } + + } + + /** + * Initialization after all arguments have been set. 
+ */ + @Override + public void init() throws Exception { + + super.init(); + + properties = getProperties(propertyFile); + + /* + * Note: Allows override through the command line argument. The default + * is otherwise the default and the value in the properties file (if + * any) will be used unless it is overridden. + */ + final BufferMode bufferMode = bufferModeOverride == null ? BufferMode + .valueOf(properties.getProperty(Journal.Options.BUFFER_MODE, + Journal.Options.DEFAULT_BUFFER_MODE)) : bufferModeOverride; + + properties.setProperty(Journal.Options.BUFFER_MODE, bufferMode.name()); + + final boolean isTransient = !bufferMode.isStable(); + + if (isTransient) { + + isTemporary = true; + + } else { + + final String fileStr = properties.getProperty(Journal.Options.FILE); + + if (fileStr == null) { + + /* + * We will use a temporary file that we create here. The journal + * will be destroyed below. + */ + isTemporary = true; + + final File tmpFile = File.createTempFile( + BigdataGASRunner.class.getSimpleName(), + Journal.Options.JNL); + + // Set this on the Properties so it will be used by the jnl. + properties.setProperty(Journal.Options.FILE, + tmpFile.getAbsolutePath()); + + } else { + + // real file is named. + isTemporary = false; + + } + + } + + // The effective KB name. + namespace = namespaceOverride == null ? properties + .getProperty(BigdataSail.Options.NAMESPACE, + BigdataSail.Options.DEFAULT_NAMESPACE) : namespaceOverride; + + properties.setProperty(BigdataSail.Options.NAMESPACE, namespace); + + // Open Journal. + jnl = new Journal(properties); + + // Locate/create KB. + { + final AbstractTripleStore kb; + if (isTemporary) { + + kb = BigdataSail.createLTS(jnl, properties); + newKB = true; + + } else { + + final AbstractTripleStore tmp = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + ITx.UNISOLATED); + + if (tmp == null) { + + // create. 
+ kb = BigdataSail.createLTS(jnl, properties); + newKB = true; + + } else { + + kb = tmp; + newKB = kb.getStatementCount() == 0L; + + } + + } + } + } + + @Override + public void shutdown() { + + if (jnl != null) { + + if (isTemporary) { + + log.warn("Destroying temporary journal."); + + jnl.destroy(); + + } else { + + jnl.close(); + + } + + } + + super.shutdown(); + + } + + /** + * Return <code>true</code>iff one or more arguments can be parsed + * starting at the specified index. + * + * @param i + * The index into the arguments. + * @param args + * The arguments. + * @return <code>true</code> iff any arguments were recognized. + */ + @Override + public boolean handleArg(final AtomicInteger i, final String[] args) { + if (super.handleArg(i, args)) { + return true; + } + final String arg = args[i.get()]; + if (arg.equals("-bufferMode")) { + final String s = args[i.incrementAndGet()]; + bufferModeOverride = BufferMode.valueOf(s); + } else if (arg.equals("-namespace")) { + final String s = args[i.incrementAndGet()]; + namespaceOverride = s; + } else { + return false; + } + return true; + } + + /** + * {@inheritDoc} + * + * TODO report #of vertices (DISTINCT UNION (?s, ?o) + * + * TODO What happened to the predicate summary/histogram/distribution + * code? + */ + @Override + public void report(final StringBuilder sb) { + sb.append(", edges(kb)=" + nedges); + sb.append(", namespace=" + namespace);// + sb.append(", bufferMode=" + jnl.getBufferStrategy().getBufferMode()); + } + } + + /** + * Factory for the {@link OptionData}. 
+ */ + @Override + protected OptionData newOptionData() { + + return new BigdataOptionData(); + + } + + @Override + protected BigdataGASEngine newGASEngine() { + + final BigdataOptionData opt = getOptionData(); + + return new BigdataGASEngine(opt.jnl, opt.nthreads); + + } + + @Override + protected IGraphAccessor newGraphAccessor() { + + final BigdataOptionData opt = getOptionData(); + + /* + * Use a read-only view (sampling depends on access to the BTree rather + * than the ReadCommittedIndex). + */ + final BigdataGraphAccessor graphAccessor = new BigdataGraphAccessor( + opt.jnl, opt.namespace, opt.jnl.getLastCommitTime()); + + return graphAccessor; + + } + + @SuppressWarnings("unchecked") + @Override + protected BigdataOptionData getOptionData() { + + return (BigdataOptionData) super.getOptionData(); + + } + + /** + * Run a GAS analytic against some data set. + * + * @param args + * USAGE:<br/> + * <code>(options) analyticClass propertyFile</code> + * <p> + * <i>Where:</i> + * <dl> + * <dt>propertyFile</dt> + * <dd>A java properties file for a standalone {@link Journal}.</dd> + * </dl> + * and <i>options</i> are any of the options defined for the + * {@link GASRunnerBase} PLUS any of: + * <dl> + * <dt>-bufferMode</dt> + * <dd>Overrides the {@link BufferMode} (if any) specified in the + * <code>propertyFile</code>.</dd> + * <dt>-namespace</dt> + * <dd>The namespace of the default SPARQL endpoint (the + * namespace will be <code>kb</code> if none was specified when + * the triple/quad store was created).</dd> + * </p> + * @throws ClassNotFoundException + */ + public BigdataGASRunner(final String[] args) throws ClassNotFoundException { + + super(args); + + Banner.banner(); + + } + + /** + * Return an instance of the {@link IGASProgram} to be evaluated. 
+ */ + protected IGASProgram<VS, ES, ST> newGASProgram() { + + final Class<IGASProgram<VS, ES, ST>> cls = getOptionData().analyticClass; + + try { + + final Constructor<IGASProgram<VS, ES, ST>> ctor = cls + .getConstructor(new Class[] {}); + + final IGASProgram<VS, ES, ST> gasProgram = ctor + .newInstance(new Object[] {}); + + return gasProgram; + + } catch (Exception e) { + + throw new RuntimeException(e); + + } + + } + + @Override + public void loadFiles() throws IOException { + + final BigdataOptionData opt = getOptionData(); + + final Journal jnl = opt.jnl; + final String namespace = opt.namespace; + final String[] loadSet = opt.loadSet.toArray(new String[0]); + + // Load data using the unisolated view. + final AbstractTripleStore kb = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, ITx.UNISOLATED); + + if (opt.newKB && loadSet.length > 0) { + + final BigdataSail sail = new BigdataSail(kb); + try { + try { + sail.initialize(); + loadFiles(sail, loadSet); + } finally { + if (sail.isOpen()) + sail.shutDown(); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + // total #of edges in that graph. + opt.nedges = kb.getStatementCount(); + + } + + private void loadFiles(final BigdataSail sail, final String[] loadSet) + throws Exception { + boolean ok = false; + final SailConnection cxn = sail.getUnisolatedConnection(); + try { + for (String f : loadSet) { + new GASUtil() + .loadGraph(cxn, null/* fallback */, f/* resource */); + } + cxn.commit(); + ok = true; + } finally { + if (!ok) + cxn.rollback(); + cxn.close(); + } + } + + /** + * Performance testing harness. 
+ * + * @see #GASRunner(String[]) + */ + @SuppressWarnings("rawtypes") + public static void main(final String[] args) throws Exception { + + new BigdataGASRunner(args).call(); + + } + +} Deleted: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java 2013-09-10 13:55:55 UTC (rev 7392) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASRunner.java 2013-09-10 15:31:04 UTC (rev 7393) @@ -1,737 +0,0 @@ -package com.bigdata.rdf.graph.impl.bd; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.lang.reflect.Constructor; -import java.util.LinkedHashSet; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Callable; - -import org.apache.log4j.Logger; -import org.openrdf.model.Value; -import org.openrdf.sail.SailConnection; - -import com.bigdata.Banner; -import com.bigdata.journal.BufferMode; -import com.bigdata.journal.ITx; -import com.bigdata.journal.Journal; -import com.bigdata.rdf.graph.IGASContext; -import com.bigdata.rdf.graph.IGASEngine; -import com.bigdata.rdf.graph.IGASProgram; -import com.bigdata.rdf.graph.IGASScheduler; -import com.bigdata.rdf.graph.IGASSchedulerImpl; -import com.bigdata.rdf.graph.IGASState; -import com.bigdata.rdf.graph.IGASStats; -import com.bigdata.rdf.graph.impl.GASEngine; -import com.bigdata.rdf.graph.impl.GASState; -import com.bigdata.rdf.graph.impl.GASStats; -import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor; -import com.bigdata.rdf.graph.impl.util.VertexDistribution; -import com.bigdata.rdf.graph.util.GASUtil; -import com.bigdata.rdf.sail.BigdataSail; -import 
com.bigdata.rdf.store.AbstractTripleStore; - -/** - * Base class for running performance tests. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * - * TODO Do we need a different driver if the algorithm always visits all - * vertices? For such algorithms, we just run them once per graph - * (unless the graph is dynamic). - * - * FIXME API: GASRunner needs to use a factory pattern for different - * backends. It should then be moved to the util package and the - * bigdata-gas module. - */ -public class GASRunner<VS, ES, ST> implements Callable<IGASStats> { - - private static final Logger log = Logger.getLogger(GASRunner.class); - - /** - * The seed used for the random number generator. - */ - private final long seed; - - /** - * Random number generated used for sampling the starting vertices. - */ - private final Random r; - - /** - * The #of random starting vertices to use. - */ - private final int nsamples; - - /** - * The #of threads to use for GATHER and SCATTER operators. - */ - private final int nthreads; - - /** - * The analytic class to be executed. - */ - private final Class<IGASProgram<VS, ES, ST>> analyticClass; - - /** - * The property file - */ - private final String propertyFile; - - /** - * When non-<code>null</code>, this overrides the current buffer mode. - */ - private final BufferMode bufferModeOverride; - - /** - * When non-<code>null</code>, this overrides the KB namespace. - */ - private final String namespaceOverride; - - /** - * The {@link IGASSchedulerImpl} class to use. - */ - private final Class<IGASSchedulerImpl> schedulerClassOverride; - - /** - * When non-<code>null</code>, a list of zero or more resources to be - * loaded. The resources will be searched for as URLs, on the CLASSPATH, and - * in the file system. - */ - private final String[] loadSet; - - /** - * Print the optional message on stderr, print the usage information on - * stderr, and then force the program to exit with the given status code. 
- * - * @param status - * The status code. - * @param msg - * The optional message - */ - private static void usage(final int status, final String msg) { - - if (msg != null) { - - System.err.println(msg); - - } - - System.err.println("[options] analyticClass propertyFile"); - - System.exit(status); - - } - - /** - * Run a GAS analytic against some data set. - * - * @param args - * USAGE:<br/> - * <code>(options) analyticClass propertyFile</code> - * <p> - * <i>Where:</i> - * <dl> - * <dt>propertyFile</dt> - * <dd>A java properties file for a standalone {@link Journal}.</dd> - * </dl> - * and <i>options</i> are any of: - * <dl> - * <dt>-nthreads</dt> - * <dd>The #of threads which will be used for GATHER and SCATTER - * operations.</dd> - * <dt>-nsamples</dt> - * <dd>The #of random sample starting vertices that will be - * selected. The algorithm will be run ONCE for EACH sampled - * vertex.</dd> - * <dt>-seed</dt> - * <dd>The seed for the random number generator (default is - * <code>217L</code>).</dd> - * <dt>-bufferMode</dt> - * <dd>Overrides the {@link BufferMode} (if any) specified in the - * <code>propertyFile</code>.</dd> - * <dt>-schedulerClass</dt> - * <dd>Override the default {@link IGASScheduler}. Class must - * implement {@link IGASSchedulerImpl}.</dd> - * <dt>-namespace</dt> - * <dd>The namespace of the default SPARQL endpoint (the - * namespace will be <code>kb</code> if none was specified when - * the triple/quad store was created).</dd> - * <dt>-load</dt> - * <dd>Loads the named resource IFF the KB is empty (or does not - * exist) at the time this utility is executed. This option may - * appear multiple times. 
The resources will be searched for as - * URLs, on the CLASSPATH, and in the file system.</dd> - * </p> - * @throws ClassNotFoundException - */ - public GASRunner(final String[] args) throws ClassNotFoundException { - - Banner.banner(); - - long seed = 217L; - int nsamples = 100; - int nthreads = 4; - BufferMode bufferMode = null; // override only. - Class<IGASSchedulerImpl> schedulerClass = null; // override only. - String namespace = "kb"; - // Set of files to load (optional). - LinkedHashSet<String> loadSet = new LinkedHashSet<String>(); - - /* - * Handle all arguments starting with "-". These should appear before - * any non-option arguments to the program. - */ - int i = 0; - while (i < args.length) { - final String arg = args[i]; - if (arg.startsWith("-")) { - if (arg.equals("-seed")) { - seed = Long.valueOf(args[++i]); - } else if (arg.equals("-nsamples")) { - final String s = args[++i]; - nsamples = Integer.valueOf(s); - if (nsamples <= 0) { - usage(1/* status */, - "-nsamples must be positive, not: " + s); - } - } else if (arg.equals("-nthreads")) { - final String s = args[++i]; - nthreads = Integer.valueOf(s); - if (nthreads < 0) { - usage(1/* status */, - "-nthreads must be non-negative, not: " + s); - } - } else if (arg.equals("-bufferMode")) { - final String s = args[++i]; - bufferMode = BufferMode.valueOf(s); - } else if (arg.equals("-schedulerClass")) { - final String s = args[++i]; - schedulerClass = (Class<IGASSchedulerImpl>) Class.forName(s); - } else if (arg.equals("-namespace")) { - final String s = args[++i]; - namespace = s; - } else if (arg.equals("-load")) { - final String s = args[++i]; - loadSet.add(s); - } else { - usage(1/* status */, "Unknown argument: " + arg); - } - } else { - break; - } - i++; - } - - /* - * Check for the remaining (required) argument(s). - */ - final int nremaining = args.length - i; - if (nremaining != 2) { - /* - * There are either too many or too few arguments remaining. 
- */ - usage(1/* status */, nremaining < 1 ? "Too few arguments." - : "Too many arguments"); - } - - /* - * The analytic to be executed. - */ - { - - final String s = args[i++]; - - this.analyticClass = (Class<IGASProgram<VS, ES, ST>>) Class - .forName(s); - - } - - /* - * Property file. - */ - this.propertyFile = args[i++]; - - /* - * Assign parsed values. - */ - this.seed = seed; - this.nsamples = nsamples; - this.nthreads = nthreads; - this.namespaceOverride = namespace; - this.bufferModeOverride = bufferMode; - this.schedulerClassOverride = schedulerClass; - this.loadSet = loadSet.isEmpty() ? null : loadSet - .toArray(new String[loadSet.size()]); - - // Setup the random number generator. - this.r = new Random(seed); - - } - - /** - * Return an instance of the {@link IGASProgram} to be evaluated. - */ - protected IGASProgram<VS, ES, ST> newGASProgram() { - - final Class<IGASProgram<VS, ES, ST>> cls = analyticClass; - - try { - - final Constructor<IGASProgram<VS, ES, ST>> ctor = cls - .getConstructor(new Class[] {}); - - final IGASProgram<VS, ES, ST> gasProgram = ctor - .newInstance(new Object[] {}); - - return gasProgram; - - } catch (Exception e) { - - throw new RuntimeException(e); - - } - - } - - private Properties getProperties(final String resource) throws IOException { - - if (log.isInfoEnabled()) - log.info("Reading properties: " + resource); - - InputStream is = null; - try { - - // try the classpath - is = getClass().getResourceAsStream(resource); - - if (is != null) { - - } else { - - // try file system. - final File file = new File(resource); - - if (file.exists()) { - - is = new FileInputStream(file); - - } else { - - throw new IOException("Could not locate resource: " - + resource); - - } - - } - - /* - * Obtain a buffered reader on the input stream. 
- */ - - final Properties properties = new Properties(); - - final Reader reader = new BufferedReader(new InputStreamReader(is)); - - try { - - properties.load(reader); - - } finally { - - try { - - reader.close(); - - } catch (Throwable t) { - - log.error(t); - - } - - } - - /* - * Allow override of select options from the command line. - */ - { - final String[] overrides = new String[] { - // Journal options. - com.bigdata.journal.Options.FILE, -// // RDFParserOptions. -// RDFParserOptions.Options.DATATYPE_HANDLING, -// RDFParserOptions.Options.PRESERVE_BNODE_IDS, -// RDFParserOptions.Options.STOP_AT_FIRST_ERROR, -// RDFParserOptions.Options.VERIFY_DATA, -// // DataLoader options. -// DataLoader.Options.BUFFER_CAPACITY, -// DataLoader.Options.CLOSURE, -// DataLoader.Options.COMMIT, -// DataLoader.Options.FLUSH, - }; - for (String s : overrides) { - if (System.getProperty(s) != null) { - // Override/set from the environment. - final String v = System.getProperty(s); - if (log.isInfoEnabled()) - log.info("OVERRIDE:: Using: " + s + "=" + v); - properties.setProperty(s, v); - } - } - } - - return properties; - - } finally { - - if (is != null) { - - try { - - is.close(); - - } catch (Throwable t) { - - log.error(t); - - } - - } - - } - - } - - /** - * Run the test. - * <p> - * This provides a safe pattern for either loading data into a temporary - * journal, which is then destroyed, or using an exiting journal and - * optionally loading in some data set. When we load the data the journal is - * destroyed afterwards and when the journal is pre-existing and we neither - * load the data nor destroy the journal. This has to do with the effective - * BufferMode (if transient) and whether the file is specified and whether a - * temporary file is created (CREATE_TEMP_FILE). If we do our own file - * create if the effective buffer mode is non-transient, then we can get all - * this information. 
- */ - public IGASStats call() throws Exception { - - final Properties properties = getProperties(propertyFile); - - /* - * Note: Allows override through the command line argument. The default - * is otherwise the default and the value in the properties file (if - * any) will be used unless it is overridden. - */ - final BufferMode bufferMode = this.bufferModeOverride == null ? BufferMode - .valueOf(properties.getProperty(Journal.Options.BUFFER_MODE, - Journal.Options.DEFAULT_BUFFER_MODE)) : this.bufferModeOverride; - - properties.setProperty(Journal.Options.BUFFER_MODE, bufferMode.name()); - - final boolean isTransient = !bufferMode.isStable(); - - final boolean isTemporary; - if (isTransient) { - - isTemporary = true; - - } else { - - final String fileStr = properties.getProperty(Journal.Options.FILE); - - if (fileStr == null) { - - /* - * We will use a temporary file that we create here. The journal - * will be destroyed below. - */ - isTemporary = true; - - final File tmpFile = File.createTempFile( - GASRunner.class.getSimpleName(), Journal.Options.JNL); - - // Set this on the Properties so it will be used by the jnl. - properties.setProperty(Journal.Options.FILE, - tmpFile.getAbsolutePath()); - - } else { - - // real file is named. - isTemporary = false; - - } - - } - - // The effective KB name. - final String namespace = this.namespaceOverride == null ? properties - .getProperty(BigdataSail.Options.NAMESPACE, - BigdataSail.Options.DEFAULT_NAMESPACE) : this.namespaceOverride; - - properties.setProperty(BigdataSail.Options.NAMESPACE, namespace); - - /* - * TODO Could start NSS and use SPARQL UPDATE "LOAD" to load the data. - * That exposes the SPARQL end point for other purposes during the test. - * Is this useful? It could also let us run the GASEngine on a remote - * service (submit a callable to an HA server or define a REST API for - * submitting these GAS algorithms). - */ - final Journal jnl = new Journal(properties); - - try { - - // Locate/create KB. 
- final boolean newKB; - { - final AbstractTripleStore kb; - if (isTemporary) { - - kb = BigdataSail.createLTS(jnl, properties); - newKB = true; - - } else { - - final AbstractTripleStore tmp = (AbstractTripleStore) jnl - .getResourceLocator().locate(namespace, - ITx.UNISOLATED); - - if (tmp == null) { - - // create. - kb = BigdataSail.createLTS(jnl, properties); - newKB = true; - - } else { - - kb = tmp; - newKB = kb.getStatementCount() == 0L; - - } - - } - } - - /* - * Load data sets (iff new KB). - */ - if (newKB && (loadSet != null && loadSet.length > 0)) { - - loadFiles(jnl, namespace, loadSet); - - } - - return runAnalytic(jnl, namespace); - - } finally { - - if (isTemporary) { - - log.warn("Destroying temporary journal."); - - jnl.destroy(); - - } else { - - jnl.close(); - - } - - } - - } - - /** - * Load files into the journal. - * - * @param jnl - * The journal. - * @param namespace - * The KB namespace. - * @param loadSet - * The files. - * @throws IOException - */ - private void loadFiles(final Journal jnl, final String namespace, - final String[] loadSet) throws IOException { - - // Load data using the unisolated view. 
- final AbstractTripleStore kb = (AbstractTripleStore) jnl - .getResourceLocator().locate(namespace, ITx.UNISOLATED); - final BigdataSail sail = new BigdataSail(kb); - try { - try { - sail.initialize(); - boolean ok = false; - final SailConnection cxn = sail.getUnisolatedConnection(); - try { - for (String f : loadSet) { - new GASUtil() - .loadGraph(cxn, null/* fallback */, f/* resource */); - } - cxn.commit(); - ok = true; - } finally { - if (!ok) - cxn.rollback(); - cxn.close(); - } - } finally { - if (sail.isOpen()) - sail.shutDown(); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - - // final String path = "bigdata-rdf/src/resources/data/foaf"; - // final String dataFile[] = new String[] {// - // path + "/data-0.nq.gz",// - // path + "/data-1.nq.gz",// - // path + "/data-2.nq.gz",// - // path + "/data-3.nq.gz",// - // }; -// final String baseUrl[] = new String[loadSet.le... [truncated message content] |