From: <tho...@us...> - 2014-04-01 20:16:47
|
package com.bigdata.rdf.sail;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import junit.framework.TestCase;

import org.apache.log4j.Logger;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.ValueFactory;
import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.QueryLanguage;
import org.openrdf.query.TupleQuery;
import org.openrdf.query.TupleQueryResult;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;
import org.openrdf.repository.RepositoryResult;

/**
 * Stress test (customer-supplied, ticket #871) that runs a read/write thread
 * (bulk insert then bulk delete, in loops) concurrently with a read-only
 * query thread against a single RWStore-backed journal, looking for
 * "ClosedByInterruptException"-style failures.
 * <p>
 * The writer thread runs a fixed number of insert/delete loops; the reader
 * thread issues SELECT bursts until the writer finishes and
 * {@link #stopRequested} is raised. Any error is logged and (by default)
 * halts the JVM so the failure is not lost in the noise.
 */
public class StressTest_ClosedByInterrupt_RW extends TestCase {

    private static final Logger log = Logger
            .getLogger(StressTest_ClosedByInterrupt_RW.class);

    public StressTest_ClosedByInterrupt_RW() {
        super();
    }

    public StressTest_ClosedByInterrupt_RW(String name) {
        super(name);
    }

    // Writer workload shape. A delay of -1 means "no sleep between operations".
    private static final int NUM_INSERT_DELETE_LOOPS = 10;
    private static final int NUM_INSERTS_PER_LOOP = 200000;
    private static final int NUM_DELETES_PER_LOOP = 23000;
    private static final long MILLIS_BETWEEN_INSERTS = -1;
    private static final long MILLIS_BETWEEN_DELETES = -1;
    private static final int NUM_STATEMENTS_PER_INSERT = 50;

    // Reader workload shape.
    private static final int NUM_SELECTS = 5000;
    private static final int NUM_STATEMENTS_PER_SELECT = 23000;
    private static final long MILLIS_BETWEEN_QUERY_BURSTS = 1000;

    // When true, any error in either runner terminates the JVM (exit 123)
    // so the first failure is captured rather than buried by later noise.
    private static boolean HALT_ON_ERROR = true;

    // Raised by the main thread once the writer completes; the reader polls it.
    // volatile: written by the test thread, read by the reader thread.
    private volatile boolean stopRequested = false;

    /**
     * Sleep for the given duration; non-positive values are a no-op.
     *
     * @throws InterruptedException if the sleeping thread is interrupted.
     */
    private void snooze(final long millis) throws InterruptedException {
        if (millis > 0) {
            Thread.sleep(millis);
        }
    }

// @Test
    /**
     * Entry point: sets up a fresh RWStore journal, starts the writer and
     * reader threads, waits for the writer, signals the reader to stop, and
     * shuts the repository down.
     */
    public void test() throws RepositoryException, InterruptedException {

        final File jnlFile = new File("interrupted.jnl");

        // Start from a fresh journal; a stale file would invalidate the run.
        if (jnlFile.exists() && !jnlFile.delete()) {
            log.warn("Could not delete existing journal file: " + jnlFile);
        }

        final Properties props = new Properties();
        props.setProperty("com.bigdata.rdf.sail.namespace", "emc.srm.topology.kb");
        props.setProperty("com.bigdata.journal.AbstractJournal.bufferMode", "DiskRW");
        props.setProperty("com.bigdata.btree.writeRetentionQueue.capacity", "4000");
        props.setProperty("com.bigdata.btree.BTree.branchingFactor", "128");
        props.setProperty("com.bigdata.service.AbstractTransactionService.minReleaseAge", "1");
        props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.textIndex", "false");
        props.setProperty(
                "com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlTransitiveProperty", "false");
        props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlSameAsClosure",
                "false");
        props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlSameAsProperties",
                "false");
        props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlInverseOf", "false");
        props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlEquivalentClass",
                "false");
        props.setProperty(
                "com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlEquivalentProperty", "false");
        props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlHasValue", "false");
        props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainRdfTypeRdfsResource",
                "false");
        props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.axiomsClass",
                "com.bigdata.rdf.axioms.NoAxioms");
        props.setProperty("com.bigdata.rdf.sail.truthMaintenance", "false");
        props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.justify", "false");
        props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.statementIdentifiers", "false");
        props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.quadsMode", "true");
        props.setProperty("com.bigdata.journal.AbstractJournal.maximumExtent", "209715200");
        props.setProperty("com.bigdata.service.IBigdataClient.collectPlatformStatistics", "false");
        props.setProperty("com.bigdata.service.IBigdataClient.httpdPort", "-1");
        props.setProperty("com.bigdata.rdf.sail.bufferCapacity", "100000");
        props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.bloomFilter", "false");

        props.setProperty(BigdataSail.Options.CREATE_TEMP_FILE, Boolean.FALSE.toString());
        props.setProperty(BigdataSail.Options.FILE, jnlFile.toString());

        final BigdataSail sail = new BigdataSail(props);
        final BigdataSailRepository repo = new BigdataSailRepository(sail);
        repo.initialize();

        final InsertDeleteRunner mapper = new InsertDeleteRunner(repo);
        final ReadOnlyRunner mdp = new ReadOnlyRunner(repo);

        final Thread mapperThread = new Thread(mapper);
        final Thread mdpThread = new Thread(mdp);

        mapperThread.start();
        mdpThread.start();

        // Writer runs a bounded number of loops; wait for it to finish.
        mapperThread.join();
        System.out.println("Mapper is done");

        // Reader loops until told to stop; signal it and wait.
        stopRequested = true;
        mdpThread.join();
        System.out.println("MDP is done");

        repo.shutDown();
        System.out.println("Repository has shut down");

    }

    /**
     * Writer thread: each loop bulk-inserts {@link #NUM_INSERTS_PER_LOOP}
     * quad groups (one commit), then deletes the statements of the first
     * {@link #NUM_DELETES_PER_LOOP} contexts (one commit).
     */
    private class InsertDeleteRunner implements Runnable {

        private final BigdataSailRepository repo;

        public InsertDeleteRunner(final BigdataSailRepository repo) {
            this.repo = repo;
        }

        @Override
        public void run() {

            for (int loop = 0; loop < NUM_INSERT_DELETE_LOOPS; ++loop) {

                System.out.println("[Read/Write] enter loop " + loop);
                RepositoryConnection conn = null;

                try {
                    System.out.println("[Read/Write] inserting ...");
                    conn = repo.getConnection();
                    conn.setAutoCommit(false);

                    for (int index = 0; index < NUM_INSERTS_PER_LOOP; ++index) {
                        doInsert(conn, loop, index);
                        snooze(MILLIS_BETWEEN_INSERTS);
                    }

                    conn.commit();
                    conn.close();
                    conn = null;

                } catch (Throwable t) {
                    if (t instanceof InterruptedException) {
                        // Preserve the interrupt status for any outer observer.
                        Thread.currentThread().interrupt();
                    }
                    printError("Read/Write threw on insert in loop " + loop, t);
                } finally {
                    closeNoException(conn);
                }

                try {
                    System.out.println("[Read/Write] deleting ...");
                    conn = repo.getConnection();
                    conn.setAutoCommit(false);

                    for (int index = 0; index < NUM_DELETES_PER_LOOP; ++index) {
                        doDelete(conn, loop, index);
                        snooze(MILLIS_BETWEEN_DELETES);
                    }

                    conn.commit();
                    conn.close();
                    conn = null;

                } catch (Throwable t) {
                    if (t instanceof InterruptedException) {
                        // Preserve the interrupt status for any outer observer.
                        Thread.currentThread().interrupt();
                    }
                    printError("Read/Write threw on delete in loop " + loop, t);
                } finally {
                    closeNoException(conn);
                }
                System.out.println("[Read/Write] leave loop " + loop);
            }
        }

        /**
         * Add {@link #NUM_STATEMENTS_PER_INSERT} statements, all sharing one
         * subject and one context derived from (loop, index).
         */
        private void doInsert(final RepositoryConnection conn, final int loop, final int index)
                throws RepositoryException {
            final ValueFactory vf = conn.getValueFactory();
            final URI c = vf.createURI("context:loop:" + loop + ":item:" + index);
            final URI s = vf.createURI("subject:loop:" + loop + ":item:" + index);
            for (int x = 0; x < NUM_STATEMENTS_PER_INSERT; ++x) {
                final URI p = vf.createURI("predicate:" + x);
                final Literal o = vf.createLiteral("SomeValue");
                conn.add(s, p, o, c);
            }
        }

        /**
         * Remove every statement in the context derived from (loop, index).
         * Statements are materialized first so the removal does not iterate
         * over a live cursor.
         */
        private void doDelete(final RepositoryConnection conn, final int loop, final int index)
                throws RepositoryException {
            final ValueFactory vf = conn.getValueFactory();
            final URI context = vf.createURI("context:loop:" + loop + ":item:" + index);
            final Collection<Statement> statements = getStatementsForContext(conn, context);
            for (Statement statement : statements) {
                conn.remove(statement, context);
            }
        }

        /**
         * Materialize all statements in the given context into a list.
         * <p>
         * Note: the result cursor is null-checked before closing — if
         * {@code getStatements} itself throws, closing a null cursor in the
         * finally block would raise an NPE that masks the real failure.
         */
        private Collection<Statement> getStatementsForContext(final RepositoryConnection conn,
                final URI context) throws RepositoryException {
            RepositoryResult<Statement> res = null;
            final Collection<Statement> statements = new ArrayList<Statement>();
            try {
                res = conn.getStatements(null, null, null, false, context);
                while (res.hasNext()) {
                    statements.add(res.next());
                }
            } finally {
                if (res != null) {
                    res.close();
                }
            }
            return statements;
        }
    }

    /**
     * Reader thread: until {@link #stopRequested}, issues bursts of
     * {@link #NUM_SELECTS} read-only SELECT queries, fully draining each
     * result set.
     */
    private class ReadOnlyRunner implements Runnable {

        private final BigdataSailRepository repo;

        public ReadOnlyRunner(final BigdataSailRepository repo) {
            this.repo = repo;
        }

        @Override
        public void run() {

            RepositoryConnection conn = null;
            TupleQueryResult result = null;
            int loop = 0;

            while (stopRequested == false) {

                try {

                    System.out.println("[Read ] snooze");
                    snooze(MILLIS_BETWEEN_QUERY_BURSTS);
                    System.out.println("[Read ] enter loop " + loop);

                    for (int invocation = 0; invocation < NUM_SELECTS; ++invocation) {

                        conn = repo.getReadOnlyConnection();
                        conn.setAutoCommit(false);

                        final String sparql = "SELECT ?s WHERE { ?s ?p ?o } LIMIT "
                                + NUM_STATEMENTS_PER_SELECT;
                        final TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL, sparql);
                        result = query.evaluate();

                        // Drain the result fully; collected values are unused
                        // ("duds") — the point is to exercise the cursor.
                        final List<String> duds = new ArrayList<String>();

                        while (result.hasNext()) {
                            final BindingSet bindingSet = result.next();
                            for (final Iterator<Binding> i = bindingSet.iterator(); i.hasNext();) {
                                final Binding b = i.next();
                                if (b.getValue() != null) {
                                    duds.add(b.getValue().stringValue());
                                }
                            }
                        }

                        result.close();
                        result = null;

                        conn.close();
                        conn = null;

                    }

                } catch (Throwable t) {
                    if (t instanceof InterruptedException) {
                        // Preserve the interrupt status for any outer observer.
                        Thread.currentThread().interrupt();
                    }
                    printError("Read Only threw in loop " + loop, t);
                } finally {
                    // Close whatever was left open when an error aborted the burst.
                    closeNoException(result);
                    closeNoException(conn);
                }

                System.out.println("[Read ] leave loop " + loop);
                ++loop;

            }
        }

    }

    /** Close the connection, logging (never propagating) any failure. */
    private void closeNoException(RepositoryConnection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (RepositoryException e) {
                log.error("closeNoException(conn)", e);
            }
        }
    }

    /** Close the query result, logging (never propagating) any failure. */
    private void closeNoException(TupleQueryResult result) {
        if (result != null) {
            try {
                result.close();
            } catch (QueryEvaluationException e) {
                log.error("closeNoException(result)", e);
            }
        }
    }

    /**
     * Log an error; when {@link #HALT_ON_ERROR} is set, kill the JVM with a
     * distinctive exit code so the stress run stops at the first failure.
     */
    private void printError(final String message, final Throwable cause) {
        log.error(message, cause);
//        Exception e = new Exception(message, cause);
//        e.printStackTrace();
        if (HALT_ON_ERROR) {
            System.exit(123);
        }
    }

}