This list is closed, nobody may subscribe to it.
2010 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
(139) |
Aug
(94) |
Sep
(232) |
Oct
(143) |
Nov
(138) |
Dec
(55) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2011 |
Jan
(127) |
Feb
(90) |
Mar
(101) |
Apr
(74) |
May
(148) |
Jun
(241) |
Jul
(169) |
Aug
(121) |
Sep
(157) |
Oct
(199) |
Nov
(281) |
Dec
(75) |
2012 |
Jan
(107) |
Feb
(122) |
Mar
(184) |
Apr
(73) |
May
(14) |
Jun
(49) |
Jul
(26) |
Aug
(103) |
Sep
(133) |
Oct
(61) |
Nov
(51) |
Dec
(55) |
2013 |
Jan
(59) |
Feb
(72) |
Mar
(99) |
Apr
(62) |
May
(92) |
Jun
(19) |
Jul
(31) |
Aug
(138) |
Sep
(47) |
Oct
(83) |
Nov
(95) |
Dec
(111) |
2014 |
Jan
(125) |
Feb
(60) |
Mar
(119) |
Apr
(136) |
May
(270) |
Jun
(83) |
Jul
(88) |
Aug
(30) |
Sep
(47) |
Oct
(27) |
Nov
(23) |
Dec
|
2015 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
(3) |
Oct
|
Nov
|
Dec
|
2016 |
Jan
|
Feb
|
Mar
(4) |
Apr
(1) |
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <tho...@us...> - 2013-08-23 21:24:21
|
Revision: 7326 http://bigdata.svn.sourceforge.net/bigdata/?rev=7326&view=rev Author: thompsonbry Date: 2013-08-23 21:24:14 +0000 (Fri, 23 Aug 2013) Log Message: ----------- Modified to not execute GATHER and SCATTER stages when NoEdges was specified (9% of cost is a NOP GATHER for BFS). Added more info to the GASRunner reporting. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-23 15:05:39 UTC (rev 7325) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-23 21:24:14 UTC (rev 7326) @@ -578,10 +578,19 @@ */ final long beginGather = System.nanoTime(); - - final long gatherEdgeCount = gatherEdges(kb, gatherEdges, - pushDownApplyInGather); + final long gatherEdgeCount; + if (gatherEdges == EdgesEnum.NoEdges) { + + gatherEdgeCount = 0L; + + } else { + + gatherEdgeCount = gatherEdges(kb, gatherEdges, + pushDownApplyInGather); + + } + final long elapsedGather = System.nanoTime() - beginGather; /* @@ -609,10 +618,20 @@ */ final long beginScatter = System.nanoTime(); + + final long scatterEdgeCount; - final long scatterEdgeCount = scatterEdges(kb, scatterEdges, - pushDownApplyInScatter); + if (scatterEdges == EdgesEnum.NoEdges) { + scatterEdgeCount = 0L; + + } else { + + scatterEdgeCount = scatterEdges(kb, scatterEdges, + pushDownApplyInScatter); + + } + final long elapsedScatter = System.nanoTime() - beginScatter; /* Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 15:05:39 UTC (rev 7325) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 21:24:14 UTC (rev 7326) @@ -523,8 +523,10 @@ * Setup and run over those samples. */ + final IGASProgram<VS, ES, ST> gasProgram = newGASProgram(); + final IGASEngine<VS, ES, ST> gasEngine = new GASEngine<VS, ES, ST>(jnl, - namespace, ITx.READ_COMMITTED, newGASProgram(), nthreads); + namespace, ITx.READ_COMMITTED, gasProgram, nthreads); final GASStats total = new GASStats(); @@ -549,10 +551,11 @@ } // Total over all sampled vertices. - System.out.println("TOTAL: nseed=" + seed + ", nsamples=" + nsamples - + ", nthreads=" + nthreads + ", bufferMode=" - + jnl.getBufferStrategy().getBufferMode() + ", edges(kb)=" - + nedges + ", stats(total)=" + total); + System.out.println("TOTAL: analytic=" + + gasProgram.getClass().getSimpleName() + ", nseed=" + seed + + ", nsamples=" + nsamples + ", nthreads=" + nthreads + + ", bufferMode=" + jnl.getBufferStrategy().getBufferMode() + + ", edges(kb)=" + nedges + ", stats(total)=" + total); return total; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2013-08-23 15:05:47
|
Revision: 7325 http://bigdata.svn.sourceforge.net/bigdata/?rev=7325&view=rev Author: martyncutcher Date: 2013-08-23 15:05:39 +0000 (Fri, 23 Aug 2013) Log Message: ----------- Remove long sleep from BounceFollower left behind after analysis needing pause form thread dump! Modified Paths: -------------- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-08-23 14:34:09 UTC (rev 7324) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-08-23 15:05:39 UTC (rev 7325) @@ -588,6 +588,22 @@ * HAJournalServer needs to handle ZK client connection loss </a> */ public void testStartAB_BounceFollower() throws Exception { + doBounceFollower(); + } + + public void _testStressStartAB_BounceFollower() throws Exception { + for (int test = 0; test < 5; test++) { + try { + doBounceFollower(); + } catch (Throwable t) { + fail("Run " + test, t); + } finally { + destroyAll(); + } + } + } + + public void doBounceFollower() throws Exception { final HAGlue serverA = startA(); final HAGlue serverB = startB(); @@ -636,7 +652,7 @@ ((HAGlueTest) serverA).bounceZookeeperConnection().get(); } - Thread.sleep(100000); + // Thread.sleep(100000); // sleep to allow thread dump for analysis // Okay, is the problem that the quorum doesn't break? // assertFalse(quorum.isQuorumMet()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-23 14:34:16
|
Revision: 7324 http://bigdata.svn.sourceforge.net/bigdata/?rev=7324&view=rev Author: thompsonbry Date: 2013-08-23 14:34:09 +0000 (Fri, 23 Aug 2013) Log Message: ----------- Modified the HA CI test harness to automatically generate a thread dump (kill -QUIT) and then kill (kill -KILL) a hung child process. This was done to debug a problem with child process termination that resulted from the HAClient refactor. The same mechanism can now be used to write sure kill tests to support #530. See #728 Modified Paths: -------------- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-08-23 12:58:52 UTC (rev 7323) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-08-23 14:34:09 UTC (rev 7324) @@ -81,6 +81,7 @@ import com.bigdata.jini.util.JiniUtil; import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.jini.ha.HAJournalServer.ConfigurationOptions; +import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest; import com.bigdata.quorum.AbstractQuorumClient; import com.bigdata.quorum.AsynchronousQuorumCloseException; import com.bigdata.quorum.Quorum; @@ -129,6 +130,7 @@ private volatile HAGlue haGlue; private volatile ProcessHelper processHelper; private volatile boolean dead = false; + private volatile int childPID = 0; public ServiceListener() { @@ -695,15 +697,97 @@ log.warn("Service is down: " + t); } else { - + // Some other problem. log.error(t, t); - + + if (serviceListener != null && serviceListener.childPID != 0) { + + final int childPID = serviceListener.childPID; + + // Try to request a thread dump. + if (trySignal(SignalEnum.QUIT, childPID)) { + + // Give the child a moment to respond. + try { + Thread.sleep(2000/* ms */); + } catch (InterruptedException e) { + // Propagate the interrupt. + Thread.currentThread().interrupt(); + } + + } + + // Sure kill on the child. + trySignal(SignalEnum.KILL, childPID); + + } + } } } + + /** + * Some signals understood by Java. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + protected enum SignalEnum { + /** Heap dump. */ + INT(2,"INT"), + /** Thread dump (^C). */ + QUIT(3,"QUIT"), + /** Sure kill. */ + KILL(9,"KILL"); + private final int code; + private final String symbol; + private SignalEnum(int code, String symbol) { + this.code = code; + this.symbol = symbol; + } + public int code() { + return code; + } + public String symbol() { + return symbol; + } + }; + + /** + * Attempt to signal a process (presumably a child process). + * + * @param signalEnum + * The signal. + * @param pid + * The pid of the process. + * + * @return true unless an exception was encountered (a <code>true</code> + * return is not a sure indicator of success). 
+ */ + protected boolean trySignal(final SignalEnum signalEnum, final int pid){ + + final String cmd = "kill -" + signalEnum.symbol() + " " + pid; + + log.warn("SIGNAL: " + cmd); + + try { + + Runtime.getRuntime().exec(cmd); + + return true; + + } catch (IOException e) { + + log.error("FAILED: " + cmd, e); + + return false; + + } + + } protected void destroyA() { safeDestroy(serverA, getServiceDirA(), serviceListenerA); @@ -1463,6 +1547,21 @@ // Set the HAGlue interface on the ServiceListener. serviceListener.setService(haGlue); + + try { + + // Have the child self-report its PID (best guess). + serviceListener.childPID = ((HAGlueTest) haGlue).getPID(); + + if (log.isInfoEnabled()) + log.info("CHILD: name=" + name + ", childPID=" + + serviceListener.childPID); + + } catch (IOException ex) { + + log.warn("Could not get childPID: " + ex, ex); + + } /* * Wait until the server is running. Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-08-23 12:58:52 UTC (rev 7323) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-08-23 14:34:09 UTC (rev 7324) @@ -27,7 +27,6 @@ package com.bigdata.journal.jini.ha; import java.io.IOException; -import java.io.Serializable; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.rmi.Remote; @@ -36,7 +35,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -52,6 +50,7 @@ import org.apache.log4j.Logger; import com.bigdata.concurrent.FutureTaskMon; +import com.bigdata.counters.PIDUtil; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; import com.bigdata.ha.QuorumService; @@ -85,7 +84,6 @@ import com.bigdata.ha.msg.IHAWriteSetStateRequest; import com.bigdata.ha.msg.IHAWriteSetStateResponse; import com.bigdata.journal.AbstractJournal; -import com.bigdata.journal.IIndexManager; import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.jini.ha.HAJournalServer.HAQuorumService; import com.bigdata.quorum.AsynchronousQuorumCloseException; @@ -165,6 +163,15 @@ * The message. */ public void log(String msg) throws IOException; + + /** + * Have the child self-report its <code>pid</code>. + * + * @return The child's PID. + * + * @see PIDUtil + */ + public int getPID() throws IOException; /** * Force the end point to enter into an error state from which it will @@ -515,6 +522,14 @@ } @Override + public int getPID() throws IOException { + + // Best guess at the child's PID. 
+ return PIDUtil.getPID(); + + } + + @Override public Future<Void> enterErrorState() { final FutureTask<Void> ft = new FutureTaskMon<Void>( Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-08-23 12:58:52 UTC (rev 7323) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-08-23 14:34:09 UTC (rev 7324) @@ -26,6 +26,7 @@ */ package com.bigdata.journal.jini.ha; +import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -41,9 +42,7 @@ import com.bigdata.journal.AbstractTask; import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest; import com.bigdata.journal.jini.ha.HAJournalTest.SpuriousTestException; -import com.bigdata.quorum.zk.ZKQuorum; import com.bigdata.quorum.zk.ZKQuorumImpl; -import com.bigdata.rdf.sail.webapp.client.HttpException; import com.bigdata.rdf.sail.webapp.client.RemoteRepository; import com.bigdata.util.ClocksNotSynchronizedException; import com.bigdata.util.InnerCause; @@ -779,5 +778,45 @@ } } - + + /** + * Attempt to start a service. Once it is running, request a thread dump and + * then issue a sure kill - both of these operations are done using a SIGNAL + * rather than RMI. However, the SIGNAL depends on the child PID. That is + * obtained from {@link HAGlueTest#getPID()}. + * + * @throws Exception + */ + public void testStartA_sureKill() throws Exception { + + final HAGlue serverA = startA(); + + final int pidA = ((HAGlueTest)serverA).getPID(); + + // Request thread dump of the child. + trySignal(SignalEnum.QUIT, pidA); + + // Wait for the thread dump. + Thread.sleep(2000/*ms*/); + + // Child should still be there. + assertEquals(pidA, ((HAGlueTest) serverA).getPID()); + + // Request sure kill of the child. + trySignal(SignalEnum.KILL, pidA); + + // Wait just a little bit. + Thread.sleep(100/* ms */); + + // RMI should fail (child process should be dead). + try { + ((HAGlueTest) serverA).getPID(); + fail("Expecting " + IOException.class); + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Ignoring expected exception: " + ex); + } + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-23 12:58:59
|
Revision: 7323 http://bigdata.svn.sourceforge.net/bigdata/?rev=7323&view=rev Author: thompsonbry Date: 2013-08-23 12:58:52 +0000 (Fri, 23 Aug 2013) Log Message: ----------- Added log @ WARN if destroying a temporary journal. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 12:56:10 UTC (rev 7322) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 12:58:52 UTC (rev 7323) @@ -427,8 +427,18 @@ } finally { - jnl.close(); + if (isTemporary) { + log.warn("Destroying temporary journal."); + + jnl.destroy(); + + } else { + + jnl.close(); + + } + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-23 12:56:18
|
Revision: 7322 http://bigdata.svn.sourceforge.net/bigdata/?rev=7322&view=rev Author: thompsonbry Date: 2013-08-23 12:56:10 +0000 (Fri, 23 Aug 2013) Log Message: ----------- improving the GASRunner environment. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties 2013-08-23 12:05:50 UTC (rev 7321) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties 2013-08-23 12:56:10 UTC (rev 7322) @@ -12,7 +12,8 @@ # Loggers. # Note: logging here at INFO or DEBUG will significantly impact throughput! #log4j.logger.com.bigdata=INFO -log4j.logger.com.bigdata.rdf.graph.impl.GASEngine=INFO +#log4j.logger.com.bigdata.rdf.graph.impl.GASEngine=INFO +log4j.logger.com.bigdata.rdf.graph.impl.GASRunner=INFO #log4j.logger.com.bigdata.LRUNexus=INFO #log4j.logger.com.bigdata.rdf.sail.BigdataSail=INFO Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java 2013-08-23 12:05:50 UTC (rev 7321) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java 2013-08-23 12:56:10 UTC (rev 7322) @@ -27,6 +27,11 @@ private static final Logger log = Logger.getLogger(GASGraphUtil.class); /** + * Return a sample (without duplicates) of vertices from the graph. + * <p> + * Note: This sampling procedure has a bias in favor of the vertices with + * the most edges and properties (vertices are choosen randomly in + * proportion to the #of edges and properties for the vertex). * * @param desiredSampleSize * The desired sample size. Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 12:05:50 UTC (rev 7321) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-23 12:56:10 UTC (rev 7322) @@ -38,6 +38,11 @@ private static final Logger log = Logger.getLogger(GASRunner.class); /** + * The seed used for the random number generator. + */ + private final long seed; + + /** * Random number generated used for sampling the starting vertices. */ private final Random r; @@ -208,26 +213,17 @@ /* * Assign parsed values. */ + this.seed = seed; this.nsamples = nsamples; this.nthreads = nthreads; - this.r = new Random(seed); this.namespaceOverride = namespace; this.bufferModeOverride = bufferMode; this.loadSet = loadSet.isEmpty() ? null : loadSet .toArray(new String[loadSet.size()]); - } + // Setup the random number generator. + this.r = new Random(seed); - /** - * Return a sample of random vertices. - * - * @param kb - */ - @SuppressWarnings("rawtypes") - protected IV[] getRandomSamples(final AbstractTripleStore kb) { - - return GASGraphUtil.getRandomSample(r, kb, nsamples); - } /** @@ -490,25 +486,33 @@ * The KB namespace. 
* @return * @throws Exception + * + * TODO What happened to the predicate summary/histogram/distribution code? + * + * TODO Are we better off using sampling based on distinct vertices or with + * a bais based on the #of edges for those vertices. */ private GASStats runAnalytic(final Journal jnl, final String namespace) throws Exception { + /* + * Use a read-only view (sampling depends on access to the BTree rather + * than the ReadCommittedIndex). + */ + final AbstractTripleStore kb = (AbstractTripleStore) jnl + .getResourceLocator() + .locate(namespace, jnl.getLastCommitTime()); + @SuppressWarnings("rawtypes") - final IV[] samples; - { - /* - * Use a read-only view (sampling depends on access to the BTree - * rather than the ReadCommittedIndex). - */ - final AbstractTripleStore kb = (AbstractTripleStore) jnl - .getResourceLocator().locate(namespace, - jnl.getLastCommitTime()); + final IV[] samples = GASGraphUtil.getRandomSample(r, kb, nsamples); - samples = getRandomSamples(kb); + // total #of edges in that graph. + final long nedges = kb.getStatementCount(); - } - + /* + * Setup and run over those samples. + */ + final IGASEngine<VS, ES, ST> gasEngine = new GASEngine<VS, ES, ST>(jnl, namespace, ITx.READ_COMMITTED, newGASProgram(), nthreads); @@ -522,12 +526,23 @@ gasEngine.init(startingVertex); // TODO Pure interface for this. - total.add((GASStats) gasEngine.call()); + final GASStats stats = (GASStats) gasEngine.call(); + total.add(stats); + + if (log.isInfoEnabled()) { + log.info("Run complete: vertex[" + i + "] of " + samples.length + + " : startingVertex=" + startingVertex + + ", stats(sample)=" + stats); + } + } - + // Total over all sampled vertices. - System.out.println("TOTAL: " + total); + System.out.println("TOTAL: nseed=" + seed + ", nsamples=" + nsamples + + ", nthreads=" + nthreads + ", bufferMode=" + + jnl.getBufferStrategy().getBufferMode() + ", edges(kb)=" + + nedges + ", stats(total)=" + total); return total; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-23 12:05:58
|
Revision: 7321 http://bigdata.svn.sourceforge.net/bigdata/?rev=7321&view=rev Author: thompsonbry Date: 2013-08-23 12:05:50 +0000 (Fri, 23 Aug 2013) Log Message: ----------- Modified the HA CI test suite to override the log level on the ProcessHelper class so that we get to see all output the child processes in the parent process log output. This let's us observe the full text of any startup failure problems for the child processes. Most of the output of the children (after startup) is directed into a named log file, so the parent will mainly report those startup messages that have not been redirected into another log file. Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/BigdataStatics.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/BigdataStatics.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/BigdataStatics.java 2013-08-22 21:45:26 UTC (rev 7320) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/BigdataStatics.java 2013-08-23 12:05:50 UTC (rev 7321) @@ -49,7 +49,7 @@ * The #of lines of output from a child process which will be echoed onto * {@link System#out} when that child process is executed. This makes it * easy to track down why a child process dies during service start. If you - * want to see more output from the child process, then you should set the + * want to see all output from the child process, then you should set the * log level for the {@link ProcessHelper} class to INFO. * <p> * Note: This needs to be more than the length of the {@link Banner} output Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2013-08-22 21:45:26 UTC (rev 7320) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2013-08-23 12:05:50 UTC (rev 7321) @@ -55,6 +55,8 @@ import org.apache.http.conn.ClientConnectionManager; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.openrdf.model.Value; import org.openrdf.query.BindingSet; import org.openrdf.query.TupleQueryResult; @@ -68,6 +70,7 @@ import com.bigdata.ha.msg.HARootBlockRequest; import com.bigdata.ha.msg.HASnapshotDigestRequest; import com.bigdata.io.TestCase3; +import com.bigdata.jini.start.process.ProcessHelper; import com.bigdata.journal.DumpJournal; import com.bigdata.journal.Journal; import com.bigdata.rdf.sail.TestConcurrentKBCreate; @@ -145,6 +148,8 @@ */ protected ClientConnectionManager ccm = null; + private Level oldProcessHelperLevel = null; + @Override protected void setUp() throws Exception { @@ -153,6 +158,19 @@ ccm = DefaultClientConnectionManagerFactory.getInstance().newInstance(); + /* + * Override the log level for the ProcessHelper to ensure that we + * observe all output from the child process in the console of the + * process running the unit test. This will allow us to observe any + * failures in the test startup. 
+ */ + final Logger tmp = Logger.getLogger(ProcessHelper.class); + + oldProcessHelperLevel = tmp.getLevel(); + + // Must be at least INFO to see all output of the child processes. + tmp.setLevel(Level.INFO); + } @Override @@ -174,6 +192,21 @@ } + /* + * Restore the log level for the utility class that logs the + * output of the child process to its default. + */ + + if (oldProcessHelperLevel != null) { + + final Logger tmp = Logger.getLogger(ProcessHelper.class); + + tmp.setLevel(oldProcessHelperLevel); + + oldProcessHelperLevel = null; + + } + } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-08-22 21:45:35
|
Revision: 7320 http://bigdata.svn.sourceforge.net/bigdata/?rev=7320&view=rev Author: thompsonbry Date: 2013-08-22 21:45:26 +0000 (Thu, 22 Aug 2013) Log Message: ----------- Updated the GAS performance testing harness in preparation to some trial runs on other hardware. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/BFS.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/ branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/README.txt branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/RWStore.properties branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.properties branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.xml branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java Removed Paths: ------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/README.txt =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/README.txt (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/README.txt 2013-08-22 21:45:26 UTC (rev 7320) @@ -0,0 +1,22 @@ +This directory contains a setup for some Gather Apply Scatter (GAS) based +graph mining algorithms against bigdata. + +The files in this directory include: + +- build.properties - configuration properties for the ant script. + +- build.xml - an ant script which may be used to load a generated data set + a local bigdata database instance and start a SPARQL + end point for that database instance. You will then run the + benchmark against that SPARQL end point. + +- RWStore.properties - configuration properties for a bigdata database instance + suitable for BSBM and backed by the RW persistence engine + (single machine read/write bigdata database). This is the + recommended database mode for BSBM as it has significantly + better performance. + +- WORMStore.properties - configuration properties for a bigdata database instance + suitable for BSBM and backed by the WORM persistence + engine (single machine write once, read many bigdata + database). Copied: branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/RWStore.properties (from rev 7305, branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties) =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/RWStore.properties (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/RWStore.properties 2013-08-22 21:45:26 UTC (rev 7320) @@ -0,0 +1,35 @@ +# +# Note: These options are applied when the journal and the triple store are +# first created. + +## +## Journal options. 
+## + +# The backing file. This contains all your data. You want to put this someplace +# safe. The default locator will wind up in the directory from which you start +# your servlet container. +com.bigdata.journal.AbstractJournal.file=bigdata.jnl + +# The persistence engine. Use 'Disk' for the WORM or 'DiskRW' for the RWStore. +com.bigdata.journal.AbstractJournal.bufferMode=DiskRW +#com.bigdata.journal.AbstractJournal.bufferMode=MemStore + +com.bigdata.btree.writeRetentionQueue.capacity=4000 +com.bigdata.btree.BTree.branchingFactor=128 + +# 200M initial extent. +com.bigdata.journal.AbstractJournal.initialExtent=209715200 +com.bigdata.journal.AbstractJournal.maximumExtent=209715200 + +## +## Setup for QUADS mode without the full text index. +## +com.bigdata.rdf.sail.truthMaintenance=false +com.bigdata.rdf.store.AbstractTripleStore.quads=true +com.bigdata.rdf.store.AbstractTripleStore.statementIdentifiers=false +com.bigdata.rdf.store.AbstractTripleStore.textIndex=false +com.bigdata.rdf.store.AbstractTripleStore.axiomsClass=com.bigdata.rdf.axioms.NoAxioms +#com.bigdata.rdf.store.AbstractTripleStore.inlineDateTimes=true + +com.bigdata.rdf.rio.RDFParserOptions.stopAtFirstError=false \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.properties =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.properties (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.properties 2013-08-22 21:45:26 UTC (rev 7320) @@ -0,0 +1,106 @@ +# ant build properties. + +# the bigdata base directory +bigdata.dir=../.. + +# Where to find the pre-build bigdata classes. +bigdata.build.dir=${bigdata.dir}/ant-build + +bigdata.install.lib.dir=${bigdata.dir}/ + +# TODO This is ignored. Parameterize the test to use the specified data file. +#rdf.data.dir=${bigdata.dir}/bigdata-rdf/src/resources/data/foaf + +# TODO Parameterize the test to use the specified parallelism/strategy. +nparallel=8 + +## +# javac options +## + +# debug=on|off +javac.debug=off +# debuglevel=lines,vars,source (or any combination thereof). +javac.debuglevel=lines,vars,source +javac.verbose=off +#javac.target=1.6 +#javac.source=1.6 +javac.encoding=Cp1252 + +# GAS properties. +# +# Note: By default, the files will wind up in ./ant-build/bin +# +# Note: By degault, the server jvm will optimize for throughput and can have +# high variation in throughput due to long GC pauses for larger heaps. You can +# use the CMS-I GC mode to minimize latency at the expense of some throughput. + +# The port at which the NanoSparqlServer will respond (if started). +nanoServerPort=80 + +# The maximum size of the java heap. +maxMem=4g + +# The namespace of the KB instance (multiple KBs can be in the same database). +namespace=kb + +# The name of the file used to configure the Journal. +journalPropertyFile=RWStore.properties + +# The name of the file used for the journal. +bufferMode=DiskRW +#bufferMode=MemStore + +# files to load. +#load=-load xxx -load yyy +load= + +# The #of threads to use for GATHER and SCATTER +nthreads=4 + +# The #of random starting vertices to select. +nsamples=1000 + +# The seed for the random number generator. +seed=217 + +# BFS, SSSP, etc. Will run corresponding XXX class. +analytic=BFS + +# +# Profiler parameters. +# + +# No profiler. 
+profilerAgent= +# linux-64 +#profilerAgent=-agentpath:/usr/java/yjp-9.0.3/bin/linux-x86-64/libyjpagent.so +# Windows +#profilerAgent="-agentpath:C:/Program Files/YourKit Java Profiler 9.0.2/bin/win32/yjpagent.dll" +# Windows Server 2008 +#profilerAgent="-agentpath:C:/Program Files (x86)/YourKit Java Profiler 9.0.4/bin/win64/yjpagent.dll" + +# No profiler. +profilerAgentOptions= +# all profiling initially disabled. +#profilerAgentOptions=-agentlib:yjpagent=disableexceptiontelemetry,disablestacktelemetry + +profiler=${profilerAgent} ${profilerAgentOptions} + +# Configure GC. +gcopts= +#gcopts=-verbose:gc +#gcopts=-XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode +#gcopts=-XX:+UseParallelOldGC +#gcopts=-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC + +# Generates detailed logging on the JVM GC behavior. The service will +# start in the configured service directory, so the log file will be in +# that directory as well. The service directory is typically on local +# disk, so that is where you need to look for this file. +gcdebug= +#gcdebug=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:jvm_gc.log + +# all jvm args for query. +jvmArgs=-server -Xmx${maxMem} -showversion ${gcopts} ${gcdebug} ${profiler} -Dlog4j.configuration=file:log4j.properties +# -Dlog4j.debug Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.xml =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.xml (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/build.xml 2013-08-22 21:45:26 UTC (rev 7320) @@ -0,0 +1,44 @@ +<!-- $Id: build.xml 5779 2011-12-14 18:17:54Z thompsonbry $ --> +<!-- --> +<!-- do "ant bundle-jar" in the parent directory first. --> +<!-- --> +<project name="gas" basedir="."> + + <property file="build.properties" /> + + <path id="runtime.classpath"> + <!-- The bigdata dependencies (for the nano-server). --> + <fileset dir="${bigdata.build.dir}/lib"> + <include name="**/*.jar" /> + </fileset> + </path> + + <target name="start-sparql-server" + description="Start a small http server fronting for a bigdata database instance."> + <java classname="com.bigdata.rdf.sail.webapp.NanoSparqlServer" + fork="true" failonerror="true" + > + <arg line="${nanoServerPort} ${namespace} ${journalPropertyFile}" /> + <!-- specify/override the journal file name. --> + <jvmarg line="${jvmArgs}" /> + <classpath> + <path refid="runtime.classpath" /> + </classpath> + </java> + </target> + + <target name="run-gas-engine" + description="Run a GAS Engine performance test."> + <java classname="com.bigdata.rdf.graph.analytics.${analytic}" + fork="true" failonerror="true" + > + <arg line="-bufferMode ${bufferMode} -namespace ${namespace} -seed ${seed} -nsamples ${nsamples} -nthreads ${nthreads} ${journalPropertyFile}" /> + <!-- specify/override the journal file name. --> + <jvmarg line="${jvmArgs}" /> + <classpath> + <path refid="runtime.classpath" /> + </classpath> + </java> + </target> + +</project> Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-perf/gas/log4j.properties 2013-08-22 21:45:26 UTC (rev 7320) @@ -0,0 +1,76 @@ +# Default log4j configuration. See the individual classes for the +# specific loggers, but generally they are named for the class in +# which they are defined. + +# Default log4j configuration for testing purposes. 
+# +# You probably want to set the default log level to ERROR. +# +log4j.rootCategory=WARN, dest1 +#log4j.rootCategory=WARN, dest2 + +# Loggers. +# Note: logging here at INFO or DEBUG will significantly impact throughput! +#log4j.logger.com.bigdata=INFO +log4j.logger.com.bigdata.rdf.graph.impl.GASEngine=INFO + +#log4j.logger.com.bigdata.LRUNexus=INFO +#log4j.logger.com.bigdata.rdf.sail.BigdataSail=INFO +#log4j.logger.com.bigdata.rdf.sail.BigdataEvaluationStrategyImpl3=INFO +#log4j.logger.com.bigdata.rdf.sail.webapp.NanoSparqlServer=INFO +#log4j.logger.com.bigdata.rdf.sail.webapp.BigdataRDFServletContextListener=INFO +#log4j.logger.com.bigdata.relation.accesspath.BlockingBuffer=ERROR +#log4j.logger.com.bigdata.rdf.store.DataLoader=INFO + +# dest1 +log4j.appender.dest1=org.apache.log4j.ConsoleAppender +log4j.appender.dest1.layout=org.apache.log4j.PatternLayout +log4j.appender.dest1.layout.ConversionPattern=%-5p: %F:%L: %m%n +#log4j.appender.dest1.layout.ConversionPattern=%-5p: %r %l: %m%n +#log4j.appender.dest1.layout.ConversionPattern=%-5p: %m%n +#log4j.appender.dest1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +#log4j.appender.dest1.layout.ConversionPattern=%-4r(%d) [%t] %-5p %c(%l:%M) %x - %m%n + +# dest2 includes the thread name and elapsed milliseconds. +# Note: %r is elapsed milliseconds. +# Note: %t is the thread name. +# See http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html +log4j.appender.dest2=org.apache.log4j.ConsoleAppender +log4j.appender.dest2.layout=org.apache.log4j.PatternLayout +log4j.appender.dest2.layout.ConversionPattern=%-5p: %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n + +## +# Rule execution log. This is a formatted log file (comma delimited). +#log4j.logger.com.bigdata.relation.rule.eval.RuleLog=INFO,ruleLog +log4j.additivity.com.bigdata.relation.rule.eval.RuleLog=false +log4j.appender.ruleLog=org.apache.log4j.FileAppender +log4j.appender.ruleLog.Threshold=ALL +log4j.appender.ruleLog.File=rules.log +log4j.appender.ruleLog.Append=true +log4j.appender.ruleLog.BufferedIO=true +log4j.appender.ruleLog.layout=org.apache.log4j.PatternLayout +log4j.appender.ruleLog.layout.ConversionPattern=%m + +## +# Summary query evaluation log (tab delimited file). Uncomment the next line to enable. +#log4j.logger.com.bigdata.bop.engine.QueryLog=INFO,queryLog +log4j.additivity.com.bigdata.bop.engine.QueryLog=false +log4j.appender.queryLog=org.apache.log4j.FileAppender +log4j.appender.queryLog.Threshold=ALL +log4j.appender.queryLog.File=queryLog.csv +log4j.appender.queryLog.Append=true +log4j.appender.queryLog.BufferedIO=true +log4j.appender.queryLog.layout=org.apache.log4j.PatternLayout +log4j.appender.queryLog.layout.ConversionPattern=%m + +## +# BOp run state trace (tab delimited file). Uncomment the next line to enable. 
+#log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog +log4j.additivity.com.bigdata.bop.engine.RunState$TableLog=false +log4j.appender.queryRunStateLog=org.apache.log4j.FileAppender +log4j.appender.queryRunStateLog.Threshold=ALL +log4j.appender.queryRunStateLog.File=queryRunState.log +log4j.appender.queryRunStateLog.Append=true +log4j.appender.queryRunStateLog.BufferedIO=true +log4j.appender.queryRunStateLog.layout=org.apache.log4j.PatternLayout +log4j.appender.queryRunStateLog.layout.ConversionPattern=%m Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/BFS.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -4,6 +4,7 @@ import com.bigdata.rdf.graph.Factory; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.impl.GASRunner; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.spo.ISPO; @@ -185,4 +186,22 @@ } + /** + * Performance testing harness. + */ + public static void main(final String[] args) throws Exception { + + new GASRunner<BFS.VS, BFS.ES, Void>(args) { + + @Override + protected IGASProgram<BFS.VS, BFS.ES, Void> newGASProgram() { + + return new BFS(); + + } + + }.call(); + + } + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -7,6 +7,7 @@ import com.bigdata.rdf.graph.GASUtil; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.impl.GASRunner; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.spo.ISPO; @@ -256,6 +257,24 @@ * {@link ISPO#s()}. The remote vertex is {@link ISPO#o()}. * <p> * {@inheritDoc} + * + * FIXME Test both variations on a variety of data sets and see which is + * better: + * + * <p> + * Zhisong wrote: In the original GASengine, the scatter operator only need + * to access the status of the source: src.changes. + * + * To check the status of destination, it needs to load destination data: + * dst.dist and edge data: e. And then check if new dist is different from + * the old value. + * + * Bryan wrote: I will have to think about this more. It sounds like it + * depends on the fan-out of the scatter at time t versus the fan-in of the + * gather at time t+1. The optimization might only benefit if a reasonable + * fraction of the destination vertices wind up NOT being retriggered. I + * will try on these variations in the Java code as well. * + * </p> */ @Override public void scatter(final IGASContext<SSSP.VS, SSSP.ES, Integer> ctx, @@ -291,4 +310,22 @@ } + /** + * Performance test harness. 
+ */ + public static void main(final String[] args) throws Exception { + + new GASRunner<SSSP.VS, SSSP.ES, Integer>(args) { + + @Override + protected IGASProgram<SSSP.VS, SSSP.ES, Integer> newGASProgram() { + + return new SSSP(); + + } + + }.call(); + + } + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -304,7 +304,7 @@ * </pre> * Parameterize execution runs againt these runtime options! */ - private final int nparallel = 1; + private final int nthreads; /** * @@ -333,7 +333,8 @@ * through the history index). */ public GASEngine(final IIndexManager indexManager, final String namespace, - final long timestamp, final IGASProgram<VS, ES, ST> program) { + final long timestamp, final IGASProgram<VS, ES, ST> program, + final int nthreads) { if (indexManager == null) throw new IllegalArgumentException(); @@ -341,6 +342,9 @@ if (program == null) throw new IllegalArgumentException(); + if (nthreads <= 0) + throw new IllegalArgumentException(); + this.indexManager = indexManager; this.namespace = namespace; @@ -349,6 +353,8 @@ this.program = program; + this.nthreads = nthreads; + this.executorService = indexManager.getExecutorService(); this.vsf = program.getVertexStateFactory(); @@ -926,11 +932,11 @@ private Callable<Long> newFrontierStrategy( final VertexTaskFactory<Long> taskFactory) { - if (nparallel == 1) + if (nthreads == 1) return new RunInCallersThreadFrontierStrategy(taskFactory); return new LatchedExecutorFrontierStrategy(taskFactory, - executorService, nparallel); + executorService, nthreads); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASGraphUtil.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -81,22 +81,23 @@ @SuppressWarnings("rawtypes") public static IV getRandomVertex(final Random r, final AbstractTripleStore kb) { + /* + * TODO This assumes a local, non-sharded index. The specific + * approach to identifying a starting vertex relies on the + * ILinearList API. If the caller is specifying the starting vertex + * then we do not need to do this. + * + * TODO The bias here is towards vertices having more out-edges + * and/or attributes since the sample is uniform over the triples in + * the index and a triple may be either an edge or an attribute + * value (or a link attribute using RDR). + */ + final BTree ndx = (BTree) kb.getSPORelation().getPrimaryIndex(); + // Select a random starting vertex. IV startingVertex = null; { - /* - * TODO This assumes a local, non-sharded index. The specific - * approach to identifying a starting vertex relies on the - * ILinearList API. If the caller is specifying the starting vertex - * then we do not need to do this. 
- * - * TODO The bias here is towards vertices having more out-edges - * and/or attributes since the sample is uniform over the triples in - * the index and a triple may be either an edge or an attribute - * value (or a link attribute using RDR). - */ - final BTree ndx = (BTree) kb.getSPORelation().getPrimaryIndex(); - + // Truncate at MAX_INT. final int size = (int) Math .min(ndx.rangeCount(), Integer.MAX_VALUE); @@ -143,8 +144,9 @@ } if (startingVertex == null) - throw new RuntimeException("No starting vertex"); - + throw new RuntimeException("No starting vertex: nedges=" + + ndx.rangeCount()); + return startingVertex; } Copied: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java (from rev 7305, branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java) =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASRunner.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -0,0 +1,536 @@ +package com.bigdata.rdf.graph.impl; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.LinkedHashSet; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.log4j.Logger; +import org.openrdf.rio.RDFFormat; + +import com.bigdata.Banner; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.rdf.graph.IGASEngine; +import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.rio.LoadStats; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.store.AbstractTripleStore; + +/** + * Base class for running performance tests. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * + * TODO Need a different driver if the algorithm always visits all vertices. + */ +abstract public class GASRunner<VS, ES, ST> implements Callable<GASStats> { + + private static final Logger log = Logger.getLogger(GASRunner.class); + + /** + * Random number generated used for sampling the starting vertices. + */ + private final Random r; + + /** + * The #of random starting vertices to use. + */ + private final int nsamples; + + /** + * The #of threads to use for GATHER and SCATTER operators. + */ + private final int nthreads; + + /** + * The property file + */ + private final String propertyFile; + + /** + * When non-<code>null</code>, this overrides the current buffer mode. + */ + private final BufferMode bufferModeOverride; + + /** + * When non-<code>null</code>, this overrides the KB namespace. + */ + private final String namespaceOverride; + + /** + * When non-<code>null</code>, a list of zero or more resources to be + * loaded. The resources will be searched for as URLs, on the CLASSPATH, and + * in the file system. + */ + private final String[] loadSet; + + /** + * Print the optional message on stderr, print the usage information on + * stderr, and then force the program to exit with the given status code. + * + * @param status + * The status code. 
+ * @param msg + * The optional message + */ + private static void usage(final int status, final String msg) { + + if (msg != null) { + + System.err.println(msg); + + } + + System.err.println("[options] propertyFile"); + + System.exit(status); + + } + + /** + * Run a GAS analytic against some data set. + * + * @param args + * USAGE:<br/> + * <code>(options) propertyFile</code> + * <p> + * <i>Where:</i> + * <dl> + * <dt>propertyFile</dt> + * <dd>A java properties file for a standalone {@link Journal}.</dd> + * </dl> + * and <i>options</i> are any of: + * <dl> + * <dt>-nthreads</dt> + * <dd>The #of threads which will be used for GATHER and SCATTER + * operations.</dd> + * <dt>-nsamples</dt> + * <dd>The #of random sample starting vertices that will be + * selected. The algorithm will be run ONCE for EACH sampled + * vertex.</dd> + * <dt>-seed</dt> + * <dd>The seed for the random number generator (default is + * <code>217L</code>).</dd> + * <dt>-namespace</dt> + * <dd>The namespace of the default SPARQL endpoint (the + * namespace will be <code>kb</code> if none was specified when + * the triple/quad store was created).</dd> + * <dt>-load</dt> + * <dd>Loads the named resource. This option may appear multiple + * times. The resources will be searched for as URLs, on the + * CLASSPATH, and in the file system.</dd> + * <dt>-bufferMode</dt> + * <dd>Overrides the {@link BufferMode} (if any) specified in the + * <code>propertyFile</code>.</dd> + * </p> + */ + public GASRunner(final String[] args) { + + Banner.banner(); + + long seed = 217L; + int nsamples = 100; + int nthreads = 4; + BufferMode bufferMode = null; // override only. + String namespace = "kb"; + // Set of files to load (optional). + LinkedHashSet<String> loadSet = new LinkedHashSet<String>(); + + /* + * Handle all arguments starting with "-". These should appear before + * any non-option arguments to the program. + */ + int i = 0; + while (i < args.length) { + final String arg = args[i]; + if (arg.startsWith("-")) { + if (arg.equals("-seed")) { + seed = Long.valueOf(args[++i]); + } else if (arg.equals("-nsamples")) { + final String s = args[++i]; + nsamples = Integer.valueOf(s); + if (nsamples <= 0) { + usage(1/* status */, + "-nsamples must be positive, not: " + s); + } + } else if (arg.equals("-nthreads")) { + final String s = args[++i]; + nthreads = Integer.valueOf(s); + if (nthreads < 0) { + usage(1/* status */, + "-nthreads must be non-negative, not: " + s); + } + } else if (arg.equals("-bufferMode")) { + final String s = args[++i]; + bufferMode = BufferMode.valueOf(s); + } else if (arg.equals("-namespace")) { + final String s = args[++i]; + namespace = s; + } else if (arg.equals("-load")) { + final String s = args[++i]; + loadSet.add(s); + } else { + usage(1/* status */, "Unknown argument: " + arg); + } + } else { + break; + } + i++; + } + + /* + * Check for the remaining (required) argument(s). + */ + final int nremaining = args.length - i; + if (nremaining != 1) { + /* + * There are either too many or too few arguments remaining. + */ + usage(1/* status */, nremaining < 1 ? "Too few arguments." + : "Too many arguments"); + } + + /* + * Property file. + */ + this.propertyFile = args[i++]; + + /* + * Assign parsed values. + */ + this.nsamples = nsamples; + this.nthreads = nthreads; + this.r = new Random(seed); + this.namespaceOverride = namespace; + this.bufferModeOverride = bufferMode; + this.loadSet = loadSet.isEmpty() ? null : loadSet + .toArray(new String[loadSet.size()]); + + } + + /** + * Return a sample of random vertices. 
+ * + * @param kb + */ + @SuppressWarnings("rawtypes") + protected IV[] getRandomSamples(final AbstractTripleStore kb) { + + return GASGraphUtil.getRandomSample(r, kb, nsamples); + + } + + /** + * Return the {@link IGASProgram} to be evaluated. + */ + abstract protected IGASProgram<VS, ES, ST> newGASProgram(); + + private Properties getProperties(final String resource) throws IOException { + + if (log.isInfoEnabled()) + log.info("Reading properties: " + resource); + + InputStream is = null; + try { + + // try the classpath + is = getClass().getResourceAsStream(resource); + + if (is != null) { + + } else { + + // try file system. + final File file = new File(resource); + + if (file.exists()) { + + is = new FileInputStream(file); + + } else { + + throw new IOException("Could not locate resource: " + + resource); + + } + + } + + /* + * Obtain a buffered reader on the input stream. + */ + + final Properties properties = new Properties(); + + final Reader reader = new BufferedReader(new InputStreamReader(is)); + + try { + + properties.load(reader); + + } finally { + + try { + + reader.close(); + + } catch (Throwable t) { + + log.error(t); + + } + + } + + return properties; + + } finally { + + if (is != null) { + + try { + + is.close(); + + } catch (Throwable t) { + + log.error(t); + + } + + } + + } + + } + + /** + * Run the test. + * <p> + * This provides a safe pattern for either loading data into a temporary + * journal, which is then destroyed, or using an exiting journal and + * optionally loading in some data set. When we load the data the journal is + * destroyed afterwards and when the journal is pre-existing and we neither + * load the data nor destroy the journal. This has to do with the effective + * BufferMode (if transient) and whether the file is specified and whether a + * temporary file is created (CREATE_TEMP_FILE). If we do our own file + * create if the effective buffer mode is non-transient, then we can get all + * this information. + */ + public GASStats call() throws Exception { + + final Properties properties = getProperties(propertyFile); + + /* + * Note: Allows override through the command line argument. The default + * is otherwise the default and the value in the properties file (if + * any) will be used unless it is overridden. + */ + final BufferMode bufferMode = this.bufferModeOverride == null ? BufferMode + .valueOf(properties.getProperty(Journal.Options.BUFFER_MODE, + Journal.Options.DEFAULT_BUFFER_MODE)) : this.bufferModeOverride; + + final boolean isTransient = !bufferMode.isStable(); + + final boolean isTemporary; + if (isTransient) { + + isTemporary = true; + + } else { + + final String fileStr = properties.getProperty(Journal.Options.FILE); + + if (fileStr == null) { + + /* + * We will use a temporary file that we create here. The journal + * will be destroyed below. + */ + isTemporary = true; + + final File tmpFile = File.createTempFile( + GASRunner.class.getSimpleName(), Journal.Options.JNL); + + // Set this on the Properties so it will be used by the jnl. + properties.setProperty(Journal.Options.FILE, + tmpFile.getAbsolutePath()); + + } else { + + // real file is named. + isTemporary = false; + + } + + } + + // The effective KB name. + final String namespace = this.namespaceOverride == null ? properties + .getProperty(BigdataSail.Options.NAMESPACE, + BigdataSail.Options.DEFAULT_NAMESPACE) : this.namespaceOverride; + + /* + * TODO Could start NSS and use SPARQL UPDATE "LOAD" to load the data. 
+ * That exposes the SPARQL end point for other purposes during the test. + * Is this useful? It could also let us run the GASEngine on a remote + * service (submit a callable to an HA server or define a REST API for + * submitting these GAS algorithms). + */ + final Journal jnl = new Journal(properties); + + try { + + // Locate/create KB. + { + final AbstractTripleStore kb; + if (isTemporary) { + + kb = BigdataSail.createLTS(jnl, properties); + + } else { + + final AbstractTripleStore tmp = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + ITx.UNISOLATED); + + if (tmp == null) { + + // create. + kb = BigdataSail.createLTS(jnl, properties); + + } else { + + kb = tmp; + + } + + } + } + + /* + * Load data sets. + */ + if (isTemporary || (loadSet != null && loadSet.length > 0)) { + + loadFiles(jnl, namespace, loadSet); + + } + + return runAnalytic(jnl, namespace); + + } finally { + + jnl.close(); + + } + + } + + /** + * Load files into the journal. + * + * @param jnl + * The journal. + * @param namespace + * The KB namespace. + * @param loadSet + * The files. + * @throws IOException + */ + private LoadStats loadFiles(final Journal jnl, final String namespace, + final String[] loadSet) throws IOException { + + // final String path = "bigdata-rdf/src/resources/data/foaf"; + // final String dataFile[] = new String[] {// + // path + "/data-0.nq.gz",// + // path + "/data-1.nq.gz",// + // path + "/data-2.nq.gz",// + // path + "/data-3.nq.gz",// + // }; + final String baseUrl[] = new String[loadSet.length]; + for (int i = 0; i < loadSet.length; i++) { + baseUrl[i] = "file:" + loadSet[i]; + } + // fall back RDFFormat. + final RDFFormat[] rdfFormat = new RDFFormat[loadSet.length]; + for (int i = 0; i < loadSet.length; i++) { + rdfFormat[i] = RDFFormat.RDFXML; + } + + // Load data using the unisolated view. + final AbstractTripleStore kb = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, ITx.UNISOLATED); + + final LoadStats stats = kb.getDataLoader().loadData(loadSet, baseUrl, + rdfFormat); + + System.out.println(stats.toString()); + + return stats; + + } + + /** + * Run the analytic. + * + * @param jnl + * The journal. + * @param namespace + * The KB namespace. + * @return + * @throws Exception + */ + private GASStats runAnalytic(final Journal jnl, final String namespace) + throws Exception { + + @SuppressWarnings("rawtypes") + final IV[] samples; + { + /* + * Use a read-only view (sampling depends on access to the BTree + * rather than the ReadCommittedIndex). + */ + final AbstractTripleStore kb = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + jnl.getLastCommitTime()); + + samples = getRandomSamples(kb); + + } + + final IGASEngine<VS, ES, ST> gasEngine = new GASEngine<VS, ES, ST>(jnl, + namespace, ITx.READ_COMMITTED, newGASProgram(), nthreads); + + final GASStats total = new GASStats(); + + for (int i = 0; i < samples.length; i++) { + + @SuppressWarnings("rawtypes") + final IV startingVertex = samples[i]; + + gasEngine.init(startingVertex); + + // TODO Pure interface for this. + total.add((GASStats) gasEngine.call()); + + } + + // Total over all sampled vertices. 
+ System.out.println("TOTAL: " + total); + + return total; + + } + +} Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -24,12 +24,9 @@ package com.bigdata.rdf.graph.analytics; import com.bigdata.journal.ITx; -import com.bigdata.journal.Journal; import com.bigdata.rdf.graph.AbstractGraphTestCase; import com.bigdata.rdf.graph.IGASEngine; -import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.impl.GASEngine; -import com.bigdata.rdf.graph.impl.PerformanceTest; /** * Test class for Breadth First Search (BFS) traversal. @@ -54,7 +51,7 @@ final IGASEngine<BFS.VS, BFS.ES, Void> gasEngine = new GASEngine<BFS.VS, BFS.ES, Void>( sail.getDatabase().getIndexManager(), sail.getDatabase() - .getNamespace(), ITx.READ_COMMITTED, new BFS()); + .getNamespace(), ITx.READ_COMMITTED, new BFS(), 1/* nthreads */); // Initialize the froniter. gasEngine.init(p.mike.getIV()); @@ -76,23 +73,4 @@ } - /** - * Test routine to running against a {@link Journal} in which some data set - * has already been loaded. - */ - public static void main(final String[] args) throws Exception { - - new PerformanceTest<BFS.VS, BFS.ES, Void>(args) { - - @Override - protected IGASProgram<BFS.VS, BFS.ES, Void> newGASProgram() { - - return new BFS(); - - } - - }.call(); - - } - } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -24,12 +24,9 @@ package com.bigdata.rdf.graph.analytics; import com.bigdata.journal.ITx; -import com.bigdata.journal.Journal; import com.bigdata.rdf.graph.AbstractGraphTestCase; import com.bigdata.rdf.graph.IGASEngine; -import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.impl.GASEngine; -import com.bigdata.rdf.graph.impl.PerformanceTest; /** * Test class for SSP traversal. @@ -52,7 +49,7 @@ final IGASEngine<SSSP.VS, SSSP.ES, Integer> gasEngine = new GASEngine<SSSP.VS, SSSP.ES, Integer>( sail.getDatabase().getIndexManager(), sail.getDatabase() - .getNamespace(), ITx.READ_COMMITTED, new SSSP()); + .getNamespace(), ITx.READ_COMMITTED, new SSSP(), 1/* nthreads */); // Initialize the froniter. gasEngine.init(p.mike.getIV()); @@ -74,23 +71,4 @@ } - /** - * Test routine to running against a {@link Journal} in which some data set - * has already been loaded. 
- */ - public static void main(final String[] args) throws Exception { - - new PerformanceTest<SSSP.VS, SSSP.ES, Integer>(args) { - - @Override - protected IGASProgram<SSSP.VS, SSSP.ES, Integer> newGASProgram() { - - return new SSSP(); - - } - - }.call(); - - } - } Deleted: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -1,342 +0,0 @@ -package com.bigdata.rdf.graph.impl; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Callable; - -import org.apache.log4j.Logger; -import org.openrdf.rio.RDFFormat; - -import com.bigdata.journal.BufferMode; -import com.bigdata.journal.ITx; -import com.bigdata.journal.Journal; -import com.bigdata.rdf.graph.IGASEngine; -import com.bigdata.rdf.graph.IGASProgram; -import com.bigdata.rdf.internal.IV; -import com.bigdata.rdf.sail.BigdataSail; -import com.bigdata.rdf.store.AbstractTripleStore; - -/** - * Base class for running performance tests. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * - * TODO Need a different driver if the algorithm always visits all vertices. - * - * TODO Parameterize for sampling of vertices. - */ -abstract public class PerformanceTest<VS, ES, ST> implements Callable<Void> { - - private static final Logger log = Logger.getLogger(PerformanceTest.class); - - private final String[] args; - private final Random r; - - /* - * TODO args[] are ignored. Should parse for [nsamples], [seed], the data - * file (to load when run), memstore (must load when run) verus RWStore (for - * preexisting data). - */ - public PerformanceTest(final String[] args) { - - this.args = args; - - this.r = new Random(seed()); - - } - - protected long seed() { - - return 217L; - - } - - protected int sampleSize() { - - return 100; - - } - - /** - * - * @param kb - */ - protected IV[] getRandomSamples(final AbstractTripleStore kb) { - - return GASGraphUtil.getRandomSample(r, kb, sampleSize()); - - } - - /** - * Return the {@link IGASProgram} to be evaluated. - */ - abstract protected IGASProgram<VS, ES, ST> newGASProgram(); - - private Properties getProperties(final String resource) throws IOException { - - if (log.isInfoEnabled()) - log.info("Reading properties: " + resource); - - InputStream is = null; - try { - - // try the classpath - is = getClass().getResourceAsStream(resource); - - if (is != null) { - - } else { - - // try file system. - final File file = new File(resource); - - if (file.exists()) { - - is = new FileInputStream(file); - - } else { - - throw new IOException("Could not locate resource: " + resource); - - } - - } - - /* - * Obtain a buffered reader on the input stream. 
- */ - - final Properties properties = new Properties(); - - final Reader reader = new BufferedReader(new InputStreamReader(is)); - - try { - - properties.load(reader); - - } finally { - - try { - - reader.close(); - - } catch (Throwable t) { - - log.error(t); - - } - - } - - return properties; - - } finally { - - if (is != null) { - - try { - - is.close(); - - } catch (Throwable t) { - - log.error(t); - - } - - } - - } - - } - - /** - * Run the test. - */ - public Void call() throws Exception { - - /* - * The property file - * - * TODO Use different files for MemStore and RWStore (command line arg) - */ - final String propertyFile = "bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties"; - - final Properties properties = getProperties(propertyFile); - - final BufferMode bufferMode = BufferMode.valueOf(properties - .getProperty(Journal.Options.BUFFER_MODE, - Journal.Options.DEFAULT_BUFFER_MODE)); - - final boolean isTransient = !bufferMode.isStable(); - - final boolean isTemporary; - if (isTransient) { - - isTemporary = true; - - } else { - - final String fileStr = properties.getProperty(Journal.Options.FILE); - - if (fileStr == null) { - - /* - * We will use a temporary file that we create here. The journal - * will be destroyed below. - */ - isTemporary = true; - - final File tmpFile = File.createTempFile( - PerformanceTest.class.getSimpleName(), - Journal.Options.JNL); - - // Set this on the Properties so it will be used by the jnl. - properties.setProperty(Journal.Options.FILE, - tmpFile.getAbsolutePath()); - - } else { - - // real file is named. - isTemporary = false; - - } - - } - - // The effective KB name. - final String namespace = properties.getProperty( - BigdataSail.Options.NAMESPACE, - BigdataSail.Options.DEFAULT_NAMESPACE); - - /* - * TODO Could start NSS and use SPARQL UPDATE "LOAD" to load the data. - * That exposes the SPARQL end point for other purposes during the test. - * Is this useful? It could also let us run the GASEngine on a remote - * service (submit a callable to an HA server or define a REST API for - * submitting these GAS algorithms). - * - * TODO we need a safe pattern for when we load the data the journal is - * destroyed afterwards and when the journal is pre-existing and we - * neither load the data nor destroy the journal. This has to do with - * the effective BufferMode (if transient) and whether the file is - * specified and whether a temporary file is created (CREATE_TEMP_FILE). - * If we do our own file create if the effective buffer mode is - * non-transient, then we can get all this information. - */ - final Journal jnl = new Journal(properties); - - try { - - // Locate/create KB. - { - final AbstractTripleStore kb; - if (isTemporary) { - - kb = BigdataSail.createLTS(jnl, properties); - - } else { - - final AbstractTripleStore tmp = (AbstractTripleStore) jnl - .getResourceLocator().locate(namespace, - ITx.UNISOLATED); - - if (tmp == null) { - - // create. - kb = BigdataSail.createLTS(jnl, properties); - - } else { - - kb = tmp; - - } - - } - } - - /* - * Load data sets. 
- */ - if (isTemporary) { - - final String path = "bigdata-rdf/src/resources/data/foaf"; - final String dataFile[] = new String[] {// - path + "/data-0.nq.gz",// - path + "/data-1.nq.gz",// - path + "/data-2.nq.gz",// - path + "/data-3.nq.gz",// - }; - final String baseUrl[] = new String[dataFile.length]; - for (int i = 0; i < dataFile.length; i++) { - baseUrl[i] = "file:" + dataFile[i]; - } - final RDFFormat[] rdfFormat = new RDFFormat[] {// - RDFFormat.NQUADS,// - RDFFormat.NQUADS,// - RDFFormat.NQUADS,// - RDFFormat.NQUADS,// - }; - - // Load data using the unisolated view. - final AbstractTripleStore kb = (AbstractTripleStore) jnl - .getResourceLocator().locate(namespace, ITx.UNISOLATED); - - kb.getDataLoader().loadData(dataFile, baseUrl, rdfFormat); - - } - - final IGASEngine<VS, ES, ST> gasEngine = new GASEngine<VS, ES, ST>( - jnl, namespace, ITx.READ_COMMITTED, newGASProgram()); - - @SuppressWarnings("rawtypes") - final IV[] samples; - { - /* - * Use a read-only view (sampling depends on access to the BTree - * rather than the ReadCommittedIndex). - */ - final AbstractTripleStore kb = (AbstractTripleStore) jnl - .getResourceLocator().locate(namespace, - jnl.getLastCommitTime()); - - samples = getRandomSamples(kb); - - } - - final GASStats total = new GASStats(); - - for (int i = 0; i < samples.length; i++) { - - @SuppressWarnings("rawtypes") - final IV startingVertex = samples[i]; - - gasEngine.init(startingVertex); - - // TODO Pure interface for this. - total.add((GASStats)gasEngine.call()); - - } - - // Total over all sampled vertices. - System.out.println("TOTAL: " + total); - - // Done. - return null; - - } finally { - - jnl.close(); - - } - - } - -} Deleted: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties 2013-08-22 21:45:26 UTC (rev 7320) @@ -1,35 +0,0 @@ -# -# Note: These options are applied when the journal and the triple store are -# first created. - -## -## Journal options. -## - -# The backing file. This contains all your data. You want to put this someplace -# safe. The default locator will wind up in the directory from which you start -# your servlet container. -com.bigdata.journal.AbstractJournal.file=bigdata.jnl - -# The persistence engine. Use 'Disk' for the WORM or 'DiskRW' for the RWStore. -com.bigdata.journal.AbstractJournal.bufferMode=DiskRW -#com.bigdata.journal.AbstractJournal.bufferMode=MemStore - -com.bigdata.btree.writeRetentionQueue.capacity=4000 -com.bigdata.btree.BTree.branchingFactor=128 - -# 200M initial extent. -com.bigdata.journal.AbstractJournal.initialExtent=209715200 -com.bigdata.journal.AbstractJournal.maximumExtent=209715200 - -## -## Setup for QUADS mode without the full text index. 
-## -com.bigdata.rdf.sail.truthMaintenance=false -com.bigdata.rdf.store.AbstractTripleStore.quads=true -com.bigdata.rdf.store.AbstractTripleStore.statementIdentifiers=false -com.bigdata.rdf.store.AbstractTripleStore.textIndex=false -com.bigdata.rdf.store.AbstractTripleStore.axiomsClass=com.bigdata.rdf.axioms.NoAxioms -#com.bigdata.rdf.store.AbstractTripleStore.inlineDateTimes=true - -com.bigdata.rdf.rio.RDFParserOptions.stopAtFirstError=false \ No newline at end of file Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java 2013-08-22 19:21:47 UTC (rev 7319) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java 2013-08-22 21:45:26 UTC (rev 7320) @@ -244,7 +244,7 @@ final IGASEngine<Set<ISPO>, Set<ISPO>, Set<ISPO>> gasEngine = new GASEngine<Set<ISPO>, Set<ISPO>, Set<ISPO>>( sail.getDatabase().getIndexManager(), sail.getDatabase() .getNamespace(), ITx.READ_COMMITTED, - new MockGASProgram(gatherEdges)); + new MockGASProgram(gatherEdges), 1/* nthreads */); // Initialize the froniter. gasEngine.init(startingVertex); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
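The "safe pattern" described in the javadoc of this commit reduces to a single decision, sketched below against the bigdata BufferMode and Journal.Options classes that appear in the diff. The class and method names here are illustrative only, not part of the code base.

import java.io.File;
import java.io.IOException;
import java.util.Properties;

import com.bigdata.journal.BufferMode;
import com.bigdata.journal.Journal;

public class JournalLifecycleSketch {

    /**
     * Decide whether the journal is temporary (created for this run and
     * destroyed afterwards) or pre-existing (neither loaded into nor
     * destroyed afterwards).
     */
    static boolean isTemporaryJournal(final Properties properties)
            throws IOException {

        final BufferMode bufferMode = BufferMode.valueOf(properties
                .getProperty(Journal.Options.BUFFER_MODE,
                        Journal.Options.DEFAULT_BUFFER_MODE));

        if (!bufferMode.isStable()) {
            // Transient buffer mode: nothing survives the run anyway.
            return true;
        }

        if (properties.getProperty(Journal.Options.FILE) == null) {
            // Stable mode, but no backing file was named: create one that
            // the caller will destroy when the run completes.
            final File tmpFile = File.createTempFile("GASRunner",
                    Journal.Options.JNL);
            properties.setProperty(Journal.Options.FILE,
                    tmpFile.getAbsolutePath());
            return true;
        }

        // A real file was named: use the existing journal as-is.
        return false;
    }
}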
From: <mrp...@us...> - 2013-08-22 19:21:58
Revision: 7319 http://bigdata.svn.sourceforge.net/bigdata/?rev=7319&view=rev Author: mrpersonick Date: 2013-08-22 19:21:47 +0000 (Thu, 22 Aug 2013) Log Message: ----------- got rid of the extra copy ops we were using to make Tee work (for Unions) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpContext.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpContext.java 2013-08-22 18:42:59 UTC (rev 7318) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpContext.java 2013-08-22 19:21:47 UTC (rev 7319) @@ -66,6 +66,13 @@ private final AtomicInteger idFactory; /** + * Temporary "next id" bypasses the idFactory when we want to be explicit + * about the next bop id. Used for Tees (Unions). nextId = -1 means use + * the idFactory. + */ + private transient int nextId = -1; + + /** * The KB instance. */ protected final AbstractTripleStore db; @@ -456,9 +463,34 @@ } + /** + * Temporarily set the next bop Id to come out of the context. + */ + public void setNextId(final int nextId) { + + this.nextId = nextId; + + } + + /** + * Return the next id from the idFactory, unless there is a temporary + * bop id set, in which case return it and clear it. + */ public int nextId() { - return idFactory.incrementAndGet(); + if (nextId == -1) { + + return idFactory.incrementAndGet(); + + } else { + + final int tmp = nextId; + + nextId = -1; + + return tmp; + + } } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java 2013-08-22 18:42:59 UTC (rev 7318) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java 2013-08-22 19:21:47 UTC (rev 7319) @@ -2243,10 +2243,11 @@ * Need to make sure the first operator in the group has the right * Id. */ - left = new CopyOp(leftOrEmpty(left), NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, subqueryIds[i++]),// - })); - +// left = new CopyOp(leftOrEmpty(left), NV.asMap(new NV[] {// +// new NV(Predicate.Annotations.BOP_ID, subqueryIds[i++]),// +// })); + ctx.setNextId(subqueryIds[i++]); + // Start with everything already known to be materialized. final Set<IVariable<?>> tmp = new LinkedHashSet<IVariable<?>>( doneSet); @@ -2278,11 +2279,12 @@ /* * All the subqueries get routed here when they are done. */ - left = applyQueryHints(new CopyOp(leftOrEmpty(left),// - new NV(Predicate.Annotations.BOP_ID, downstreamId),// - new NV(BOp.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER)// - ), ctx.queryHints); +// left = applyQueryHints(new CopyOp(leftOrEmpty(left),// +// new NV(Predicate.Annotations.BOP_ID, downstreamId),// +// new NV(BOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER)// +// ), ctx.queryHints); + ctx.setNextId(downstreamId); // Add in anything which was known materialized for all child groups. 
doneSet.addAll(doneSetsIntersection);
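The mechanism is easier to see in isolation. A minimal sketch of the one-shot id override added to AST2BOpContext above (the class name here is illustrative):

import java.util.concurrent.atomic.AtomicInteger;

public class BOpIdFactorySketch {

    private final AtomicInteger idFactory = new AtomicInteger();

    /** -1 means "use the idFactory"; anything else is a pending override. */
    private int nextId = -1;

    /** Pin the id that the next call to nextId() must return. */
    public void setNextId(final int nextId) {
        this.nextId = nextId;
    }

    /** Return the pinned id exactly once; otherwise increment the factory. */
    public int nextId() {
        if (nextId == -1)
            return idFactory.incrementAndGet();
        final int tmp = nextId;
        nextId = -1; // clear: the override is good for a single call.
        return tmp;
    }
}

With the override in place, the first operator of each UNION branch is constructed with the bop id that the Tee already targets, so the CopyOp previously interposed for that purpose (commented out in the diff above) can be dropped.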
From: <tho...@us...> - 2013-08-22 18:43:06
Revision: 7318 http://bigdata.svn.sourceforge.net/bigdata/?rev=7318&view=rev Author: thompsonbry Date: 2013-08-22 18:42:59 +0000 (Thu, 22 Aug 2013) Log Message: ----------- added a version of the heisenbug for the double-bind Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.ttl Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java 2013-08-22 18:00:26 UTC (rev 7317) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java 2013-08-22 18:42:59 UTC (rev 7318) @@ -26,10 +26,13 @@ import junit.framework.AssertionFailedError; /** - * Test suite for a hesienbug involving BIND. - * Unlike the other issues this sometimes happens, and is sometimes OK, - * so we run the test in a loop 20 times. + * Test suite for a hesienbug involving BIND. Unlike the other issues this + * sometimes happens, and is sometimes OK, so we run the test in a loop 20 + * times. * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/708"> + * Heisenbug </a> + * * @version $Id$ */ public class TestBindHeisenbug708 extends AbstractDataDrivenSPARQLTestCase { @@ -52,13 +55,40 @@ "heisenbug-708.ttl", // dataURI "heisenbug-708.srx" // resultURI ).runTest(); +// return; // stop @ success } catch (AssertionFailedError e) { cnt++; +// throw e;// halt @ 1st failure. } } assertTrue("Test failed " + cnt + "/" + max + " times", cnt==0); } - + + /** + * Demonstrates the same problem using two BIND()s. This rules out the + * JOIN as the problem. + */ + public void test_heisenbug708_doubleBind() throws Exception { + int cnt = 0; + int max = 10; + for (int i=0; i<max; i++) { + try { + new TestHelper( + "heisenbug-708-doubleBind",// testURI + "heisenbug-708-doubleBind.rq", // queryURI + "heisenbug-708-doubleBind.ttl", // dataURI + "heisenbug-708-doubleBind.srx" // resultURI + ).runTest(); +// return; // stop @ success + } + catch (AssertionFailedError e) { + cnt++; +// throw e;// halt @ 1st failure. 
+ } + } + assertTrue("Test failed " + cnt + "/" + max + " times", cnt==0); + } + } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.rq 2013-08-22 18:42:59 UTC (rev 7318) @@ -0,0 +1,15 @@ +PREFIX : <http://example/> + + +SELECT ?a +{ + { +# ?a :label "RNK-16-2B3" + BIND ( :bound2 as ?a ) + } + UNION + { + BIND ( :bound as ?a ) + } +} + Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.srx 2013-08-22 18:42:59 UTC (rev 7318) @@ -0,0 +1,21 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="a"/> + </head> + <results> + <result> + <binding name="a"> + <uri>http://example/bound2</uri> + </binding> + </result> + <result> + <binding name="a"> + <uri>http://example/bound</uri> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.ttl =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.ttl (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708-doubleBind.ttl 2013-08-22 18:42:59 UTC (rev 7318) @@ -0,0 +1,3 @@ +@prefix : <http://example/> . + +#:z :label "RNK-16-2B3" . This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
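The pattern used by both tests above generalizes: run the nondeterministic body a fixed number of times, count the failures, and assert that the count is zero, so a single green run cannot mask an intermittent failure. A sketch in the code base's JUnit 3 style; the runOnce() hook is hypothetical:

import junit.framework.AssertionFailedError;
import junit.framework.TestCase;

public class FlakyTestSketch extends TestCase {

    /** Hypothetical: one execution of the nondeterministic test body. */
    private void runOnce() throws Exception {
        // ... run the query and compare against the expected solutions.
    }

    public void test_heisenbug_loop() throws Exception {
        int cnt = 0;
        final int max = 10;
        for (int i = 0; i < max; i++) {
            try {
                runOnce();
                // return; // uncomment to stop at the first success instead.
            } catch (AssertionFailedError e) {
                cnt++; // record the failure, but keep going to measure the rate.
            }
        }
        assertTrue("Test failed " + cnt + "/" + max + " times", cnt == 0);
    }
}

Counting rather than failing fast is what makes the bug's intermittency visible in the assertion message.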
From: <mrp...@us...> - 2013-08-22 18:00:32
Revision: 7317 http://bigdata.svn.sourceforge.net/bigdata/?rev=7317&view=rev Author: mrpersonick Date: 2013-08-22 18:00:26 +0000 (Thu, 22 Aug 2013) Log Message: ----------- fixed a problem with Mock IVs getting resolved by lexicon Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java 2013-08-22 16:43:35 UTC (rev 7316) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java 2013-08-22 18:00:26 UTC (rev 7317) @@ -218,6 +218,9 @@ } + if (iv.hasValue()) + continue; + ids.add(iv); } @@ -241,6 +244,9 @@ } + if (iv.hasValue()) + continue; + ids.add(iv); } @@ -350,6 +356,9 @@ } final IV<?,?> iv = (IV<?,?>) boundValue; + + if (iv.hasValue()) + continue; final BigdataValue value = terms.get(iv); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
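Each of the three hunks above adds the same guard in front of a batch lexicon lookup. A condensed sketch of the intent, assuming a generic collection loop (the method and parameter names are illustrative):

import java.util.LinkedHashSet;
import java.util.Set;

import com.bigdata.rdf.internal.IV;

public class ResolveFilterSketch {

    /**
     * Collect only the IVs that still need to be resolved against the
     * lexicon. A mock IV carries its Value inline (hasValue() is true)
     * and has no lexicon entry, so it must not be sent for resolution.
     */
    static Set<IV<?, ?>> toResolve(final Iterable<IV<?, ?>> boundValues) {
        final Set<IV<?, ?>> ids = new LinkedHashSet<IV<?, ?>>();
        for (IV<?, ?> iv : boundValues) {
            if (iv.hasValue())
                continue; // already materialized; skip the lexicon round-trip.
            ids.add(iv);
        }
        return ids;
    }
}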
From: <tho...@us...> - 2013-08-22 16:43:41
Revision: 7316 http://bigdata.svn.sourceforge.net/bigdata/?rev=7316&view=rev Author: thompsonbry Date: 2013-08-22 16:43:35 +0000 (Thu, 22 Aug 2013) Log Message: ----------- Bug fix. Was not using toArray() to output the joinVars and projectedInVars Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/GroupNodeBase.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/GroupNodeBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/GroupNodeBase.java 2013-08-22 16:18:38 UTC (rev 7315) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/GroupNodeBase.java 2013-08-22 16:43:35 UTC (rev 7316) @@ -1,5 +1,6 @@ package com.bigdata.rdf.sparql.ast; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -271,12 +272,12 @@ final IVariable<?>[] joinVars = t.getJoinVars(); if (joinVars != null) - sb.append(" [joinVars=" + joinVars + "]"); + sb.append(" [joinVars=" + Arrays.toString(joinVars) + "]"); final IVariable<?>[] projectInVars = t.getProjectInVars(); if (projectInVars != null) - sb.append(" [projectInVars=" + projectInVars + "]"); + sb.append(" [projectInVars=" + Arrays.toString(projectInVars) + "]"); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
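The underlying Java pitfall, for reference: concatenating an array into a String invokes Object.toString(), which prints the array's type and identity hash rather than its elements. A standalone illustration, with String[] standing in for the IVariable<?>[] used above:

import java.util.Arrays;

public class ToStringSketch {

    public static void main(final String[] args) {
        final String[] joinVars = { "x", "y" };
        // Prints something like "[joinVars=[Ljava.lang.String;@1b6d3586]".
        System.out.println("[joinVars=" + joinVars + "]");
        // Prints "[joinVars=[x, y]]" -- the fix applied above.
        System.out.println("[joinVars=" + Arrays.toString(joinVars) + "]");
    }
}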
From: <tho...@us...> - 2013-08-22 16:18:44
Revision: 7315 http://bigdata.svn.sourceforge.net/bigdata/?rev=7315&view=rev Author: thompsonbry Date: 2013-08-22 16:18:38 +0000 (Thu, 22 Aug 2013) Log Message: ----------- more javadoc on HAClient. Modified Paths: -------------- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-08-22 15:56:54 UTC (rev 7314) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-08-22 16:18:38 UTC (rev 7315) @@ -91,6 +91,20 @@ * that file is passed to {@link #newInstance(String[])}. The configuration must * be consistent with the configuration of the federation to which you wish to * connect. + * <p> + * Each HA replication cluster has a logical service identifier. You can use + * this to obtain the {@link Quorum} for that cluster. See + * {@link HAConnection#getHAGlueQuorum(String)}. Once you have the quorum, you + * can get the {@link QuorumClient} and obtain the {@link UUID}s of the leader + * and the followers using {@link Quorum#token()}, + * {@link QuorumClient#getLeader(long)} and {@link Quorum#getJoined()} (which + * reports all joined services, including the leader and the followers). You can + * then use {@link HAConnection#getHAGlueService(UUID)} to obtain the RMI proxy + * for a given service using its service identifier (the UUID). + * <p> + * Once you have the RMI proxy for the service, you can use the {@link HAGlue} + * interface to talk directly to that service. Some methods of interest include + * {@link HAGlue#getHAStatus()}, {@link HAGlue#getHostname()}, etc. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
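Strung together, the recipe in this javadoc looks roughly like the sketch below. It uses only the methods the javadoc names; the logical service identifier is a placeholder, and the return types of getHAGlueQuorum(String) and getHAGlueService(UUID) are assumed from the javadoc's description.

import java.util.UUID;

import com.bigdata.ha.HAGlue;
import com.bigdata.journal.jini.ha.HAClient;
import com.bigdata.journal.jini.ha.HAClient.HAConnection;
import com.bigdata.quorum.Quorum;

public class HAClientSketch {

    public static void main(final String[] args) throws Exception {

        final HAClient client = new HAClient(args);

        final HAConnection cxn = client.connect();

        try {

            // The quorum for one HA replication cluster (placeholder id).
            final Quorum<?, ?> quorum = cxn
                    .getHAGlueQuorum("HAReplicationCluster-1");

            // The current quorum token.
            System.out.println("token=" + quorum.token());

            // All joined services: the leader and the followers.
            for (UUID serviceId : quorum.getJoined()) {

                // The RMI proxy for that service.
                final HAGlue glue = cxn.getHAGlueService(serviceId);

                System.out.println(glue.getHostname() + " : "
                        + glue.getHAStatus());

            }

        } finally {

            client.disconnect(true/* immediateShutdown */);

        }

    }

}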
From: <tho...@us...> - 2013-08-22 15:57:06
Revision: 7314 http://bigdata.svn.sourceforge.net/bigdata/?rev=7314&view=rev Author: thompsonbry Date: 2013-08-22 15:56:54 +0000 (Thu, 22 Aug 2013) Log Message: ----------- I have lifted the code that connects to zookeeper into the HAClient. This exposed a problem: the HAJournalServer instances knew the replicationFactor for a given logical service (a logical service is an HA replication cluster) because that information is part of their Configuration file, but the HAClient does not have access to that information and therefore could not start a ZKQuorumImpl to monitor a replication cluster. I have fixed this by adding the replicationFactor to the QuorumTokenState. The QuorumTokenState is versioned; the replicationFactor is carried in a new version and decodes as ZERO (0) for old versions. If the replicationFactor is not present in the QuorumTokenState, then the ZKQuorumImpl class will automatically apply the configured replicationFactor to the QuorumTokenState in zookeeper, effecting a migration, when an HAJournalServer process is restarted. (However, the replication factor will not be modified if the quorum is already met, and a service will refuse to run if the replication factor from the quorum token state does not agree with the replication factor configured on the service.) A service MUST NOT complete a service join if the replication factor (as understood by the Quorum) does not agree with the replication factor on the quorum leader. I have not made this change: I am concerned that it might break replay of existing HALog files and could cause problems when the replication factor of a quorum is changed (again, it could break replay of HALog files). However, I have added the current replication factor of the leader to the HAWriteMessage. Much like the quorum token, this value must be interpreted differently for live writes (when it must be correct) and historical writes (when the value might have been different).
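The versioned (de-)serialization this relies on follows the same pattern visible in the HAWriteMessage diff below: a new VERSION constant guards the new field, and old records decode it as ZERO (0). A generic sketch of that pattern (this is not the actual QuorumTokenState source):

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

class VersionedStateSketch implements Externalizable {

    private static final byte VERSION0 = 0x0;
    /** Adds the replicationFactor (decodes as ZERO (0) for older versions). */
    private static final byte VERSION1 = 0x1;
    private static final byte currentVersion = VERSION1;

    private long token;
    private int replicationFactor;

    public VersionedStateSketch() { } // required by Externalizable.

    @Override
    public void writeExternal(final ObjectOutput out) throws IOException {
        out.writeByte(currentVersion);
        out.writeLong(token);
        out.writeInt(replicationFactor); // new in VERSION1.
    }

    @Override
    public void readExternal(final ObjectInput in) throws IOException {
        final byte version = in.readByte();
        token = in.readLong();
        // Field absent in older versions: report ZERO (0), which is the
        // signal for the lazy migration on service restart.
        replicationFactor = version >= VERSION1 ? in.readInt() : 0;
    }
}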
See #728 Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessage.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumClient.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumMember.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/quorum/zk/QuorumTokenState.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkQuorum.java branches/READ_CACHE2/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java branches/READ_CACHE2/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java branches/READ_CACHE2/src/resources/HAJournal/HAJournal.config Added Paths: ----------- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAGlueServicesClient.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAddr.java Removed Paths: ------------- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalDiscoveryClient.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -77,6 +77,9 @@ /** The quorum token for which this message is valid. */ private long quorumToken; + /** The replication factor for the quorum leader. */ + private int replicationFactor; + /** The length of the backing file on the disk. */ private long fileExtent; @@ -130,6 +133,11 @@ return quorumToken; } + @Override + public int getReplicationFactor() { + return replicationFactor; + } + /* (non-Javadoc) * @see com.bigdata.journal.ha.IHAWriteMessage#getFileExtent() */ @@ -163,6 +171,7 @@ + ",storeType=" + getStoreType() // + ",compressorKey=" + getCompressorKey() // + ",quorumToken=" + getQuorumToken()// + + ",replicationFactor=" + getReplicationFactor() // + ",fileExtent=" + getFileExtent() // + ",firstOffset=" + getFirstOffset() // + "}"; @@ -214,8 +223,9 @@ final long firstOffset) { this(uuid, commitCounter, commitTime, sequence, sze, chk, storeType, - quorumToken, fileExtent, firstOffset, null/* compressorKey */); - + quorumToken, 0/* replicationFactor */, fileExtent, firstOffset, + null/* compressorKey */); + } /** @@ -243,6 +253,8 @@ * The type of backing store (RW or WORM). 
* @param quorumToken * The quorum token for which this message is valid. + * @param replicationFactor + * The replication factor in effect for the leader. * @param fileExtent * The length of the backing file on the disk. * @param firstOffset @@ -255,7 +267,9 @@ public HAWriteMessage(final UUID uuid, final long commitCounter, final long commitTime, final long sequence, final int sze, final int chk, final StoreTypeEnum storeType, - final long quorumToken, final long fileExtent, + final long quorumToken, + final int replicationFactor, + final long fileExtent, final long firstOffset, final String compressorKey) { @@ -282,6 +296,8 @@ this.fileExtent = fileExtent; this.firstOffset = firstOffset; + + this.replicationFactor = replicationFactor; this.compressorKey = compressorKey; @@ -309,9 +325,15 @@ private static final byte VERSION2 = 0x2; /** + * Adds the {@link #replicationFactor} for the quorum leader (decodes as + * ZERO (0) for older versions). + */ + private static final byte VERSION3 = 0x3; + + /** * The current version. */ - private static final byte currentVersion = VERSION2; // VERSION2; + private static final byte currentVersion = VERSION3; /** * Determine whether message data is compressed @@ -363,6 +385,7 @@ case VERSION0: uuid = null; // Note: not available. break; + case VERSION3: // fall through case VERSION2: { final boolean isNull = in.readBoolean(); compressorKey = isNull ? null : in.readUTF(); @@ -382,6 +405,11 @@ lastCommitTime = in.readLong(); sequence = in.readLong(); quorumToken = in.readLong(); + if (version >= VERSION3) { + replicationFactor = in.readInt(); + } else { + replicationFactor = 0; + } fileExtent = in.readLong(); firstOffset = in.readLong(); } @@ -406,6 +434,8 @@ out.writeLong(lastCommitTime); out.writeLong(sequence); out.writeLong(quorumToken); + if (currentVersion >= VERSION3) + out.writeInt(replicationFactor); out.writeLong(fileExtent); out.writeLong(firstOffset); } Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessage.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessage.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessage.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -70,6 +70,9 @@ /** The quorum token for which this message is valid. */ long getQuorumToken(); + /** The replication factor for the quorum leader. */ + int getReplicationFactor(); + /** The length of the backing file on the disk. */ long getFileExtent(); Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -1632,6 +1632,7 @@ final long lastCommitCounter,// final long lastCommitTime,// final long sequence,// + final int replicationFactor,// final ByteBuffer checksumBuffer ) { @@ -1664,7 +1665,8 @@ sequence, // send.limit(), chksum, prefixWrites ? 
StoreTypeEnum.RW : StoreTypeEnum.WORM, - quorumToken, fileExtent.get(), firstOffset.get(), + quorumToken, replicationFactor, + fileExtent.get(), firstOffset.get(), compressorKey); if (log.isTraceEnabled()) { Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -443,6 +443,8 @@ */ final private long quorumToken; + final private int replicationFactor; + /** * The object which manages {@link Quorum} state changes on the behalf of * this service. @@ -567,8 +569,11 @@ // the token under which the write cache service was established. if ((this.quorum = quorum) != null) { this.quorumToken = quorum.token(); + this.replicationFactor = quorum.replicationFactor(); } else { + // Not HA. this.quorumToken = Quorum.NO_QUORUM; + this.replicationFactor = 1; } this.reader = reader; @@ -1412,6 +1417,7 @@ quorumMember.getLastCommitCounter(),// quorumMember.getLastCommitTime(),// thisSequence,// + replicationFactor,// checksumBuffer ); Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -500,6 +500,7 @@ } + @Override public void terminate() { boolean interrupted = false; lock.lock(); @@ -572,7 +573,8 @@ } catch (Throwable t) { launderThrowable(t); } - watcher.terminate(); + if (watcher != null) + watcher.terminate(); if (watcherActionService != null) { watcherActionService.shutdown(); try { Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumClient.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumClient.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumClient.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -49,7 +49,8 @@ private final String logicalServiceId; - public String getLogicalServiceId() { + @Override + final public String getLogicalServiceId() { return logicalServiceId; @@ -71,6 +72,7 @@ * @return The reference from an atomic variable that will be cleared if the * quorum terminates. 
*/ + @Override public Quorum<?,?> getQuorum() { final Quorum<?,?> tmp = quorum.get(); @@ -82,6 +84,7 @@ } + @Override public void start(final Quorum<?,?> quorum) { if (quorum == null) @@ -95,12 +98,14 @@ } + @Override public void terminate() { this.quorum.set(null); } - + + @Override public S getLeader(final long token) { final Quorum<?,?> q = getQuorum(); q.assertQuorum(token); @@ -112,9 +117,12 @@ return getService(leaderId); } + @Override abstract public S getService(UUID serviceId); + @Override public void notify(QuorumEvent e) { + } } Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumMember.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumMember.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorumMember.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -54,6 +54,7 @@ } + @Override public S getLeader(final long token) { final Quorum<?,?> q = getQuorum(); q.assertQuorum(token); @@ -65,8 +66,10 @@ return getService(leaderId); } + @Override abstract public S getService(UUID serviceId); + @Override public UUID getServiceId() { return serviceId; } @@ -75,6 +78,7 @@ * Return the actor for this {@link QuorumMember} (it is allocated by * {@link AbstractQuorum#start(QuorumClient)}). */ + @Override public QuorumActor<S, QuorumMember<S>> getActor() { // Note: This causes a compiler error on CI builds w/ JDK1.1.6_17. // return (QuorumActor<S, QuorumMember<S>>) getQuorum().getActor(); @@ -212,6 +216,7 @@ getQuorum().assertQuorum(token); } + @Override public void assertLeader(final long token) { getQuorum().assertLeader(token); } Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -220,8 +220,8 @@ /* * Setup the quorum state. 
*/ - ZKQuorumImpl.setupQuorum(logicalServiceZPath, fed.getZookeeperAccessor(), - acl); + ZKQuorumImpl.setupQuorum(logicalServiceZPath, 1/* replicationFactor */, + fed.getZookeeperAccessor(), acl); try { Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -58,7 +58,6 @@ import net.jini.core.lookup.ServiceRegistrar; import net.jini.discovery.DiscoveryEvent; import net.jini.discovery.DiscoveryListener; -import net.jini.discovery.LookupDiscoveryManager; import net.jini.export.Exporter; import net.jini.jeri.BasicILFactory; import net.jini.jeri.BasicJeriExporter; @@ -67,7 +66,6 @@ import net.jini.lease.LeaseRenewalEvent; import net.jini.lease.LeaseRenewalManager; import net.jini.lookup.JoinManager; -import net.jini.lookup.ServiceDiscoveryManager; import net.jini.lookup.ServiceIDListener; import net.jini.lookup.entry.Name; @@ -82,6 +80,7 @@ import com.bigdata.jini.lookup.entry.ServiceUUID; import com.bigdata.jini.start.config.ZookeeperClientConfig; import com.bigdata.jini.util.JiniUtil; +import com.bigdata.journal.jini.ha.HAClient.HAConnection; import com.bigdata.service.AbstractService; import com.bigdata.service.IService; import com.bigdata.service.IServiceShutdown; @@ -152,6 +151,9 @@ /** * Configuration options. * <p> + * Note: The options declared by this interface are in the namespace for the + * concrete implementation of the {@link AbstractServer} class. + * <p> * Note: The {@link ServiceID} is optional and may be specified using the * {@link Entry}[] for the {@link JiniClientConfig} as a {@link ServiceUUID} * . If it is not specified, then a random {@link ServiceID} will be @@ -162,7 +164,7 @@ * @see JiniClientConfig * @see ZookeeperClientConfig */ - public interface ConfigurationOptions { + public interface ConfigurationOptions extends HAClient.ConfigurationOptions { /** * The pathname of the service directory as a {@link File} (required). @@ -178,21 +180,13 @@ */ String EXPORTER = "exporter"; - /** - * The timeout in milliseconds to await the discovery of a service if - * there is a cache miss (default {@value #DEFAULT_CACHE_MISS_TIMEOUT}). - */ - String CACHE_MISS_TIMEOUT = "cacheMissTimeout"; - - long DEFAULT_CACHE_MISS_TIMEOUT = 2000L; - } - /** - * The timeout in milliseconds to await the discovery of a service if there - * is a cache miss (default {@value #DEFAULT_CACHE_MISS_TIMEOUT}). - */ - final protected long cacheMissTimeout; +// /** +// * The timeout in milliseconds to await the discovery of a service if there +// * is a cache miss (default {@value #DEFAULT_CACHE_MISS_TIMEOUT}). +// */ +// final protected long cacheMissTimeout; /** * The {@link ServiceID} for this server is either read from a local file, @@ -246,11 +240,16 @@ private RandomAccessFile lockFileRAF = null; private FileLock fileLock; private File lockFile; - - private LookupDiscoveryManager lookupDiscoveryManager; - private ServiceDiscoveryManager serviceDiscoveryManager; + /** + * The reference to the {@link HAClient}. 
+ */ + private final HAClient haClient; +// private LookupDiscoveryManager lookupDiscoveryManager; +// +// private ServiceDiscoveryManager serviceDiscoveryManager; + /** * Used to manage the join/leave of the service hosted by this server with * Jini service registrar(s). @@ -390,24 +389,33 @@ } - /** - * An object used to manage jini service registrar discovery. - */ - public LookupDiscoveryManager getDiscoveryManagement() { - - return lookupDiscoveryManager; - - } +// /** +// * An object used to manage jini service registrar discovery. +// */ +// public LookupDiscoveryManager getDiscoveryManagement() { +// +// return lookupDiscoveryManager; +// +// } +// +// /** +// * An object used to lookup services using the discovered service registars. +// */ +// public ServiceDiscoveryManager getServiceDiscoveryManager() { +// +// return serviceDiscoveryManager; +// +// } /** - * An object used to lookup services using the discovered service registars. + * The {@link HAClient}. */ - public ServiceDiscoveryManager getServiceDiscoveryManager() { + protected final HAClient getHAClient() { - return serviceDiscoveryManager; + return haClient; } - + /* * DiscoveryListener */ @@ -611,9 +619,9 @@ config = ConfigurationProvider.getInstance(args); - cacheMissTimeout = (Long) config.getEntry(COMPONENT, - ConfigurationOptions.CACHE_MISS_TIMEOUT, Long.TYPE, - ConfigurationOptions.DEFAULT_CACHE_MISS_TIMEOUT); +// cacheMissTimeout = (Long) config.getEntry(COMPONENT, +// ConfigurationOptions.CACHE_MISS_TIMEOUT, Long.TYPE, +// ConfigurationOptions.DEFAULT_CACHE_MISS_TIMEOUT); jiniClientConfig = new JiniClientConfig( JiniClientConfig.Options.NAMESPACE, config); @@ -881,45 +889,59 @@ Runtime.getRuntime().addShutdownHook( new ShutdownThread(false/* destroy */, this)); + final HAConnection ctx; try { - /* - * Note: This class will perform multicast discovery if ALL_GROUPS - * is specified and otherwise requires you to specify one or more - * unicast locators (URIs of hosts running discovery services). As - * an alternative, you can use LookupDiscovery, which always does - * multicast discovery. - */ - lookupDiscoveryManager = new LookupDiscoveryManager( - jiniClientConfig.groups, jiniClientConfig.locators, - this /* DiscoveryListener */, config); + // Create client. + haClient = new HAClient(args); - /* - * Setup a helper class that will be notified as services join or - * leave the various registrars to which the data server is - * listening. - */ - try { + // Connect. + ctx = haClient.connect(); + +// /* +// * Note: This class will perform multicast discovery if ALL_GROUPS +// * is specified and otherwise requires you to specify one or more +// * unicast locators (URIs of hosts running discovery services). As +// * an alternative, you can use LookupDiscovery, which always does +// * multicast discovery. +// */ +// lookupDiscoveryManager = new LookupDiscoveryManager( +// jiniClientConfig.groups, jiniClientConfig.locators, +// this /* DiscoveryListener */, config); +// +// /* +// * Setup a helper class that will be notified as services join or +// * leave the various registrars to which the data server is +// * listening. 
+// */ +// try { +// +// serviceDiscoveryManager = new ServiceDiscoveryManager( +// lookupDiscoveryManager, new LeaseRenewalManager(), +// config); +// +// } catch (IOException ex) { +// +// throw new RuntimeException( +// "Could not initiate service discovery manager", ex); +// +// } +// +// } catch (IOException ex) { +// +// fatal("Could not setup discovery", ex); +// throw new AssertionError();// keep the compiler happy. +// + } catch (ConfigurationException ex) { - serviceDiscoveryManager = new ServiceDiscoveryManager( - lookupDiscoveryManager, new LeaseRenewalManager(), - config); + fatal("Configuration error: " + ex, ex); - } catch (IOException ex) { - - throw new RuntimeException( - "Could not initiate service discovery manager", ex); - - } - - } catch (IOException ex) { - - fatal("Could not setup discovery", ex); throw new AssertionError();// keep the compiler happy. - } catch (ConfigurationException ex) { + } catch(Throwable ex) { + + fatal("Could not connect: " + ex, ex); - fatal("Could not setup discovery", ex); throw new AssertionError();// keep the compiler happy. } @@ -1000,7 +1022,7 @@ joinManager = new JoinManager(proxy, // service proxy attributes, // attr sets serviceID, // ServiceID - getDiscoveryManagement(), // DiscoveryManager + ctx.getDiscoveryManagement(), // DiscoveryManager new LeaseRenewalManager(), // config); @@ -1013,7 +1035,7 @@ joinManager = new JoinManager(proxy, // service proxy attributes, // attr sets this, // ServiceIDListener - getDiscoveryManagement(), // DiscoveryManager + ctx.getDiscoveryManagement(), // DiscoveryManager new LeaseRenewalManager(), // config); @@ -1775,21 +1797,23 @@ } - if (serviceDiscoveryManager != null) { + haClient.disconnect(false/*immediateShutdown*/); - serviceDiscoveryManager.terminate(); - - serviceDiscoveryManager = null; - - } - - if (lookupDiscoveryManager != null) { - - lookupDiscoveryManager.terminate(); - - lookupDiscoveryManager = null; - - } +// if (serviceDiscoveryManager != null) { +// +// serviceDiscoveryManager.terminate(); +// +// serviceDiscoveryManager = null; +// +// } +// +// if (lookupDiscoveryManager != null) { +// +// lookupDiscoveryManager.terminate(); +// +// lookupDiscoveryManager = null; +// +// } // if (client != null) { // @@ -1838,9 +1862,16 @@ log.warn("Could not set thread name: " + ex); } - - startUpHook(); + boolean started = false; + try { + startUpHook(); + started = true; + } finally { + if (!started) + shutdownNow(false/*destroy*/); + } + if (runState.compareAndSet(RunState.Start, RunState.Running)) { { Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-08-22 00:22:30 UTC (rev 7313) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-08-22 15:56:54 UTC (rev 7314) @@ -20,7 +20,7 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ + */ /* * Created on Mar 24, 2007 */ @@ -28,7 +28,13 @@ package com.bigdata.journal.jini.ha; import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import 
java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -39,6 +45,7 @@ import net.jini.config.ConfigurationProvider; import net.jini.core.discovery.LookupLocator; import net.jini.core.lookup.ServiceItem; +import net.jini.core.lookup.ServiceRegistrar; import net.jini.discovery.DiscoveryEvent; import net.jini.discovery.DiscoveryListener; import net.jini.discovery.LookupDiscoveryManager; @@ -48,11 +55,26 @@ import net.jini.lookup.ServiceDiscoveryManager; import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; import com.bigdata.ha.HAGlue; +import com.bigdata.io.SerializerUtil; import com.bigdata.jini.start.config.ZookeeperClientConfig; import com.bigdata.jini.util.JiniUtil; +import com.bigdata.quorum.AbstractQuorumClient; +import com.bigdata.quorum.Quorum; +import com.bigdata.quorum.QuorumClient; +import com.bigdata.quorum.QuorumEvent; +import com.bigdata.quorum.QuorumListener; +import com.bigdata.quorum.zk.QuorumTokenState; +import com.bigdata.quorum.zk.ZKQuorum; +import com.bigdata.quorum.zk.ZKQuorumImpl; import com.bigdata.service.IDataService; import com.bigdata.service.IService; import com.bigdata.service.IServiceShutdown; @@ -70,47 +92,42 @@ * be consistent with the configuration of the federation to which you wish to * connect. * - * FIXME Review how we connect to a specific logical HA replication cluster and - * verify that we can connect to multiple such clusters in order to support RMI - * operations across those clusters (as long as they are in the same space). It - * might be that this is achieved through a HAConnection to each logical cluster - * instance, or not. The quorums are certainly specific to a logical instance - * and that is determined by the zkroot. Investigate! - * - * FIXME Retro fit into HAJournalServer (discoveryClient) and AbstractServer. - * - * FIXME This does not start up a quorum and set a quorum client. It should so - * we can see the quorum events. That could be a useful thing to do in main(). - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * TODO Refactor the HA3 test suite to use the HAClient class. */ public class HAClient { private static final Logger log = Logger.getLogger(HAClient.class); - + /** * Options understood by the {@link HAClient}. * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> */ - public static interface Options { - + public static interface ConfigurationOptions { + /** - * The timeout in milliseconds that the client will await the discovery - * of a service if there is a cache miss (default - * {@value #DEFAULT_CACHE_MISS_TIMEOUT}). - * - * @see HAJournalDiscoveryClient + * The namespace for the configuration options declared by this + * interface. */ + String COMPONENT = HAClient.class.getName(); + + /** + * The timeout in milliseconds to await the discovery of a service if + * there is a cache miss (default {@value #DEFAULT_CACHE_MISS_TIMEOUT}). 
+ */ String CACHE_MISS_TIMEOUT = "cacheMissTimeout"; - String DEFAULT_CACHE_MISS_TIMEOUT = "" + (2 * 1000); - + long DEFAULT_CACHE_MISS_TIMEOUT = 2000L; + } - + /** - * The value is the {@link HAConnection} and <code>null</code> iff not connected. + * The value is the {@link HAConnection} and <code>null</code> iff not + * connected. */ private final AtomicReference<HAConnection> fed = new AtomicReference<HAConnection>(); @@ -125,15 +142,23 @@ private final Lock connectLock = new ReentrantLock(false/* fair */); public boolean isConnected() { - + return fed.get() != null; - + } - + + /** + * Get the current {@link HAConnection}. + * + * @return The {@link HAConnection}. + * + * @throws IllegalStateException + * if the {@link HAClient} is not connected. + */ public HAConnection getConnection() { final HAConnection fed = this.fed.get(); - + if (fed == null) { throw new IllegalStateException(); @@ -170,7 +195,7 @@ public void disconnect(final boolean immediateShutdown) { connectLock.lock(); - + try { final HAConnection fed = this.fed.get(); @@ -212,7 +237,7 @@ fed = new HAConnection(jiniConfig, zooConfig); this.fed.set(fed); - + fed.start(this); } @@ -241,172 +266,172 @@ * The {@link Configuration} object used to initialize this class. */ private final Configuration config; - + /** * The {@link JiniClientConfig}. */ public JiniClientConfig getJiniClientConfig() { - + return jiniConfig; - + } - + /** * The {@link ZooKeeper} client configuration. */ public final ZookeeperClientConfig getZookeeperClientConfig() { return zooConfig; - + } - + /** * The {@link Configuration} object used to initialize this class. */ public final Configuration getConfiguration() { - + return config; - + } -// /** -// * {@inheritDoc} -// * -// * @see #getProperties(String component) -// */ -// public Properties getProperties() { -// -// return properties; -// -// } - -// /** -// * Return the {@link Properties} for the {@link JiniClient} merged with -// * those for the named component in the {@link Configuration}. Any -// * properties found for the {@link JiniClient} "component" will be read -// * first. Properties for the named component are next, and therefore will -// * override those given for the {@link JiniClient}. You can specify -// * properties for either the {@link JiniClient} or the <i>component</i> -// * using: -// * -// * <pre> -// * properties = NV[]{...}; -// * </pre> -// * -// * in the appropriate section of the {@link Configuration}. For example: -// * -// * <pre> -// * -// * // Jini client configuration -// * com.bigdata.service.jini.JiniClient { -// * -// * // ... -// * -// * // optional JiniClient properties. -// * properties = new NV[] { -// * -// * new NV("foo","bar") -// * -// * }; -// * -// * } -// * </pre> -// * -// * And overrides for a named component as: -// * -// * <pre> -// * -// * com.bigdata.service.FooBaz { -// * -// * properties = new NV[] { -// * -// * new NV("foo","baz"), -// * new NV("goo","12"), -// * -// * }; -// * -// * } -// * </pre> -// * -// * @param component -// * The name of the component. -// * -// * @return The properties specified for that component. -// * -// * @see #getConfiguration() -// */ -// public Properties getProperties(final String component) -// throws ConfigurationException { -// -// return JiniClient.getProperties(component, getConfiguration()); -// -// } - -// /** -// * Read {@value JiniClientConfig.Options#PROPERTIES} for the optional -// * application or server class identified by [cls]. 
-// * <p> -// * Note: Anything read for the specific class will overwrite any value for -// * the same properties specified for {@link JiniClient}. -// * -// * @param className -// * The class name of the client or service (optional). When -// * specified, properties defined for that class in the -// * configuration will be used and will override those specified -// * for the {@value Options#NAMESPACE}. -// * @param config -// * The {@link Configuration}. -// * -// * @todo this could be replaced by explicit use of the java identifier -// * corresponding to the Option and simply collecting all such -// * properties into a Properties object using their native type (as -// * reported by the ConfigurationFile). -// */ -// static public Properties getProperties(final String className, -// final Configuration config) throws ConfigurationException { -// -// final NV[] a = (NV[]) config -// .getEntry(JiniClient.class.getName(), -// JiniClientConfig.Options.PROPERTIES, NV[].class, -// new NV[] {}/* defaultValue */); -// -// final NV[] b; -// if (className != null) { -// -// b = (NV[]) config.getEntry(className, -// JiniClientConfig.Options.PROPERTIES, NV[].class, -// new NV[] {}/* defaultValue */); -// -// } else -// b = null; -// -// final NV[] tmp = ConfigMath.concat(a, b); -// -// final Properties properties = new Properties(); -// -// for (NV nv : tmp) { -// -// properties.setProperty(nv.getName(), nv.getValue()); -// -// } -// -// if (log.isInfoEnabled() || BigdataStatics.debug) { -// -// final String msg = "className=" + className + " : properties=" -// + properties.toString(); -// -// if (BigdataStatics.debug) -// System.err.println(msg); -// -// if (log.isInfoEnabled()) -// log.info(msg); -// -// } -// -// return properties; -// -// } + // /** + // * {@inheritDoc} + // * + // * @see #getProperties(String component) + // */ + // public Properties getProperties() { + // + // return properties; + // + // } + // /** + // * Return the {@link Properties} for the {@link JiniClient} merged with + // * those for the named component in the {@link Configuration}. Any + // * properties found for the {@link JiniClient} "component" will be read + // * first. Properties for the named component are next, and therefore will + // * override those given for the {@link JiniClient}. You can specify + // * properties for either the {@link JiniClient} or the <i>component</i> + // * using: + // * + // * <pre> + // * properties = NV[]{...}; + // * </pre> + // * + // * in the appropriate section of the {@link Configuration}. For example: + // * + // * <pre> + // * + // * // Jini client configuration + // * com.bigdata.service.jini.JiniClient { + // * + // * // ... + // * + // * // optional JiniClient properties. + // * properties = new NV[] { + // * + // * new NV("foo","bar") + // * + // * }; + // * + // * } + // * </pre> + // * + // * And overrides for a named component as: + // * + // * <pre> + // * + // * com.bigdata.service.FooBaz { + // * + // * properties = new NV[] { + // * + // * new NV("foo","baz"), + // * new NV("goo","12"), + // * + // * }; + // * + // * } + // * </pre> + // * + // * @param component + // * The name of the component. + // * + // * @return The properties specified for that component. 
+ // * + // * @see #getConfiguration() + // */ + // public Properties getProperties(final String component) + // throws ConfigurationException { + // + // return JiniClient.getProperties(component, getConfiguration()); + // + // } + + // /** + // * Read {@value JiniClientConfig.Options#PROPERTIES} for the optional + // * application or server class identified by [cls]. + // * <p> + // * Note: Anything read for the specific class will overwrite any value for + // * the same properties specified for {@link JiniClient}. + // * + // * @param className + // * The class name of the client or service (optional). When + // * specified, properties defined for that class in the + // * configuration will be used and will override those specified + // * for the {@value Options#NAMESPACE}. + // * @param config + // * The {@link Configuration}. + // * + // * @todo this could be replaced by explicit use of the java identifier + // * corresponding to the Option and simply collecting all such + // * properties into a Properties object using their native type (as + // * reported by the ConfigurationFile). + // */ + // static public Properties getProperties(final String className, + // final Configuration config) throws ConfigurationException { + // + // final NV[] a = (NV[]) config + // .getEntry(JiniClient.class.getName(), + // JiniClientConfig.Options.PROPERTIES, NV[].class, + // new NV[] {}/* defaultValue */); + // + // final NV[] b; + // if (className != null) { + // + // b = (NV[]) config.getEntry(className, + // JiniClientConfig.Options.PROPERTIES, NV[].class, + // new NV[] {}/* defaultValue */); + // + // } else + // b = null; + // + // final NV[] tmp = ConfigMath.concat(a, b); + // + // final Properties properties = new Properties(); + // + // for (NV nv : tmp) { + // + // properties.setProperty(nv.getName(), nv.getValue()); + // + // } + // + // if (log.isInfoEnabled() || BigdataStatics.debug) { + // + // final String msg = "className=" + className + " : properties=" + // + properties.toString(); + // + // if (BigdataStatics.debug) + // System.err.println(msg); + // + // if (log.isInfoEnabled()) + // log.info(msg); + // + // } + // + // return properties; + // + // } + /** * Installs a {@link SecurityManager} and returns a new client. * @@ -434,7 +459,7 @@ } catch (ConfigurationException e) { throw new RuntimeException(e); - + } } @@ -468,20 +493,20 @@ * * @throws ConfigurationException */ - public HAClient(final Class cls, final Configuration config) + public HAClient(final Class<?> cls, final Configuration config) throws ConfigurationException { if (config == null) throw new IllegalArgumentException(); - -// this.properties = JiniClient.getProperties(cls.getName(), config); - + + // this.properties = JiniClient.getProperties(cls.getName(), config); + this.jiniConfig = new JiniClientConfig(cls.getName(), config); this.zooConfig = new ZookeeperClientConfig(config); this.config = config; - + } /** @@ -494,11 +519,11 @@ static protected void setSecurityManager() { final SecurityManager sm = System.getSecurityManager(); - + if (sm == null) { System.setSecurityManager(new SecurityManager()); - + if (log.isInfoEnabled()) log.info("Set security manager"); @@ -510,51 +535,51 @@ } } - -// /** -// * Read and return the content of the properties file. -// * -// * @param propertyFile -// * The properties file. 
-// * -// * @throws IOException -// */ -// static protected Properties getProperties(final File propertyFile) -// throws IOException { -// -// if(log.isInfoEnabled()) { -// -// log.info("Reading properties: file="+propertyFile); -// -// } -// -// final Properties properties = new Properties(); -// -// InputStream is = null; -// -// try { -// -// is = new BufferedInputStream(new FileInputStream(propertyFile)); -// -// properties.load(is); -// -// if(log.isInfoEnabled()) { -// -// log.info("Read properties: " + properties); -// -// } -// -// return properties; -// -// } finally { -// -// if (is != null) -// is.close(); -// -// } -// -// } + // /** + // * Read and return the content of the properties file. + // * + // * @param propertyFile + // * The properties file. + // * + // * @throws IOException + // */ + // static protected Properties getProperties(final File propertyFile) + // throws IOException { + // + // if(log.isInfoEnabled()) { + // + // log.info("Reading properties: file="+propertyFile); + // + // } + // + // final Properties properties = new Properties(); + // + // InputStream is = null; + // + // try { + // + // is = new BufferedInputStream(new FileInputStream(propertyFile)); + // + // properties.load(is); + // + // if(log.isInfoEnabled()) { + // + // log.info("Read properties: " + properties); + // + // } + // + // return properties; + // + // } finally { + // + // if (is != null) + // is.close(); + // + // } + // + // } + /** * Invoked when a service join is noticed. * @@ -562,8 +587,12 @@ * The RMI interface for the service. * @param serviceUUID * The service identifier. + * + * TODO It is pointless having this method and + * {@link #serviceLeave(UUID)} without a facility to delegate + * these methods to override them. Right now they just log. */ - public void serviceJoin(final IService service, final UUID serviceUUID) { + protected void serviceJoin(final IService service, final UUID serviceUUID) { if (log.isInfoEnabled()) log.info("service=" + service + ", serviceUUID" + serviceUUID); @@ -573,9 +602,10 @@ /** * Invoked when a service leave is noticed. * - * @param serviceUUID The service identifier. + * @param serviceUUID + * The service identifier. */ - public void serviceLeave(final UUID serviceUUID) { + protected void serviceLeave(final UUID serviceUUID) { if (log.isInfoEnabled()) log.info("serviceUUID=" + serviceUUID); @@ -583,34 +613,45 @@ } /** - * - * TODO Pattern after JiniFederation. Take the DiscoveryListener, etc. from - * AbstractServer but compare to JiniFederation. + * A connection to discovered {@link HAGlue} services. */ static public class HAConnection implements DiscoveryListener, ServiceDiscoveryListener, IServiceShutdown { private final JiniClientConfig jiniConfig; - + private final ZookeeperClientConfig zooConfig; - + /** * The {@link HAClient} reference. When non-<code>null</code> the client * is connected. When <code>null</code> it is disconnected. */ private final AtomicReference<HAClient> clientRef = new AtomicReference<HAClient>(); - - private ZooKeeperAccessor zooKeeperAccessor; + private ZooKeeperAccessor zka; + private LookupDiscoveryManager lookupDiscoveryManager; private ServiceDiscoveryManager serviceDiscoveryManager; /** * Caching discovery client for the {@link HAGlue} services. - */// TODO Rename as haGlueDiscoveryService - private HAJournalDiscoveryClient discoveryClient; + */ + private HAGlueServicesClient discoveryClient; + /** + * The set of quorums that were accessed through the + * {@link HAConnection} class. 
+ * <p> + * Note: Changes to this map are synchronized on {@link #quorums}. This + * is done solely to guard against current creates of a {@link Quorum} + * for the same logical service id. The map itself is thread-safe to + * avoid contentions for a lock in + * {@link #terminateDiscoveryProcesses()}. + */ + private final Map<String, Quorum<HAGlue, QuorumClient<HAGlue>>> quorums = Collections + .synchronizedMap(new LinkedHashMap<String, Quorum<HAGlue, QuorumClient<HAGlue>>>()); + private HAConnection(final JiniClientConfig jiniConfig, final ZookeeperClientConfig zooConfig) { @@ -621,40 +662,50 @@ throw new IllegalArgumentException(); this.jiniConfig = jiniConfig; - + this.zooConfig = zooConfig; } - + /** * Return the client object that was used to obtain the connection. * * @return The {@link HAClient} reference. When non-<code>null</code> * the client is connected. When <code>null</code> it is * disconnected. - * - * @throws IllegalStateException - * if the client disconnected and this object is no longer - * valid. - * - * TODO add getOpenClient() and use to verify that the - * client is non-null for various internal methods. or use - * lambda to make this safe for things that need to be done - * conditionally. */ public HAClient getClient() { return clientRef.get(); - + } + /** + * Return the client object that was used to obtain the connection. + * + * @return The {@link HAClient} reference and never <code>null</code>. + * + * @throws IllegalStateException + * if the client disconnected. + */ + public HAClient getClientIfOpen() { + + final HAClient client = clientRef.get(); + + if (client == null) + throw new IllegalStateException(); + + return client; + + } + @Override public boolean isOpen() { - + return getClient() != null; - + } - + private void assertOpen() { if (!isOpen()) { @@ -665,19 +716,40 @@ } - public synchronized void start(final HAClient client) { + /** + * Return the zookeeper client configuration. + */ + public ZookeeperClientConfig getZooConfig() { - if(client == null) + return zooConfig; + + } + + /** + * Return an object that may be used to obtain a {@link ZooKeeper} + * client and that may be used to obtain the a new {@link ZooKeeper} + * client if the current session has been expired (an absorbing state + * for the {@link ZooKeeper} client). + */ + public ZooKeeperAccessor getZookeeperAccessor() { + + return zka; + + } + + private synchronized void start(final HAClient client) { + + if (client == null) throw new IllegalArgumentException(); - + if (isOpen()) throw new IllegalStateException(); if (log.isInfoEnabled()) log.info(jiniConfig.toString()); - + final String[] groups = jiniConfig.groups; - + final LookupLocator[] lookupLocators = jiniConfig.locators; try { @@ -687,9 +759,9 @@ * zookeeper servers. */ - zooKeeperAccessor = new ZooKeeperAccessor(zooConfig.servers, + zka = new ZooKeeperAccessor(zooConfig.servers, zooConfig.sessionTimeout); - + /* * Note: This class will perform multicast discovery if * ALL_GROUPS is specified and otherwise requires you to specify @@ -712,49 +784,55 @@ lookupDiscoveryManager, new LeaseRenewalManager(), client.getConfiguration()); - } catch(IOException ex) { - + } catch (IOException ex) { + throw new RuntimeException( "Could not initiate service discovery manager", ex); - + } - // FIXME Configuration [properties]. 
-// final long cacheMissTimeout = Long.valueOf(client.getProperties() -// .getProperty(JiniClient.Options.CACHE_MISS_TIMEOUT, -// JiniClient.Options.DEFAULT_CACHE_MISS_TIMEOUT)); - final long cacheMissTimeout = Long - .valueOf(JiniClient.Options.DEFAULT_CACHE_MISS_TIMEOUT); + /** + * The timeout in milliseconds to await the discovery of a + * service if there is a cache miss (default + * {@value #DEFAULT_CACHE_MISS_TIMEOUT}). + */ + final long cacheMissTimeout = (Long) client + .getConfiguration() + .getEntry(HAClient.ConfigurationOptions.COMPONENT, + ConfigurationOptions.CACHE_MISS_TIMEOUT, + Long.TYPE, + ConfigurationOptions.DEFAULT_CACHE_MISS_TIMEOUT); // Setup discovery for HAGlue clients. - // TODO Refactor out of HAJournalServer. - discoveryClient = new HAJournalDiscoveryClient( + discoveryClient = new HAGlueServicesClient( serviceDiscoveryManager, null/* serviceDiscoveryListener */, cacheMissTimeout); // And set the reference. The client is now "connected". this.clientRef.set(client); - + } catch (Exception ex) { - log.fatal("Problem initiating service discovery: " - + ex.getMessage(), ex); + log.fatal( + "Problem initiating service discovery: " + + ex.getMessage(), ex); try { shutdownNow(); - + } catch (Throwable t) { - + log.error(t.getMessage(), t); - + } throw new RuntimeException(ex); - + } } + /** * {@inheritDoc} * <p> @@ -762,18 +840,19 @@ */ @Override synchronized public void shutdown() { - - if(!isOpen()) return; + if (!isOpen()) + return; + if (log.isInfoEnabled()) log.info("begin"); - + // Disconnect. clientRef.set(null); final long begin = System.currentTimeMillis(); -// super.shutdown(); + // super.shutdown(); terminateDiscoveryProcesses(); @@ -818,23 +897,23 @@ * Stop various discovery processes. */ private void terminateDiscoveryProcesses() { - + /* * bigdata specific service discovery. */ - + if (discoveryClient != null) { discoveryClient.terminate(); discoveryClient = null; - + } /* * and the lower level jini processes. */ - + if (serviceDiscoveryManager != null) { serviceDiscoveryManager.terminate(); @@ -851,20 +930,75 @@ } - try { + // Terminate any quorums opened by the HAConnection. + for (Quorum<HAGlue, QuorumClient<HAGlue>> quorum : quorums.values()) { - // close the zookeeper client. - zooKeeperAccessor.close(); + quorum.terminate(); - } catch (InterruptedException e) { + } - throw new RuntimeException(e); + /* + * Close our zookeeper connection, invalidating all ephemeral znodes + * for this service. + * + * Note: This provides a decisive mechanism for removing this + * service from the joined services, the pipeline, withdrawing its + * vote, and removing it as a quorum member. + */ + log.warn("FORCING UNCURABLE ZOOKEEPER DISCONNECT"); + if (zka != null) { + + try { + zka.close(); + } catch (InterruptedException e) { + // propagate the interrupt. + Thread.currentThread().interrupt(); + } + } - + // try { + // + // // close the zookeeper client. + // zooKeeperAccessor.close(); + // + // } catch (InterruptedException e) { + // + // throw new RuntimeException(e); + // + // } + } /** + * An object used to manage jini service registrar discovery. + */ + public LookupDiscoveryManager getDiscoveryManagement() { + + return lookupDiscoveryManager; + + } + + /** + * An object used to lookup services using the discovered service + * registars. + */ + public ServiceDiscoveryManager getServiceDiscoveryManager() { + + return serviceDiscoveryManager; + + } + + /** + * Caching discovery client for the {@link HAGlue} services. 
+ */ + public HAGlueServicesClient getHAGlueServicesClient() { + + return discoveryClient; + + } + + /** * Resolve the service identifier to an {@link IDataService}. * <p> * Note: Whether the returned object is a proxy or the service @@ -881,7 +1015,7 @@ public HAGlue getHAGlueService(final UUID serviceUUID) { return discoveryClient.getService(); - + } /** @@ -902,25 +1036,215 @@ } + /* + * HAGlue Quorum + */ + /** - * Return ANY {@link HAGlue} service which has been (or could be) - * discovered and which is part of the connected federation. + * Return the set of known logical service identifiers for HA + * replication clusters. These are extracted from zookeeper. * - * @return <code>null</code> if there are NO known {@link HAGlue} - * services. + * @return The known logical service identifiers. + * + * @throws InterruptedException + * @throws KeeperException */ - public HAGlue getAnyHAGlueService() { + public String[] getHALogicalServiceIds() throws KeeperException, + InterruptedException { - assertOpen(); + final ZookeeperClientConfig zkClientConfig = getZooConfig(); - return discoveryClient.getService(); - + // zpath dominating the HA replication clusters. + final String logicalServiceZPathPrefix = zkClientConfig.zroot + "/" + + HAJournalServer.class.getName(); + + final String[] children = zka.getZookeeper() + .getChildren(logicalServiceZPathPrefix, false/* watch */) + .toArray(new String[0]); + + return children; + } + /** + * Obtain an object that will reflect the state of the {@link Quorum} + * for the HA replica... [truncated message content] |
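Note: the diff above also moves the cache-miss timeout out of the old hard-coded Options constant and into the Jini Configuration, keyed by the new ConfigurationOptions.COMPONENT namespace (com.bigdata.journal.jini.ha.HAClient). The sketch below shows what a matching configuration entry and client bootstrap could look like. It is a sketch only: the file name and the 5000ms override are illustrative, and the connect() entry point is assumed from the connectLock/start() logic in the diff; newInstance(String[]), getConnection(), isOpen() and disconnect(boolean) are confirmed by the code shown above.

    // Sketch, not the committed API surface. Assumptions are flagged inline.
    //
    // HAClient.config (illustrative):
    //   com.bigdata.journal.jini.ha.HAClient {
    //       cacheMissTimeout = 5000L; // optional; defaults to 2000L
    //   }
    import com.bigdata.journal.jini.ha.HAClient;
    import com.bigdata.journal.jini.ha.HAClient.HAConnection;

    public class HAClientBootstrap {
        public static void main(final String[] args) throws Exception {
            // Installs a SecurityManager and parses the named configuration.
            final HAClient client = HAClient.newInstance(
                    new String[] { "HAClient.config" });
            final HAConnection cxn = client.connect(); // assumed entry point
            try {
                // HAGlue discovery (HAGlueServicesClient) is now available.
                System.out.println("connected=" + cxn.isOpen());
            } finally {
                client.disconnect(true/* immediateShutdown */);
            }
        }
    }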
From: <jer...@us...> - 2013-08-22 00:22:37
Revision: 7313 http://bigdata.svn.sourceforge.net/bigdata/?rev=7313&view=rev Author: jeremy_carroll Date: 2013-08-22 00:22:30 +0000 (Thu, 22 Aug 2013) Log Message: ----------- corrected comment, and removed unused imports Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java 2013-08-22 00:19:56 UTC (rev 7312) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java 2013-08-22 00:22:30 UTC (rev 7313) @@ -54,43 +54,26 @@ package com.bigdata.rdf.sail.webapp; -import java.io.File; import junit.framework.Test; -import org.openrdf.model.Literal; import org.openrdf.model.Resource; import org.openrdf.model.URI; import org.openrdf.model.Value; import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.LiteralImpl; import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.model.vocabulary.RDF; import org.openrdf.model.vocabulary.RDFS; import org.openrdf.model.vocabulary.XMLSchema; import org.openrdf.query.parser.sparql.DC; import org.openrdf.query.parser.sparql.FOAF; -import org.openrdf.query.parser.sparql.SPARQLUpdateTest; import org.openrdf.repository.RepositoryException; -import org.openrdf.rio.RDFFormat; import com.bigdata.journal.IIndexManager; -import com.bigdata.rdf.sail.webapp.client.IPreparedTupleQuery; -import com.bigdata.rdf.sail.webapp.client.RemoteRepository; -import com.bigdata.rdf.sail.webapp.client.RemoteRepository.AddOp; /** * Proxied test suite. - * <p> - * Note: Also see {@link SPARQLUpdateTest}. These two test suites SHOULD be kept - * synchronized. {@link SPARQLUpdateTest} runs against a local kb instance while - * this class runs against the NSS. The two test suites are not exactly the same - * because one uses the {@link RemoteRepository} to commuicate with the NSS - * while the other uses the local API. - * - * @param <S> - * - * @see SPARQLUpdateTest + * We test the behavior reported in trac 727. */ public class TestInsertFilterFalse727<S extends IIndexManager> extends AbstractTestNanoSparqlClient<S> { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jer...@us...> - 2013-08-22 00:20:03
Revision: 7312 http://bigdata.svn.sourceforge.net/bigdata/?rev=7312&view=rev Author: jeremy_carroll Date: 2013-08-22 00:19:56 +0000 (Thu, 22 Aug 2013) Log Message: ----------- Test case for trac 727, passes immediately and bug report was in error Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestNanoSparqlServerWithProxyIndexManager.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestInsertFilterFalse727.java 2013-08-22 00:19:56 UTC (rev 7312) @@ -0,0 +1,209 @@ +/** +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* +Portions of this code are: + +Copyright Aduna (http://www.aduna-software.com/) � 2001-2007 + +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +*/ + +package com.bigdata.rdf.sail.webapp; + +import java.io.File; + +import junit.framework.Test; + +import org.openrdf.model.Literal; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.model.vocabulary.RDFS; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.parser.sparql.DC; +import org.openrdf.query.parser.sparql.FOAF; +import org.openrdf.query.parser.sparql.SPARQLUpdateTest; +import org.openrdf.repository.RepositoryException; +import org.openrdf.rio.RDFFormat; + +import com.bigdata.journal.IIndexManager; +import com.bigdata.rdf.sail.webapp.client.IPreparedTupleQuery; +import com.bigdata.rdf.sail.webapp.client.RemoteRepository; +import com.bigdata.rdf.sail.webapp.client.RemoteRepository.AddOp; + +/** + * Proxied test suite. + * <p> + * Note: Also see {@link SPARQLUpdateTest}. These two test suites SHOULD be kept + * synchronized. {@link SPARQLUpdateTest} runs against a local kb instance while + * this class runs against the NSS. The two test suites are not exactly the same + * because one uses the {@link RemoteRepository} to commuicate with the NSS + * while the other uses the local API. + * + * @param <S> + * + * @see SPARQLUpdateTest + */ +public class TestInsertFilterFalse727<S extends IIndexManager> extends + AbstractTestNanoSparqlClient<S> { + + static public Test suite() { + return ProxySuiteHelper.suiteWhenStandalone(TestInsertFilterFalse727.class,"test.*", TestMode.quads,TestMode.sids,TestMode.triples); + } + public TestInsertFilterFalse727() { + + } + + public TestInsertFilterFalse727(final String name) { + + super(name); + + } + + private static final String EX_NS = "http://example.org/"; + + private ValueFactory f = new ValueFactoryImpl(); + private URI bob; +// protected RemoteRepository m_repo; + + + @Override + public void setUp() throws Exception { + + super.setUp(); + + bob = f.createURI(EX_NS, "bob"); + } + + public void tearDown() throws Exception { + + bob = null; + + f = null; + + super.tearDown(); + + } + + + + /** + * Get a set of useful namespace prefix declarations. + * + * @return namespace prefix declarations for rdf, rdfs, dc, foaf and ex. + */ + protected String getNamespaceDeclarations() { + final StringBuilder declarations = new StringBuilder(); + declarations.append("PREFIX rdf: <" + RDF.NAMESPACE + "> \n"); + declarations.append("PREFIX rdfs: <" + RDFS.NAMESPACE + "> \n"); + declarations.append("PREFIX dc: <" + DC.NAMESPACE + "> \n"); + declarations.append("PREFIX foaf: <" + FOAF.NAMESPACE + "> \n"); + declarations.append("PREFIX ex: <" + EX_NS + "> \n"); + declarations.append("PREFIX xsd: <" + XMLSchema.NAMESPACE + "> \n"); + declarations.append("\n"); + + return declarations.toString(); + } + + protected boolean hasStatement(final Resource subj, final URI pred, + final Value obj, final boolean includeInferred, + final Resource... 
contexts) throws RepositoryException { + + try { + + return m_repo.getStatements(subj, pred, obj, includeInferred, + contexts).hasNext(); + + } catch (Exception e) { + + throw new RepositoryException(e); + + } + + } + + public void testInsertWhereTrue() + throws Exception + { + executeInsert("FILTER ( true )", true); + } + private void executeInsert(String where, boolean expected) throws RepositoryException, Exception { + final StringBuilder update = new StringBuilder(); + update.append(getNamespaceDeclarations()); + update.append("INSERT { ex:bob rdfs:label \"Bob\" . } WHERE { " + where +" }"); + + assertFalse(hasStatement(bob, RDFS.LABEL, f.createLiteral("Bob"), true)); + + m_repo.prepareUpdate(update.toString()).evaluate(); + + assertEquals(expected, hasStatement(bob, RDFS.LABEL, f.createLiteral("Bob"), true)); + } + + public void testInsertWhereFalse() + throws Exception + { + executeInsert("FILTER ( false )", false); + } + + + public void testInsertWhereOptionallyTrue() + throws Exception + { + executeInsert("OPTIONAL { FILTER ( true ) }", true); + } + + public void testInsertWhereOptionallyFalse() + throws Exception + { + executeInsert("OPTIONAL { FILTER ( false ) }", true); + } + +} Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestNanoSparqlServerWithProxyIndexManager.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestNanoSparqlServerWithProxyIndexManager.java 2013-08-21 22:32:01 UTC (rev 7311) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestNanoSparqlServerWithProxyIndexManager.java 2013-08-22 00:19:56 UTC (rev 7312) @@ -228,6 +228,10 @@ // BigdataSailRemoteRepository test (nano sparql server client-wrapper) suite.addTestSuite(TestBigdataSailRemoteRepository.class); + + // TestInsertFilterFalse727 + suite.addTestSuite(TestInsertFilterFalse727.class); + // SPARQL UPDATE test suite. switch(testMode) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
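The executeInsert helper above pins down the SPARQL UPDATE semantics at issue in trac 727: INSERT ... WHERE fires once per WHERE solution, so FILTER ( false ) leaves nothing to insert, while OPTIONAL { FILTER ( false ) } preserves the single empty solution and the insert still happens. The self-contained sketch below replays the four test cases against a stock Sesame MemoryStore rather than bigdata, purely so it can run standalone; the harness class name is ours, the openrdf calls are the stock Sesame 2.x API.

    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.vocabulary.RDFS;
    import org.openrdf.query.QueryLanguage;
    import org.openrdf.repository.RepositoryConnection;
    import org.openrdf.repository.sail.SailRepository;
    import org.openrdf.sail.memory.MemoryStore;

    public class InsertFilterDemo {
        public static void main(final String[] args) throws Exception {
            // Expected per TestInsertFilterFalse727: true, false, true, true.
            final String[] wheres = { "FILTER ( true )", "FILTER ( false )",
                    "OPTIONAL { FILTER ( true ) }",
                    "OPTIONAL { FILTER ( false ) }" };
            for (String where : wheres) {
                final SailRepository repo = new SailRepository(new MemoryStore());
                repo.initialize();
                final RepositoryConnection cxn = repo.getConnection();
                try {
                    cxn.prepareUpdate(QueryLanguage.SPARQL,
                            "PREFIX ex: <http://example.org/>\n"
                            + "PREFIX rdfs: <" + RDFS.NAMESPACE + ">\n"
                            + "INSERT { ex:bob rdfs:label \"Bob\" } WHERE { "
                            + where + " }").execute();
                    final ValueFactory f = repo.getValueFactory();
                    System.out.println(where + " => inserted="
                            + cxn.hasStatement(
                                    f.createURI("http://example.org/bob"),
                                    RDFS.LABEL, f.createLiteral("Bob"),
                                    false/* includeInferred */));
                } finally {
                    cxn.close();
                    repo.shutDown();
                }
            }
        }
    }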
From: <jer...@us...> - 2013-08-21 22:32:08
Revision: 7311 http://bigdata.svn.sourceforge.net/bigdata/?rev=7311&view=rev Author: jeremy_carroll Date: 2013-08-21 22:32:01 +0000 (Wed, 21 Aug 2013) Log Message: ----------- test for filter exist in subselect behavior, trac 725 Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestSubSelectFilterExist725.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-no-sub-select.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-sub-select.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.ttl Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestSubSelectFilterExist725.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestSubSelectFilterExist725.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestSubSelectFilterExist725.java 2013-08-21 22:32:01 UTC (rev 7311) @@ -0,0 +1,74 @@ +/** + +Copyright (C) SYSTAP, LLC 2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.rdf.sparql.ast.eval; + +/** + * Test for trac725 + * <pre> +#select * +#where { +#{ +SELECT ( COUNT(?narrow) as ?countNarrow ) ?scheme +WHERE +{ ?narrow skos:inScheme ?scheme . 
+FILTER EXISTS { ?narrow skos:broader ?b } +} +GROUP BY ?scheme +#} +#} +</pre> + * + */ +public class TestSubSelectFilterExist725 extends AbstractDataDrivenSPARQLTestCase { + + public TestSubSelectFilterExist725() { + } + + public TestSubSelectFilterExist725(String name) { + super(name); + } + + public void test_without_subselect() throws Exception { + + new TestHelper( + "filter-exist-725-no-sub-select",// testURI + "filter-exist-725-no-sub-select.rq", // queryURI + "filter-exist-725.ttl", // dataURI + "filter-exist-725.srx" // resultURI + ).runTest(); + + } + + public void test_with_subselect() throws Exception { + + new TestHelper( + "filter-exist-725-sub-select",// testURI + "filter-exist-725-sub-select.rq", // queryURI + "filter-exist-725.ttl", // dataURI + "filter-exist-725.srx" // resultURI + ).runTest(); + + } + +} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-no-sub-select.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-no-sub-select.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-no-sub-select.rq 2013-08-21 22:32:01 UTC (rev 7311) @@ -0,0 +1,13 @@ +PREFIX : <http://example/> + +#select * +#where { +#{ +SELECT ( COUNT(?narrow) as ?countNarrow ) ?scheme +WHERE +{ ?narrow :inScheme ?scheme . +FILTER EXISTS { ?narrow :broader ?b } +} +GROUP BY ?scheme +#} +#} \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-sub-select.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-sub-select.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725-sub-select.rq 2013-08-21 22:32:01 UTC (rev 7311) @@ -0,0 +1,13 @@ +PREFIX : <http://example/> + +select * +where { +{ + SELECT ( COUNT(?narrow) as ?countNarrow ) ?scheme + WHERE + { ?narrow :inScheme ?scheme . 
+ FILTER EXISTS { ?narrow :broader ?b } + } + GROUP BY ?scheme +} +} \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.srx 2013-08-21 22:32:01 UTC (rev 7311) @@ -0,0 +1,20 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="scheme"/> + <variable name="countNarrow"/> + </head> + <results> + <result> + <binding name="countNarrow"> + <literal datatype="http://www.w3.org/2001/XMLSchema#integer">1</literal> + </binding> + <binding name="scheme"> + <uri>http://example/z</uri> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.ttl =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.ttl (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/filter-exist-725.ttl 2013-08-21 22:32:01 UTC (rev 7311) @@ -0,0 +1,6 @@ +@prefix : <http://example/> . + +:x :broader :y . +:x :inScheme :z . + + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
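Tracing the fixture data through either query shows why a single, identical solution is expected from both forms; the point of trac 725 is that the subselect wrapper must not change the answer:

    # Data (filter-exist-725.ttl):   :x :broader :y .    :x :inScheme :z .
    #
    # Inner aggregation, with or without the enclosing subselect:
    #   ?narrow = :x  matches  ?narrow :inScheme ?scheme   (?scheme = :z)
    #   FILTER EXISTS { ?narrow :broader ?b }  passes, via  :x :broader :y
    #   GROUP BY ?scheme  =>  one group:  ?scheme = :z,  COUNT(?narrow) = 1
    #
    # Expected bindings (filter-exist-725.srx):
    #   countNarrow = "1"^^xsd:integer,  scheme = <http://example/z>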
From: <jer...@us...> - 2013-08-21 22:08:46
Revision: 7310 http://bigdata.svn.sourceforge.net/bigdata/?rev=7310&view=rev Author: jeremy_carroll Date: 2013-08-21 22:08:39 +0000 (Wed, 21 Aug 2013) Log Message: ----------- "Bigdata does not distinguish between an empty named graph and a named graph that does not exist" Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.srx Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java 2013-08-21 21:51:37 UTC (rev 7309) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java 2013-08-21 22:08:39 UTC (rev 7310) @@ -50,7 +50,7 @@ } - public void test_empty_graph_matches() throws Exception { + public void test_empty_graph_does_not_match() throws Exception { new TestHelper("trac709empty").runTest(); @@ -65,9 +65,9 @@ new TestHelper("trac429neg").runTest(); } - public void test_work_empty_graph_matches_by_uri() throws Exception { + public void test_empty_graph_does_not_match_by_uri() throws Exception { - new TestHelper("trac429empty", "trac429.rq", "trac429empty.trig", "trac429.srx").runTest(); + new TestHelper("trac429empty", "trac429.rq", "trac429empty.trig", "trac429empty.srx").runTest(); } public void test_work_around_graph_var() throws Exception { Copied: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.srx (from rev 7309, branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx) =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.srx 2013-08-21 22:08:39 UTC (rev 7310) @@ -0,0 +1,11 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="g"/> + </head> + <results> + </results> +</sparql> \ No newline at end of file Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx 2013-08-21 21:51:37 UTC (rev 7309) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx 2013-08-21 22:08:39 UTC (rev 7310) @@ -12,10 +12,5 @@ <uri>http://www.bigdata.com/</uri> </binding> </result> - <result> - <binding name="g"> - <uri>http://www.bigdata.com/a</uri> - </binding> - </result> </results> </sparql> \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
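The corrected expectations follow directly from the point in the log message: in bigdata's quad store a named graph exists only while it holds at least one statement, so GRAPH patterns cannot observe a graph that is declared but empty. Concretely, over the two fixtures this commit touches:

    # trac709empty.trig:   :  { :x rdf:type :C . }     :a { }   (declared empty)
    # select ?g where { graph ?g {} }
    #   => only ?g = <http://www.bigdata.com/>; the empty :a is indistinguishable
    #      from a graph that never existed (trac709empty.srx, as corrected here).
    #
    # trac429empty.trig:   :  { }   (declared empty)    :a { :y rdf:type :B . ... }
    # select ?g where { BIND ("1080" as ?g) . graph <http://www.bigdata.com/> {} }
    #   => no solutions: <http://www.bigdata.com/> holds no quads, so the graph
    #      pattern fails and the BIND row is dropped (trac429empty.srx).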
From: <jer...@us...> - 2013-08-21 21:51:44
Revision: 7309 http://bigdata.svn.sourceforge.net/bigdata/?rev=7309&view=rev Author: jeremy_carroll Date: 2013-08-21 21:51:37 +0000 (Wed, 21 Aug 2013) Log Message: ----------- test for heisenbug concerning bind. Test demonstrates heisen effect on my quad core machine Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.ttl Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestBindHeisenbug708.java 2013-08-21 21:51:37 UTC (rev 7309) @@ -0,0 +1,64 @@ +/** + +Copyright (C) SYSTAP, LLC 2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.rdf.sparql.ast.eval; + +import junit.framework.AssertionFailedError; + +/** + * Test suite for a hesienbug involving BIND. + * Unlike the other issues this sometimes happens, and is sometimes OK, + * so we run the test in a loop 20 times. 
+ * + * @version $Id$ + */ +public class TestBindHeisenbug708 extends AbstractDataDrivenSPARQLTestCase { + + public TestBindHeisenbug708() { + } + + public TestBindHeisenbug708(String name) { + super(name); + } + + public void test_heisenbug708() throws Exception { + int cnt = 0; + int max = 10; + for (int i=0; i<max; i++) { + try { + new TestHelper( + "heisenbug-708",// testURI + "heisenbug-708.rq", // queryURI + "heisenbug-708.ttl", // dataURI + "heisenbug-708.srx" // resultURI + ).runTest(); + } + catch (AssertionFailedError e) { + cnt++; + } + } + assertTrue("Test failed " + cnt + "/" + max + " times", cnt==0); + } + + +} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.rq 2013-08-21 21:51:37 UTC (rev 7309) @@ -0,0 +1,14 @@ +PREFIX : <http://example/> + + +SELECT ?a +{ + { + ?a :label "RNK-16-2B3" + } + UNION + { + BIND ( :bound as ?a ) + } +} + Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.srx 2013-08-21 21:51:37 UTC (rev 7309) @@ -0,0 +1,21 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="a"/> + </head> + <results> + <result> + <binding name="a"> + <uri>http://example/z</uri> + </binding> + </result> + <result> + <binding name="a"> + <uri>http://example/bound</uri> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.ttl =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.ttl (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/heisenbug-708.ttl 2013-08-21 21:51:37 UTC (rev 7309) @@ -0,0 +1,3 @@ +@prefix : <http://example/> . + +:z :label "RNK-16-2B3" . This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
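For reference, the deterministic answer of heisenbug-708.rq over heisenbug-708.ttl is two rows, one per UNION branch; an intermittent run that drops either row is what the loop is fishing for. Since the BIND branch is data-independent, any run with fewer than two solutions points at a concurrency defect rather than a data problem:

    # heisenbug-708.ttl:   :z :label "RNK-16-2B3" .
    #
    # SELECT ?a {
    #     { ?a :label "RNK-16-2B3" }     # matches ?a = :z
    #   UNION
    #     { BIND ( :bound as ?a ) }      # always yields ?a = :bound
    # }
    #
    # Expected (heisenbug-708.srx):  ?a = <http://example/z>, <http://example/bound>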
From: <jer...@us...> - 2013-08-21 21:19:16
Revision: 7308 http://bigdata.svn.sourceforge.net/bigdata/?rev=7308&view=rev Author: jeremy_carroll Date: 2013-08-21 21:19:09 +0000 (Wed, 21 Aug 2013) Log Message: ----------- further trac429 tests Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.trig branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.trig Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java 2013-08-21 21:05:38 UTC (rev 7307) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java 2013-08-21 21:19:09 UTC (rev 7308) @@ -60,6 +60,16 @@ new TestHelper("trac429").runTest(); } + public void test_notgraph_uri() throws Exception { + + new TestHelper("trac429neg").runTest(); + + } + public void test_work_empty_graph_matches_by_uri() throws Exception { + + new TestHelper("trac429empty", "trac429.rq", "trac429empty.trig", "trac429.srx").runTest(); + + } public void test_work_around_graph_var() throws Exception { new TestHelper("trac709workaround", "trac709workaround.rq", "trac709.trig", "trac709.srx").runTest(); Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.trig =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.trig (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429empty.trig 2013-08-21 21:19:09 UTC (rev 7308) @@ -0,0 +1,10 @@ +@prefix : <http://www.bigdata.com/> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . + +: { +} + +:a { +:y rdf:type :B . +:z rdf:type :A . +} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.rq 2013-08-21 21:19:09 UTC (rev 7308) @@ -0,0 +1,4 @@ +prefix : <http://www.bigdata.com/> +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + +select ?g where { BIND ("1080" as ?g) . 
graph <http://www.bigdata.com/> {} } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.srx 2013-08-21 21:19:09 UTC (rev 7308) @@ -0,0 +1,11 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="g"/> + </head> + <results> + </results> +</sparql> \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.trig =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.trig (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429neg.trig 2013-08-21 21:19:09 UTC (rev 7308) @@ -0,0 +1,11 @@ +@prefix : <http://www.bigdata.com/> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . + +:b { +:x rdf:type :C . +} + +:a { +:y rdf:type :B . +:z rdf:type :A . +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
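The BIND ("1080" as ?g) in these queries is a probe: the empty graph pattern contributes no bindings of its own, so the query returns exactly one row carrying "1080" when the named graph matches and zero rows when it does not. The new negative fixture exercises the case where <http://www.bigdata.com/> is never mentioned at all:

    # trac429neg.trig declares only :a and :b -- <http://www.bigdata.com/> is absent.
    # select ?g where { BIND ("1080" as ?g) . graph <http://www.bigdata.com/> {} }
    #   => no solutions (trac429neg.srx)
    #
    # trac429.trig (added in r7307, below) does populate <http://www.bigdata.com/>:
    #   => one solution, ?g = "1080" (trac429.srx)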
From: <jer...@us...> - 2013-08-21 21:05:46
Revision: 7307 http://bigdata.svn.sourceforge.net/bigdata/?rev=7307&view=rev Author: jeremy_carroll Date: 2013-08-21 21:05:38 +0000 (Wed, 21 Aug 2013) Log Message: ----------- tests for 709 and 429 select ?g { graph ?g {} } Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.trig branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.trig branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.trig branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709workaround.rq Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestGraphEmptyPattern709_429.java 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,69 @@ +/** + +Copyright (C) SYSTAP, LLC 2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.rdf.sparql.ast.eval; + + +/** + * Tests concerning "SELECT GRAPH XXXX {}" with XXXX and the dataset varying. 
+ * + */ +public class TestGraphEmptyPattern709_429 extends AbstractDataDrivenSPARQLTestCase { + + /** + * + */ + public TestGraphEmptyPattern709_429() { + } + + /** + * @param name + */ + public TestGraphEmptyPattern709_429(String name) { + super(name); + } + + public void test_graph_var() throws Exception { + + new TestHelper("trac709").runTest(); + + } + + public void test_empty_graph_matches() throws Exception { + + new TestHelper("trac709empty").runTest(); + + } + public void test_graph_uri() throws Exception { + + new TestHelper("trac429").runTest(); + + } + public void test_work_around_graph_var() throws Exception { + + new TestHelper("trac709workaround", "trac709workaround.rq", "trac709.trig", "trac709.srx").runTest(); + + } + +} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.rq 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,4 @@ +prefix : <http://www.bigdata.com/> +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + +select ?g where { BIND ("1080" as ?g) . graph <http://www.bigdata.com/> {} } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.srx 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,16 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="g"/> + </head> + <results> + <result> + <binding name="g"> + <literal>1080</literal> + </binding> + </result> + </results> +</sparql> \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.trig =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.trig (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac429.trig 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,11 @@ +@prefix : <http://www.bigdata.com/> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . + +: { +:x rdf:type :C . +} + +:a { +:y rdf:type :B . +:z rdf:type :A . 
+} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.rq 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,4 @@ +prefix : <http://www.bigdata.com/> +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + +select ?g where { graph ?g {} } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.srx 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,21 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="g"/> + </head> + <results> + <result> + <binding name="g"> + <uri>http://www.bigdata.com/</uri> + </binding> + </result> + <result> + <binding name="g"> + <uri>http://www.bigdata.com/a</uri> + </binding> + </result> + </results> +</sparql> \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.trig =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.trig (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709.trig 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,11 @@ +@prefix : <http://www.bigdata.com/> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . + +: { +:x rdf:type :C . +} + +:a { +:y rdf:type :B . +:z rdf:type :A . 
+} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.rq 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,4 @@ +prefix : <http://www.bigdata.com/> +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + +select ?g where { graph ?g {} } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.srx 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,21 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="g"/> + </head> + <results> + <result> + <binding name="g"> + <uri>http://www.bigdata.com/</uri> + </binding> + </result> + <result> + <binding name="g"> + <uri>http://www.bigdata.com/a</uri> + </binding> + </result> + </results> +</sparql> \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.trig =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.trig (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709empty.trig 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,9 @@ +@prefix : <http://www.bigdata.com/> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . + +: { +:x rdf:type :C . +} + +:a { +} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709workaround.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709workaround.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/trac709workaround.rq 2013-08-21 21:05:38 UTC (rev 7307) @@ -0,0 +1,8 @@ +prefix : <http://www.bigdata.com/> +prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + +select ?g where { + { select distinct ?g + { graph ?g { ?s ?p ?o } } + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
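The workaround query is worth a note: rewriting graph ?g {} as a DISTINCT subselect over graph ?g { ?s ?p ?o } enumerates exactly the graphs containing at least one statement, which is the only notion of an "existing" graph a pure quad store can support. On trac709.trig, where both graphs are populated, the direct and rewritten forms are expected to agree:

    # trac709.trig:  : { :x rdf:type :C . }   :a { :y rdf:type :B . :z rdf:type :A . }
    #
    # direct (trac709.rq):   select ?g where { graph ?g {} }
    # rewrite (trac709workaround.rq):
    #   select ?g where { { select distinct ?g { graph ?g { ?s ?p ?o } } } }
    #
    # both => ?g = <http://www.bigdata.com/>, <http://www.bigdata.com/a>  (trac709.srx)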
From: <tho...@us...> - 2013-08-21 20:53:34
Revision: 7306 http://bigdata.svn.sourceforge.net/bigdata/?rev=7306&view=rev Author: thompsonbry Date: 2013-08-21 20:53:23 +0000 (Wed, 21 Aug 2013) Log Message: ----------- Added an HAClient class. - It does not yet setup a Quorum object to reflect the state of a connected zookeeper quorum for a specific HA replication cluster. - I have not refactored AbstractServer and HAJournalServer to replace the logic in those classes that is replicated by the HAClient class. See #728 Modified Paths: -------------- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalDiscoveryClient.java Added Paths: ----------- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-08-21 18:34:53 UTC (rev 7305) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-08-21 20:53:23 UTC (rev 7306) @@ -407,17 +407,21 @@ return serviceDiscoveryManager; } + + /* + * DiscoveryListener + */ /** * Lock controlling access to the {@link #discoveryEvent} {@link Condition}. */ - protected final ReentrantLock discoveryEventLock = new ReentrantLock(); + private final ReentrantLock discoveryEventLock = new ReentrantLock(); /** * Condition signaled any time there is a {@link DiscoveryEvent} delivered to * our {@link DiscoveryListener}. */ - protected final Condition discoveryEvent = discoveryEventLock + private final Condition discoveryEvent = discoveryEventLock .newCondition(); /** Added: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java (rev 0) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-08-21 20:53:23 UTC (rev 7306) @@ -0,0 +1,1111 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Mar 24, 2007 + */ + +package com.bigdata.journal.jini.ha; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import net.jini.config.Configuration; +import net.jini.config.ConfigurationException; +import net.jini.config.ConfigurationProvider; +import net.jini.core.discovery.LookupLocator; +import net.jini.core.lookup.ServiceItem; +import net.jini.discovery.DiscoveryEvent; +import net.jini.discovery.DiscoveryListener; +import net.jini.discovery.LookupDiscoveryManager; +import net.jini.lease.LeaseRenewalManager; +import net.jini.lookup.ServiceDiscoveryEvent; +import net.jini.lookup.ServiceDiscoveryListener; +import net.jini.lookup.ServiceDiscoveryManager; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.ZooKeeper; + +import com.bigdata.ha.HAGlue; +import com.bigdata.jini.start.config.ZookeeperClientConfig; +import com.bigdata.jini.util.JiniUtil; +import com.bigdata.service.IDataService; +import com.bigdata.service.IService; +import com.bigdata.service.IServiceShutdown; +import com.bigdata.service.jini.JiniClient; +import com.bigdata.service.jini.JiniClientConfig; +import com.bigdata.zookeeper.ZooKeeperAccessor; +import com.sun.jini.start.ServiceDescriptor; + +/** + * A client capable of connecting to a bigdata highly available replication + * cluster. + * <p> + * Clients are configured using a Jini service configuration file. The name of + * that file is passed to {@link #newInstance(String[])}. The configuration must + * be consistent with the configuration of the federation to which you wish to + * connect. + * + * FIXME Review how we connect to a specific logical HA replication cluster and + * verify that we can connect to multiple such clusters in order to support RMI + * operations across those clusters (as long as they are in the same space). It + * might be that this is achieved through a HAConnection to each logical cluster + * instance, or not. The quorums are certainly specific to a logical instance + * and that is determined by the zkroot. Investigate! + * + * FIXME Retro fit into HAJournalServer (discoveryClient) and AbstractServer. + * + * FIXME This does not start up a quorum and set a quorum client. It should so + * we can see the quorum events. That could be a useful thing to do in main(). + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class HAClient { + + private static final Logger log = Logger.getLogger(HAClient.class); + + /** + * Options understood by the {@link HAClient}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + public static interface Options { + + /** + * The timeout in milliseconds that the client will await the discovery + * of a service if there is a cache miss (default + * {@value #DEFAULT_CACHE_MISS_TIMEOUT}). + * + * @see HAJournalDiscoveryClient + */ + String CACHE_MISS_TIMEOUT = "cacheMissTimeout"; + + String DEFAULT_CACHE_MISS_TIMEOUT = "" + (2 * 1000); + + } + + /** + * The value is the {@link HAConnection} and <code>null</code> iff not connected. 
+ */ + private final AtomicReference<HAConnection> fed = new AtomicReference<HAConnection>(); + + /** + * The lock used to guard {@link #connect()} and + * {@link #disconnect(boolean)}. + * <p> + * Note: In order to avoid some deadlocks during the shutdown protocol, I + * refactored several methods which were using synchronized(this) to either + * use an {@link AtomicReference} (for {@link #fed} or to use a hidden lock. + */ + private final Lock connectLock = new ReentrantLock(false/* fair */); + + public boolean isConnected() { + + return fed.get() != null; + + } + + public HAConnection getConnection() { + + final HAConnection fed = this.fed.get(); + + if (fed == null) { + + throw new IllegalStateException(); + + } + + return fed; + + } + + /** + * {@inheritDoc} + * <p> + * Note: Immediate shutdown can cause odd exceptions to be logged. Normal + * shutdown is recommended unless there is a reason to force immediate + * shutdown. + * + * <pre> + * java.rmi.MarshalException: error marshalling arguments; nested exception is: + * java.io.IOException: request I/O interrupted + * at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:785) + * at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethod(BasicInvocationHandler.java:659) + * at net.jini.jeri.BasicInvocationHandler.invoke(BasicInvocationHandler.java:528) + * at $Proxy5.notify(Ljava.lang.String;Ljava.util.UUID;Ljava.lang.String;[B)V(Unknown Source) + * </pre> + * + * These messages may be safely ignored if they occur during immediate + * shutdown. + * + * @param immediateShutdown + * When <code>true</code> the shutdown is <em>abrupt</em>. You + * can expect to see messages about interrupted IO such as + */ + public void disconnect(final boolean immediateShutdown) { + + connectLock.lock(); + + try { + + final HAConnection fed = this.fed.get(); + + if (fed != null) { + + if (immediateShutdown) { + + fed.shutdownNow(); + + } else { + + fed.shutdown(); + + } + + } + + this.fed.set(null); + + } finally { + + connectLock.unlock(); + + } + + } + + public HAConnection connect() { + + connectLock.lock(); + + try { + + HAConnection fed = this.fed.get(); + + if (fed == null) { + + fed = new HAConnection(jiniConfig, zooConfig); + + this.fed.set(fed); + + fed.start(this); + + } + + return fed; + + } finally { + + connectLock.unlock(); + + } + + } + + /** + * The {@link JiniClientConfig}. + */ + public final JiniClientConfig jiniConfig; + + /** + * The {@link ZooKeeper} client configuration. + */ + public final ZookeeperClientConfig zooConfig; + + /** + * The {@link Configuration} object used to initialize this class. + */ + private final Configuration config; + + /** + * The {@link JiniClientConfig}. + */ + public JiniClientConfig getJiniClientConfig() { + + return jiniConfig; + + } + + /** + * The {@link ZooKeeper} client configuration. + */ + public final ZookeeperClientConfig getZookeeperClientConfig() { + + return zooConfig; + + } + + /** + * The {@link Configuration} object used to initialize this class. + */ + public final Configuration getConfiguration() { + + return config; + + } + +// /** +// * {@inheritDoc} +// * +// * @see #getProperties(String component) +// */ +// public Properties getProperties() { +// +// return properties; +// +// } + +// /** +// * Return the {@link Properties} for the {@link JiniClient} merged with +// * those for the named component in the {@link Configuration}. Any +// * properties found for the {@link JiniClient} "component" will be read +// * first. 
Properties for the named component are next, and therefore will +// * override those given for the {@link JiniClient}. You can specify +// * properties for either the {@link JiniClient} or the <i>component</i> +// * using: +// * +// * <pre> +// * properties = NV[]{...}; +// * </pre> +// * +// * in the appropriate section of the {@link Configuration}. For example: +// * +// * <pre> +// * +// * // Jini client configuration +// * com.bigdata.service.jini.JiniClient { +// * +// * // ... +// * +// * // optional JiniClient properties. +// * properties = new NV[] { +// * +// * new NV("foo","bar") +// * +// * }; +// * +// * } +// * </pre> +// * +// * And overrides for a named component as: +// * +// * <pre> +// * +// * com.bigdata.service.FooBaz { +// * +// * properties = new NV[] { +// * +// * new NV("foo","baz"), +// * new NV("goo","12"), +// * +// * }; +// * +// * } +// * </pre> +// * +// * @param component +// * The name of the component. +// * +// * @return The properties specified for that component. +// * +// * @see #getConfiguration() +// */ +// public Properties getProperties(final String component) +// throws ConfigurationException { +// +// return JiniClient.getProperties(component, getConfiguration()); +// +// } + +// /** +// * Read {@value JiniClientConfig.Options#PROPERTIES} for the optional +// * application or server class identified by [cls]. +// * <p> +// * Note: Anything read for the specific class will overwrite any value for +// * the same properties specified for {@link JiniClient}. +// * +// * @param className +// * The class name of the client or service (optional). When +// * specified, properties defined for that class in the +// * configuration will be used and will override those specified +// * for the {@value Options#NAMESPACE}. +// * @param config +// * The {@link Configuration}. +// * +// * @todo this could be replaced by explicit use of the java identifier +// * corresponding to the Option and simply collecting all such +// * properties into a Properties object using their native type (as +// * reported by the ConfigurationFile). +// */ +// static public Properties getProperties(final String className, +// final Configuration config) throws ConfigurationException { +// +// final NV[] a = (NV[]) config +// .getEntry(JiniClient.class.getName(), +// JiniClientConfig.Options.PROPERTIES, NV[].class, +// new NV[] {}/* defaultValue */); +// +// final NV[] b; +// if (className != null) { +// +// b = (NV[]) config.getEntry(className, +// JiniClientConfig.Options.PROPERTIES, NV[].class, +// new NV[] {}/* defaultValue */); +// +// } else +// b = null; +// +// final NV[] tmp = ConfigMath.concat(a, b); +// +// final Properties properties = new Properties(); +// +// for (NV nv : tmp) { +// +// properties.setProperty(nv.getName(), nv.getValue()); +// +// } +// +// if (log.isInfoEnabled() || BigdataStatics.debug) { +// +// final String msg = "className=" + className + " : properties=" +// + properties.toString(); +// +// if (BigdataStatics.debug) +// System.err.println(msg); +// +// if (log.isInfoEnabled()) +// log.info(msg); +// +// } +// +// return properties; +// +// } + + /** + * Installs a {@link SecurityManager} and returns a new client. + * + * @param args + * Either the command line arguments or the arguments from the + * {@link ServiceDescriptor}. Either way they identify the jini + * {@link Configuration} (you may specify either a file or URL) + * and optional overrides for that {@link Configuration}. + * + * @return The new client. 
+ * + * @throws RuntimeException + * if there is a problem reading the jini configuration for the + * client, reading the properties for the client, etc. + */ + public static HAClient newInstance(final String[] args) { + + // set the security manager. + setSecurityManager(); + + try { + + return new HAClient(args); + + } catch (ConfigurationException e) { + + throw new RuntimeException(e); + + } + + } + + /** + * Configures a client. + * + * @param args + * The jini {@link Configuration} (you may specify either a file + * or URL) and optional overrides for that {@link Configuration}. + * + * @throws ConfigurationException + */ + public HAClient(final String[] args) throws ConfigurationException { + + this(HAClient.class, ConfigurationProvider.getInstance(args)); + + } + + /** + * Configures a client. + * + * @param cls + * The class of the client (optional) determines the component + * whose configuration will be read in addition to that for the + * {@link JiniClient} itself. Component specific values will + * override those specified for the {@link JiniClient} in the + * {@link Configuration}. + * @param config + * The configuration object. + * + * @throws ConfigurationException + */ + public HAClient(final Class cls, final Configuration config) + throws ConfigurationException { + + if (config == null) + throw new IllegalArgumentException(); + +// this.properties = JiniClient.getProperties(cls.getName(), config); + + this.jiniConfig = new JiniClientConfig(cls.getName(), config); + + this.zooConfig = new ZookeeperClientConfig(config); + + this.config = config; + + } + + /** + * Conditionally install a suitable security manager if there is none in + * place. This is required before the client can download code. The code + * will be downloaded from the HTTP server identified by the + * <code>java.rmi.server.codebase</code> property specified for the VM + * running the service. + */ + static protected void setSecurityManager() { + + final SecurityManager sm = System.getSecurityManager(); + + if (sm == null) { + + System.setSecurityManager(new SecurityManager()); + + if (log.isInfoEnabled()) + log.info("Set security manager"); + + } else { + + if (log.isInfoEnabled()) + log.info("Security manager already in place: " + sm.getClass()); + + } + + } + +// /** +// * Read and return the content of the properties file. +// * +// * @param propertyFile +// * The properties file. +// * +// * @throws IOException +// */ +// static protected Properties getProperties(final File propertyFile) +// throws IOException { +// +// if(log.isInfoEnabled()) { +// +// log.info("Reading properties: file="+propertyFile); +// +// } +// +// final Properties properties = new Properties(); +// +// InputStream is = null; +// +// try { +// +// is = new BufferedInputStream(new FileInputStream(propertyFile)); +// +// properties.load(is); +// +// if(log.isInfoEnabled()) { +// +// log.info("Read properties: " + properties); +// +// } +// +// return properties; +// +// } finally { +// +// if (is != null) +// is.close(); +// +// } +// +// } + + /** + * Invoked when a service join is noticed. + * + * @param service + * The RMI interface for the service. + * @param serviceUUID + * The service identifier. + */ + public void serviceJoin(final IService service, final UUID serviceUUID) { + + if (log.isInfoEnabled()) + log.info("service=" + service + ", serviceUUID" + serviceUUID); + + } + + /** + * Invoked when a service leave is noticed. + * + * @param serviceUUID The service identifier. 
+ */ + public void serviceLeave(final UUID serviceUUID) { + + if (log.isInfoEnabled()) + log.info("serviceUUID=" + serviceUUID); + + } + + /** + * + * TODO Pattern after JiniFederation. Take the DiscoveryListener, etc. from + * AbstractServer but compare to JiniFederation. + */ + static public class HAConnection implements DiscoveryListener, + ServiceDiscoveryListener, IServiceShutdown { + + private final JiniClientConfig jiniConfig; + + private final ZookeeperClientConfig zooConfig; + + /** + * The {@link HAClient} reference. When non-<code>null</code> the client + * is connected. When <code>null</code> it is disconnected. + */ + private final AtomicReference<HAClient> clientRef = new AtomicReference<HAClient>(); + + private ZooKeeperAccessor zooKeeperAccessor; + + private LookupDiscoveryManager lookupDiscoveryManager; + + private ServiceDiscoveryManager serviceDiscoveryManager; + + /** + * Caching discovery client for the {@link HAGlue} services. + */// TODO Rename as haGlueDiscoveryService + private HAJournalDiscoveryClient discoveryClient; + + private HAConnection(final JiniClientConfig jiniConfig, + final ZookeeperClientConfig zooConfig) { + + if (jiniConfig == null) + throw new IllegalArgumentException(); + + if (zooConfig == null) + throw new IllegalArgumentException(); + + this.jiniConfig = jiniConfig; + + this.zooConfig = zooConfig; + + } + + /** + * Return the client object that was used to obtain the connection. + * + * @return The {@link HAClient} reference. When non-<code>null</code> + * the client is connected. When <code>null</code> it is + * disconnected. + * + * @throws IllegalStateException + * if the client disconnected and this object is no longer + * valid. + * + * TODO add getOpenClient() and use to verify that the + * client is non-null for various internal methods. or use + * lambda to make this safe for things that need to be done + * conditionally. + */ + public HAClient getClient() { + + return clientRef.get(); + + } + + @Override + public boolean isOpen() { + + return getClient() != null; + + } + + private void assertOpen() { + + if (!isOpen()) { + + throw new IllegalStateException(); + + } + + } + + public synchronized void start(final HAClient client) { + + if(client == null) + throw new IllegalArgumentException(); + + if (isOpen()) + throw new IllegalStateException(); + + if (log.isInfoEnabled()) + log.info(jiniConfig.toString()); + + final String[] groups = jiniConfig.groups; + + final LookupLocator[] lookupLocators = jiniConfig.locators; + + try { + + /* + * Connect to a zookeeper service in the declare ensemble of + * zookeeper servers. + */ + + zooKeeperAccessor = new ZooKeeperAccessor(zooConfig.servers, + zooConfig.sessionTimeout); + + /* + * Note: This class will perform multicast discovery if + * ALL_GROUPS is specified and otherwise requires you to specify + * one or more unicast locators (URIs of hosts running discovery + * services). As an alternative, you can use LookupDiscovery, + * which always does multicast discovery. + */ + lookupDiscoveryManager = new LookupDiscoveryManager(groups, + lookupLocators, this /* DiscoveryListener */, + client.getConfiguration()); + + /* + * Setup a helper class that will be notified as services join + * or leave the various registrars to which the data server is + * listening. 
+ */ + try { + + serviceDiscoveryManager = new ServiceDiscoveryManager( + lookupDiscoveryManager, new LeaseRenewalManager(), + client.getConfiguration()); + + } catch(IOException ex) { + + throw new RuntimeException( + "Could not initiate service discovery manager", ex); + + } + + // FIXME Configuration [properties]. +// final long cacheMissTimeout = Long.valueOf(client.getProperties() +// .getProperty(JiniClient.Options.CACHE_MISS_TIMEOUT, +// JiniClient.Options.DEFAULT_CACHE_MISS_TIMEOUT)); + final long cacheMissTimeout = Long + .valueOf(JiniClient.Options.DEFAULT_CACHE_MISS_TIMEOUT); + + // Setup discovery for HAGlue clients. + // TODO Refactor out of HAJournalServer. + discoveryClient = new HAJournalDiscoveryClient( + serviceDiscoveryManager, + null/* serviceDiscoveryListener */, cacheMissTimeout); + + // And set the reference. The client is now "connected". + this.clientRef.set(client); + + } catch (Exception ex) { + + log.fatal("Problem initiating service discovery: " + + ex.getMessage(), ex); + + try { + + shutdownNow(); + + } catch (Throwable t) { + + log.error(t.getMessage(), t); + + } + + throw new RuntimeException(ex); + + } + + } + /** + * {@inheritDoc} + * <p> + * Extended to terminate discovery. + */ + @Override + synchronized public void shutdown() { + + if(!isOpen()) return; + + if (log.isInfoEnabled()) + log.info("begin"); + + // Disconnect. + clientRef.set(null); + + final long begin = System.currentTimeMillis(); + +// super.shutdown(); + + terminateDiscoveryProcesses(); + + final long elapsed = System.currentTimeMillis() - begin; + + if (log.isInfoEnabled()) + log.info("Done: elapsed=" + elapsed + "ms"); + + } + + /** + * {@inheritDoc} + * <p> + * Extended to terminate discovery. + */ + @Override + synchronized public void shutdownNow() { + + if (!isOpen()) + return; + + if (log.isInfoEnabled()) + log.info("begin"); + + // Disconnect. + clientRef.set(null); + + final long begin = System.currentTimeMillis(); + + // super.shutdownNow(); + + terminateDiscoveryProcesses(); + + final long elapsed = System.currentTimeMillis() - begin; + + if (log.isInfoEnabled()) + log.info("Done: elapsed=" + elapsed + "ms"); + + } + + /** + * Stop various discovery processes. + */ + private void terminateDiscoveryProcesses() { + + /* + * bigdata specific service discovery. + */ + + if (discoveryClient != null) { + + discoveryClient.terminate(); + + discoveryClient = null; + + } + + /* + * and the lower level jini processes. + */ + + if (serviceDiscoveryManager != null) { + + serviceDiscoveryManager.terminate(); + + serviceDiscoveryManager = null; + + } + + if (lookupDiscoveryManager != null) { + + lookupDiscoveryManager.terminate(); + + lookupDiscoveryManager = null; + + } + + try { + + // close the zookeeper client. + zooKeeperAccessor.close(); + + } catch (InterruptedException e) { + + throw new RuntimeException(e); + + } + + } + + /** + * Resolve the service identifier to an {@link IDataService}. + * <p> + * Note: Whether the returned object is a proxy or the service + * implementation depends on whether the federation is embedded (in + * process) or distributed (networked). + * + * @param serviceUUID + * The identifier for a {@link IDataService}. + * + * @return The RMI proxy for the specified {@link HAGlue} or + * <code>null</code> iff the {@link HAGlue} could not be + * discovered from its identifier. + */ + public HAGlue getHAGlueService(final UUID serviceUUID) { + + return discoveryClient.getService(); + + } + + /** + * Return an array UUIDs for discovered {@link HAGlue} services. 
+ * + * @param maxCount + * The maximum #of data services whose UUIDs will be + * returned. When zero (0) the UUID for all known data + * services will be returned. + * + * @return An array of {@link UUID}s for the discovered services. + */ + public UUID[] getHAGlueServiceUUIDs(final int maxCount) { + + assertOpen(); + + return discoveryClient.getServiceUUIDs(maxCount, null/* filter */); + + } + + /** + * Return ANY {@link HAGlue} service which has been (or could be) + * discovered and which is part of the connected federation. + * + * @return <code>null</code> if there are NO known {@link HAGlue} + * services. + */ + public HAGlue getAnyHAGlueService() { + + assertOpen(); + + return discoveryClient.getService(); + + } + + /* + * ServiceDiscoveryListener + */ + + /** + * Invokes {@link HAClient#serviceJoin(IService, UUID)} if the newly + * discovered service implements {@link IService}. + * <p> + * {@inheritDoc} + */ + public void serviceAdded(final ServiceDiscoveryEvent e) { + + final ServiceItem serviceItem = e.getPostEventServiceItem(); + + if (serviceItem.service instanceof IService) { + + final UUID serviceUUID = JiniUtil + .serviceID2UUID(serviceItem.serviceID); + + final HAClient client = getClient(); + + if (client != null) { + + client.serviceJoin((IService) serviceItem.service, + serviceUUID); + + } + + } else { + + if (log.isInfoEnabled()) + log.info("Not an " + IService.class.getName() + " : " + e); + + } + + } + + /** + * NOP. + * <p> + * {@inheritDoc} + */ + @Override + public void serviceChanged(final ServiceDiscoveryEvent e) { + + } + + /** + * Invokes {@link HAClient#serviceLeave(UUID)}. + * <p> + * {@inheritDoc} + */ + @Override + public void serviceRemoved(final ServiceDiscoveryEvent e) { + + final ServiceItem serviceItem = e.getPreEventServiceItem(); + + final UUID serviceUUID = JiniUtil + .serviceID2UUID(serviceItem.serviceID); + + final HAClient client = getClient(); + + if (client != null) { + + client.serviceLeave(serviceUUID); + + } + + } + + /* + * DiscoveryListener + */ + + /** + * Lock controlling access to the {@link #discoveryEvent} {@link Condition}. + */ + private final ReentrantLock discoveryEventLock = new ReentrantLock(); + + /** + * Condition signaled any time there is a {@link DiscoveryEvent} delivered to + * our {@link DiscoveryListener}. + */ + private final Condition discoveryEvent = discoveryEventLock + .newCondition(); + + /** + * Signals anyone waiting on {@link #discoveryEvent}. + * <p> + * {@inheritDoc} + */ + @Override + public void discarded(final DiscoveryEvent e) { + + try { + + discoveryEventLock.lockInterruptibly(); + + try { + + discoveryEvent.signalAll(); + + } finally { + + discoveryEventLock.unlock(); + + } + + } catch (InterruptedException ex) { + + return; + + } + + } + + /** + * Signals anyone waiting on {@link #discoveryEvent}. + * <p> + * {@inheritDoc} + */ + @Override + public void discovered(final DiscoveryEvent e) { + + try { + + discoveryEventLock.lockInterruptibly(); + + try { + + discoveryEvent.signalAll(); + + } finally { + + discoveryEventLock.unlock(); + + } + + } catch (InterruptedException ex) { + + return; + + } + + } + + } + + /** + * Simple main just connects and then disconnects after a few seconds. It + * prints out all discovered {@link HAGlue} services before it shutsdown. 
+ * + * @param args + * @throws ConfigurationException + * @throws InterruptedException + */ + public static void main(final String[] args) throws ConfigurationException, + InterruptedException { + + final HAClient client = new HAClient(args); + + final HAConnection ctx = client.connect(); + + try { + + System.out.println("Connected - waiting for service discovery."); + + Thread.sleep(5000/* ms */); + + // Get UUIDs for all discovered services. + final UUID[] serviceIds = ctx.getHAGlueServiceUUIDs(0/* maxCount */); + + System.out.println("Found " + serviceIds.length + " services."); + + for(UUID x : serviceIds) { + + System.out.println("service: " + x + " has proxy " + + ctx.getHAGlueService(x)); + + } + + } finally { + + ctx.shutdown(); + + } + + System.out.println("Bye"); + + } + +} Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalDiscoveryClient.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalDiscoveryClient.java 2013-08-21 18:34:53 UTC (rev 7305) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalDiscoveryClient.java 2013-08-21 20:53:23 UTC (rev 7306) @@ -24,14 +24,15 @@ package com.bigdata.journal.jini.ha; import java.rmi.RemoteException; +import java.util.UUID; +import net.jini.core.lookup.ServiceItem; import net.jini.core.lookup.ServiceTemplate; import net.jini.lookup.LookupCache; import net.jini.lookup.ServiceDiscoveryListener; import net.jini.lookup.ServiceDiscoveryManager; import com.bigdata.ha.HAGlue; -import com.bigdata.service.IClientService; import com.bigdata.service.jini.lookup.AbstractCachingServiceClient; /** @@ -70,4 +71,53 @@ } + /** + * Return the proxy for an {@link HAGlue} service from the local cache -or- + * the reference to this service if the {@link UUID} identifies this service + * (this avoids RMI requests from a service to itself). + * + * @param serviceUUID + * The {@link UUID} for the {@link HAGlue} service. + * + * @return The proxy or <code>null</code> if the {@link UUID} does not + * identify a known {@link HAGlue} service. + * + * @throws IllegalArgumentException + * if <i>serviceUUID</i> is <code>null</code>. + */ + public HAGlue getService(final UUID serviceUUID) { + + /* + * Note: I have backed out this optimization as it raises concerns that + * code written to assume RMI might rely on the deserialized objects + * returned from the proxy being independent of the objects on the + * remote service. Since the main optimization of interest is joins, I + * will handle this explicitly from within the distributed join logic. + */ +// if (serviceUUID.equals(thisServiceUUID)) { +// +// /* +// * Return the actual service reference rather than a proxy to avoid +// * RMI when this service makes a request to itself. +// */ +// +// return (IDataService) thisService; +// +// } + + final ServiceItem serviceItem = getServiceItem(serviceUUID); + + if (serviceItem == null) { + + log.error("No such service: uuid=" + serviceUUID); + + return null; + + } + + // return the data service. + return (HAGlue) serviceItem.service; + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
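Revision 7306 above leaves serviceJoin() and serviceLeave() as simple logging hooks on HAClient; they are invoked from the HAConnection ServiceDiscoveryListener as services come and go. The following is a minimal sketch of how an application might specialize those hooks to track replication cluster membership; the subclass and its print statements are hypothetical and are not part of the commit.

import java.util.UUID;

import net.jini.config.ConfigurationException;

import com.bigdata.journal.jini.ha.HAClient;
import com.bigdata.service.IService;

public class MembershipLoggingHAClient extends HAClient {

    public MembershipLoggingHAClient(final String[] args)
            throws ConfigurationException {
        super(args);
    }

    @Override
    public void serviceJoin(final IService service, final UUID serviceUUID) {
        super.serviceJoin(service, serviceUUID); // retain the default logging.
        System.out.println("join : " + serviceUUID);
    }

    @Override
    public void serviceLeave(final UUID serviceUUID) {
        super.serviceLeave(serviceUUID);
        System.out.println("leave: " + serviceUUID);
    }
}

Such a client would be started the same way as the main() routine in the commit, e.g. new MembershipLoggingHAClient(args).connect().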
From: <tho...@us...> - 2013-08-21 18:35:03
Revision: 7305 http://bigdata.svn.sourceforge.net/bigdata/?rev=7305&view=rev Author: thompsonbry Date: 2013-08-21 18:34:53 +0000 (Wed, 21 Aug 2013) Log Message: ----------- I have refactored the Java GASEngine to allow different strategies to parallelize over the frontier. On my airbook with the small data set, I am not seeing any advantage to using parallel threads - in fact the performance is worse until I reach 4 threads (about the same performance) and then drops off again. It seems that the overhead of the LatchedExecutor and the underlying ExecutorService are not providing an advantage. This is against a relatively small graph and using an SSD. This needs to be explored on a machine with more memory, larger graphs, and compared with a machine with traditional (spinning) disks. Right now you can adjust the parallelism in the GASEngine (as constant). I will lift that out into a runtime option once I refactor to separate out the GASEngine (scope should be equivalent to the QueryEngine) and the execution environment for a given GASProgram. This commit also includes detailed reporting of interesting statistics. I am seeing about 120k TEPS on this data and this machine. See #629 Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/GASUtil.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASEngine.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASStats.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASStats.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/GASUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/GASUtil.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/GASUtil.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -1,6 +1,8 @@ package com.bigdata.rdf.graph; +import java.util.concurrent.TimeUnit; + import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.spo.ISPO; @@ -37,5 +39,51 @@ return e.s(); } + + /** + * The average fan out of the frontier. + * + * @param frontierSize + * The size of the frontier. + * @param nedges + * The number of edges visited when mapping the operation across + * that frontier. + * + * @return The average fan out. + */ + public static double fanOut(final int frontierSize, final long nedges) { + return ((int) (nedges * 10d / frontierSize)) / 10d; + + } + + /** + * The traversed edges per second. + * + * @param nedges + * The number of traversed edges. + * @param elapsedNanos + * The elapsed time (nanoseconds). + * + * @return The traversed edges per second. 
+ */ + public static long getTEPS(final long nedges, long elapsedNanos) { + + // avoid division by zero. + if (elapsedNanos == 0) + elapsedNanos = 1; + + // edges/nanosecond. + final double tepns = ((double) nedges) / elapsedNanos; + + // scale units to edges/second. + final double teps = tepns * TimeUnit.SECONDS.toNanos(1); + + // Round off to get rid of those nasty factions. + final long r = Math.round(teps); + + return r; + + } + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASEngine.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASEngine.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -1,9 +1,9 @@ package com.bigdata.rdf.graph; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import com.bigdata.rdf.internal.IV; -import com.bigdata.rdf.store.AbstractTripleStore; /** * @@ -26,14 +26,9 @@ * {@link IGASProgram}. This would allow us to reuse resources within * the {@link IGASEngine}. */ -public interface IGASEngine<VS, ES, ST> extends Callable<Void> { +public interface IGASEngine<VS, ES, ST> extends Callable<IGASStats> { /** - * Return the graph. - */ - AbstractTripleStore getKB(); - - /** * Return the program that is being evaluated. */ IGASProgram<VS, ES, ST> getGASProgram(); @@ -67,8 +62,13 @@ /** * Execute one iteration. * + * @param stats + * Used to report statistics about the execution of the + * algorithm. + * * @return true iff the new frontier is empty. */ - boolean doRound(); - + boolean doRound(IGASStats stats) throws Exception, ExecutionException, + InterruptedException; + } \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASStats.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASStats.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/IGASStats.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -0,0 +1,5 @@ +package com.bigdata.rdf.graph; + +public interface IGASStats { + +} Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -37,10 +37,11 @@ /** * The length of an edge. * - * FIXME This should be modified to use link weights with RDR. We need a - * pattern to get the link attributes materialized with the {@link ISPO} for - * the link. That could be done using a read-ahead filter on the striterator - * if the link weights are always clustered with the ground triple. + * FIXME RDR: This should be modified to use link weights with RDR. We need + * a pattern to get the link attributes materialized with the {@link ISPO} + * for the link. That could be done using a read-ahead filter on the + * striterator if the link weights are always clustered with the ground + * triple. * * When we make this change, the distance should be of the same type as the * link weight or generalized as <code>double</code>. 
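As a quick sanity check on the reporting helpers added to GASUtil above, the following illustration (not part of the commit; the class name and the round numbers are made up) shows the expected outputs.

import java.util.concurrent.TimeUnit;

import com.bigdata.rdf.graph.GASUtil;

public class TepsExample {

    public static void main(final String[] args) {

        // 1,000,000 edges traversed in 8 seconds => teps=125000.
        final long nedges = 1000000L;
        final long elapsedNanos = TimeUnit.SECONDS.toNanos(8);

        System.out.println("teps=" + GASUtil.getTEPS(nedges, elapsedNanos));

        // A frontier of 500 vertices touching 1,200 edges => fanOut=2.4
        // (fanOut() truncates to one decimal place).
        System.out.println("fanOut=" + GASUtil.fanOut(500, 1200L));
    }
}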
Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -1,21 +1,33 @@ package com.bigdata.rdf.graph.impl; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.eclipse.jetty.util.ConcurrentHashSet; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.ITx; +import com.bigdata.journal.TimestampUtility; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.GASUtil; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASEngine; import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.IGASStats; import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.model.BigdataValue; @@ -23,9 +35,11 @@ import com.bigdata.rdf.spo.SPOFilter; import com.bigdata.rdf.store.AbstractTripleStore; import com.bigdata.relation.accesspath.IElementFilter; +import com.bigdata.service.IBigdataFederation; import com.bigdata.striterator.ChunkedStriterator; import com.bigdata.striterator.EmptyChunkedIterator; import com.bigdata.striterator.IChunkedIterator; +import com.bigdata.util.concurrent.LatchedExecutor; /** * {@link IGASEngine} for dynamic activation of vertices. This implementation @@ -50,9 +64,6 @@ * is Shortest Path (as per RDF3X). Reachability queries for a hierarchy can * also be maintained and accelerated (again, RDF3X using a ferrari index). * - * TODO Some of the more interesting questions are how to handle dynamic graphs. - * This is not yet considered at all by this code. - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ @SuppressWarnings("rawtypes") @@ -66,8 +77,6 @@ * <p> * Note: This filter is pushed down onto the AP and evaluated close to the * data. - * - * TODO Lift out as static utility class. */ static final IElementFilter<ISPO> edgeOnlyFilter = new SPOFilter<ISPO>() { private static final long serialVersionUID = 1L; @@ -79,21 +88,38 @@ }; /** - * The KB (aka graph). - * <p> - * Note: This COULD be scale-out with remote indices or running embedded - * inside of a HA server. However, for scale-out we want to partition the - * work and the VS/ES so that would imply a different {@link IGASEngine} - * design. + * The {@link IIndexManager} is used to access the graph. */ - private final AbstractTripleStore kb; + private final IIndexManager indexManager; + + /** + * The graph (a KB instance). + */ + private final String namespace; /** + * The timestamp of the view of that graph. This MAY be + * {@link ITx#READ_COMMITTED} to use the current committed view of the graph + * for each iteration (dynamic graph). 
+ */ + private final long timestamp; + + /** + * The {@link ExecutorService} used to parallelize tasks. + */ + private final ExecutorService executorService; + + /** * The graph analytic to be executed. */ private final IGASProgram<VS, ES, ST> program; /** + * The {@link IGASContext}. + */ + private final IGASContext<VS, ES, ST> ctx = this; + + /** * Factory for the vertex state objects. */ private final Factory<IV, VS> vsf; @@ -203,9 +229,36 @@ } - @Override - public AbstractTripleStore getKB() { +// @Override + protected AbstractTripleStore getKB() { + + long timestamp = this.timestamp; + + if(timestamp == ITx.READ_COMMITTED) { + + /** + * Note: This code is request the view as of the the last commit + * time. If we use ITx.READ_COMMITTED here then it will cause the + * Journal to provide us with a ReadCommittedIndex and that has a + * synchronization hot spot! + */ + + timestamp = indexManager.getLastCommitTime(); + + } + + final AbstractTripleStore kb = (AbstractTripleStore) indexManager + .getResourceLocator().locate(namespace, timestamp); + + if (kb == null) { + + throw new RuntimeException("Not found: namespace=" + namespace + + ", timestamp=" + TimestampUtility.toString(timestamp)); + + } + return kb; + } @Override @@ -215,21 +268,89 @@ @Override public IGASContext<VS, ES, ST> getGASContext() { - return this; + return ctx; } - public GASEngine(final AbstractTripleStore kb, - final IGASProgram<VS, ES, ST> program) { + /** + * The parallelism for the SCATTER and GATHER phases. + * + * TODO CONFIG NCores * f(IOWait). If there is lot of IO Wait, then more + * threads will provide better throughput. If there is very little IO Wait, + * then fewer threads might do better. Do a parameterized performance study + * on some large graphs for a variety of algorithms. + * + * FIXME Why is throughput significantly slower for nparallel=1 (using the + * latched executor) versus the direct loop over the vertices that I used in + * the initial implementation?!? Is this just the evaluation environment? + * Try to code multiple implementations of + * {@link #doFrontierTask(VertexTaskFactory)} and see if it is the overhead + * of the latched executor. Try thread pool with just that many threads. Try + * fork/join. Where is the overhead? + * + * <pre> + * nparallel BFS time (2nd run) using airbook 2013.08.21 under eclipse: 100 vertices from foaf crawl (500MB data, RWStore - 3 degrees?) + * 1 11s + * 2 14s + * 3 12s + * 4 11s + * 5 11s + * 6 11s + * </pre> + * <pre> + * nsamples=100, TestBFS, RWStore, Airbook, 2013.08.21. + * TOTAL: nrounds=839: fontierSize=500459, ms=9774, edges=1230097, teps=125843 @ nparallel=1 + * TOTAL: nrounds=839: fontierSize=500459, ms=9958, edges=1230097, teps=123523 @ nparallel=4 + * TOTAL: nrounds=839: fontierSize=500459, ms=10834, edges=1230097, teps=113537 @ nparallel=8 + * </pre> + * Parameterize execution runs againt these runtime options! + */ + private final int nparallel = 1; - if (kb == null) + /** + * + * @param indexManager + * The index manager. + * @param namespace + * The namespace of the graph (KB instance). + * @param timestamp + * The timestamp of the graph view (this should be a read-only + * view for non-blocking index reads). + * @param program + * The program to execute against that graph. + * + * TODO Scale-out: The {@link IIndexmanager} MAY be an + * {@link IBigdataFederation}. The {@link GASEngine} would + * automatically use remote indices. 
However, for proper + * scale-out we want to partition the work and the VS/ES so that + * would imply a different {@link IGASEngine} design. + * + * TODO Dynamic graphs: By allowing {@link ITx#READ_COMMITTED} to + * be specified for the timestamp this class provides some + * support for dynamic graphs, but for some use cases we would + * want to synchronize things such the iteration is performed (or + * re-converged) with each commit point or to replay a series of + * commit points (either through the commit record index or + * through the history index). + */ + public GASEngine(final IIndexManager indexManager, final String namespace, + final long timestamp, final IGASProgram<VS, ES, ST> program) { + + if (indexManager == null) throw new IllegalArgumentException(); + if (program == null) throw new IllegalArgumentException(); - this.kb = kb; - + this.indexManager = indexManager; + + this.namespace = namespace; + + this.timestamp = timestamp; + this.program = program; - + + this.executorService = indexManager.getExecutorService(); + this.vsf = program.getVertexStateFactory(); this.esf = program.getEdgeStateFactory(); @@ -265,7 +386,7 @@ * Callback to initialize the vertex state before the first * iteration. */ - program.init(getGASContext(), v); + program.init(ctx, v); } @@ -288,21 +409,28 @@ } @Override - public Void call() throws Exception { + public IGASStats call() throws Exception { + final GASStats total = new GASStats(); + while (!frontier().isEmpty()) { - doRound(); + final GASStats roundStats = new GASStats(); + doRound(roundStats); + + total.add(roundStats); + } if (log.isInfoEnabled()) - log.info("Done: #rounds=" + round()); + log.info("Done: " + total); - traceState(); - - return null; + traceState(getKB()); + // Done + return total; + } /** @@ -312,7 +440,7 @@ * TODO edgeState is not being traced out. */ @SuppressWarnings("unchecked") - private void traceState() { + private void traceState(final AbstractTripleStore kb) { if (!log.isTraceEnabled()) return; @@ -368,13 +496,26 @@ * pushed into the Gather. If we are doing a forward gather (in-edges), then * we are reading on OSP and will need to do a separate read on SPO. */ - public boolean doRound() { + @Override + public boolean doRound(final IGASStats stats) throws InterruptedException, + ExecutionException, Exception { + /* + * Obtain a view on the graph. + * + * Note: This will automatically advance if there has been an + * intervening commit and the caller specified ITx.READ_COMMITTED. + */ + final AbstractTripleStore kb = getKB(); + + // The size of the fontier for this round. + final int frontierSize = frontier().size(); + if (log.isInfoEnabled()) - log.info("Round=" + round + ", frontierSize=" + frontier().size() + log.info("Round=" + round + ", frontierSize=" + frontierSize + ", vertexStateSize=" + vertexState.size()); - traceState(); + traceState(kb); /* * This is the new frontier. It is initially empty. All newly discovered @@ -382,9 +523,6 @@ */ newFrontier().clear(); - final EdgesEnum gatherEdges = program.getGatherEdges(); - final EdgesEnum scatterEdges = program.getScatterEdges(); - /* * TODO This logic allows us to push down the APPLY into the GATHER or * SCATTER depending on some characteristics of the algorithm. Is this @@ -401,6 +539,8 @@ * APPLY is done before the SCATTER - this would not work if we pushed * down the APPLY into the SCATTER). 
*/ + final EdgesEnum gatherEdges = program.getGatherEdges(); + final EdgesEnum scatterEdges = program.getScatterEdges(); final boolean pushDownApplyInGather; final boolean pushDownApplyInScatter; final boolean runApplyStage; @@ -426,56 +566,83 @@ pushDownApplyInScatter = false; runApplyStage = true; } + + /* + * GATHER + */ - gatherEdges(gatherEdges,pushDownApplyInGather); -// switch (gatherEdges) { -// case NoEdges: -// break; -// case InEdges: -// gatherEdges(true/*inEdges*/, pushDownApplyInGather); -// break; -// case OutEdges: -// gatherEdges(false/*outEdges*/, pushDownApplyInGather); -// break; -// case AllEdges: -// /* -// * TODO When doing the GATHER for both in-edges and out-edges, we -// * should submit two child GATHER tasks so those things run in -// * parallel. However, look first at how to parallelize within the -// * GATHER operation (multiple threads over key-range stripes or -// * threads racing to consume the frontier in order). -// * -// * TODO The same applies for the SCATTER stage. -// */ -// gatherEdges(true/* inEdges */, pushDownApplyInGather); -// gatherEdges(false/* outEdges */, pushDownApplyInGather); -// break; -// default: -// throw new UnsupportedOperationException(gatherEdges.name()); -// } + final long beginGather = System.nanoTime(); + + final long gatherEdgeCount = gatherEdges(kb, gatherEdges, + pushDownApplyInGather); - if(runApplyStage) { + final long elapsedGather = System.nanoTime() - beginGather; + + /* + * APPLY + */ + + final long elapsedApply; + + if (runApplyStage) { + + final long beginApply = System.nanoTime(); + apply(); + + elapsedApply = System.nanoTime() - beginApply; + + } else { + + elapsedApply = 0L; + } - scatterEdges(scatterEdges,pushDownApplyInScatter); -// switch (scatterEdges) { -// case NoEdges: -// break; -// case OutEdges: -// scatterEdges(false/*inEdges*/, pushDownApplyInScatter); -// break; -// case InEdges: -// scatterEdges(true/*inEdges*/, pushDownApplyInScatter); -// break; -// case AllEdges: -// scatterEdges(true/* inEdges */, pushDownApplyInScatter); -// scatterEdges(false/* inEdges */, pushDownApplyInScatter); -// break; -// default: -// throw new UnsupportedOperationException(scatterEdges.name()); -// } + /* + * SCATTER + */ + + final long beginScatter = System.nanoTime(); + + final long scatterEdgeCount = scatterEdges(kb, scatterEdges, + pushDownApplyInScatter); + final long elapsedScatter = System.nanoTime() - beginScatter; + + /* + * Reporting. + */ + + final long totalElapsed = elapsedGather + elapsedApply + elapsedScatter; + + final long totalEdges = scatterEdgeCount + gatherEdgeCount; + + // TODO pure interface for this. + ((GASStats) stats).add(frontierSize, totalEdges, totalElapsed); + + if (log.isInfoEnabled()) { + + log.info("\ntotal"// + + ": fontierSize=" + frontierSize // + + ", ms="+TimeUnit.NANOSECONDS.toMillis(totalElapsed)// + + ", edges=" + totalEdges// + + ", teps=" + GASUtil.getTEPS(totalEdges, totalElapsed)// + + "\ngather"// + + ": ms=" + TimeUnit.NANOSECONDS.toMillis(elapsedGather)// + + ", nedges=" + gatherEdgeCount// + + ", fanIn=" + GASUtil.fanOut(frontierSize, gatherEdgeCount)// + + ", teps=" + GASUtil.getTEPS(gatherEdgeCount, elapsedGather) // + + (runApplyStage ? 
", apply=" + + TimeUnit.NANOSECONDS.toMillis(elapsedApply) : "")// + + "\nscatter"// + + ": ms=" + TimeUnit.NANOSECONDS.toMillis(elapsedScatter)// + + ", nedges=" + scatterEdgeCount // + + ", fanOut=" + GASUtil.fanOut(frontierSize, scatterEdgeCount) // + + ", teps=" + GASUtil.getTEPS(scatterEdgeCount, elapsedScatter)// + ); + + } + // Swaps old and new frontiers. round.incrementAndGet(); @@ -513,8 +680,6 @@ */ private void apply() { - final IGASContext<VS, ES, ST> ctx = getGASContext(); - // Compact, ordered frontier. No duplicates! final IV[] f = getCompactFrontier(); @@ -526,18 +691,20 @@ } - private IChunkedIterator<ISPO> getInEdges(final IV u) { - + private IChunkedIterator<ISPO> getInEdges(final AbstractTripleStore kb, + final IV u) { + // in-edges: OSP / OCSP : [u] is the Object. return kb .getSPORelation() - .getAccessPath(null/* s */, null/* p */, u/* o */, - null/* c */, edgeOnlyFilter).iterator(); + .getAccessPath(null/* s */, null/* p */, u/* o */, null/* c */, + edgeOnlyFilter).iterator(); } - - private IChunkedIterator<ISPO> getOutEdges(final IV u) { - + + private IChunkedIterator<ISPO> getOutEdges(final AbstractTripleStore kb, + final IV u) { + // out-edges: SPO / SPOC : [u] is the Subject. return kb .getSPORelation() @@ -546,19 +713,39 @@ } + /** + * Return the edges for the vertex. + * + * @param u + * The vertex. + * @param edges + * Typesafe enumeration indicating which edges should be visited. + * @return An iterator that will visit the edges for that vertex. + * + * TODO There should be a means to specify a filter on the possible + * predicates to be used for traversal. If there is a single + * predicate, then that gives us S+P bound. If there are multiple + * predicates, then we have an IElementFilter on P (in addition to + * the filter that is removing the Literals from the scan). + * + * TODO Use the chunk parallelism? Explicit for(x : chunk)? This + * could make it easier to collect the edges into an array (but that + * is not required for powergraph). + */ @SuppressWarnings("unchecked") - private IChunkedIterator<ISPO> getEdges(final IV u, final EdgesEnum edges) { + private IChunkedIterator<ISPO> getEdges(final AbstractTripleStore kb, + final IV u, final EdgesEnum edges) { switch (edges) { case NoEdges: return new EmptyChunkedIterator<ISPO>(null/* keyOrder */); case InEdges: - return getInEdges(u); + return getInEdges(kb, u); case OutEdges: - return getOutEdges(u); + return getOutEdges(kb, u); case AllEdges:{ - final IChunkedIterator<ISPO> a = getInEdges(u); - final IChunkedIterator<ISPO> b = getOutEdges(u); + final IChunkedIterator<ISPO> a = getInEdges(kb, u); + final IChunkedIterator<ISPO> b = getOutEdges(kb, u); final IChunkedIterator<ISPO> c = (IChunkedIterator<ISPO>) new ChunkedStriterator<IChunkedIterator<ISPO>, ISPO>( a).append(b); return c; @@ -570,6 +757,184 @@ } /** + * A factory for tasks that are applied to each vertex in the frontier. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + private interface VertexTaskFactory<T> { + + /** + * Return a new task that will evaluate the vertex. + * + * @param u + * The vertex to be evaluated. + * + * @return The task. + */ + Callable<T> newVertexTask(IV u); + + } + + /** + * Abstract base class for a strategy that will map a task across the + * frontier. 
+ * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private abstract class AbstractFrontierStrategy implements Callable<Long> { + + final protected VertexTaskFactory<Long> taskFactory; + + AbstractFrontierStrategy(final VertexTaskFactory<Long> taskFactory) { + + this.taskFactory = taskFactory; + + } + + } + + /** + * Stategy uses the callers thread to map the task across the frontier. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class LatchedExecutorFrontierStrategy extends + AbstractFrontierStrategy { + + private final ExecutorService executorService; + private final int nparallel; + + LatchedExecutorFrontierStrategy( + final VertexTaskFactory<Long> taskFactory, + final ExecutorService executorService, final int nparallel) { + + super(taskFactory); + + this.executorService = executorService; + + this.nparallel = nparallel; + + } + + @Override + public Long call() throws Exception { + + // Compact, ordered frontier. No duplicates! + final IV[] f = getCompactFrontier(); + + final List<FutureTask<Long>> tasks = new ArrayList<FutureTask<Long>>( + f.length); + + long nedges = 0L; + + final LatchedExecutor e = new LatchedExecutor(executorService, + nparallel); + + try { + + // For all vertices in the frontier. + for (IV u : f) { + + // Future will compute scatter for vertex. + final FutureTask<Long> ft = new FutureTask<Long>( + taskFactory.newVertexTask(u)); + + // Add to set of created futures. + tasks.add(ft); + + // Enqueue future for execution. + e.execute(ft); + + } + + // Await/check futures. + for (FutureTask<Long> ft : tasks) { + + nedges += ft.get(); + + } + + } finally { + + // Ensure any error cancels all futures. + for (FutureTask<Long> ft : tasks) { + + if (ft != null) { + + // Cancel Future iff created (ArrayList has nulls). + ft.cancel(true/* mayInterruptIfRunning */); + + } + + } + + } + + return nedges; + + } + + } + + /** + * Stategy uses the callers thread to map the task across the frontier. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class RunInCallersThreadFrontierStrategy extends + AbstractFrontierStrategy { + + RunInCallersThreadFrontierStrategy( + final VertexTaskFactory<Long> taskFactory) { + + super(taskFactory); + + } + + public Long call() throws Exception { + + // Compact, ordered frontier. No duplicates! + final IV[] f = getCompactFrontier(); + + long nedges = 0L; + + // For all vertices in the frontier. + for (IV u : f) { + + nedges += taskFactory.newVertexTask(u).call(); + + } + + return nedges; + + } + + } + + /** + * Factory for the parallelism strategy that is used to map a task across + * the frontier. + * + * @param taskFactory + * The task to be mapped across the frontier. + * + * @return The strategy that will map that task across the frontier. + */ + private Callable<Long> newFrontierStrategy( + final VertexTaskFactory<Long> taskFactory) { + + if (nparallel == 1) + return new RunInCallersThreadFrontierStrategy(taskFactory); + + return new LatchedExecutorFrontierStrategy(taskFactory, + executorService, nparallel); + + } + + /** * @param inEdges * when <code>true</code> the GATHER is over the in-edges. * Otherwise it is over the out-edges. @@ -577,36 +942,131 @@ * When <code>true</code>, the APPLY() will be done during the * GATHER. * - * TODO There should be a means to specify a filter on the possible - * predicates to be used for traversal. If there is a single predicate, then - * that gives us S+P bound. 
If there are multiple predicates, then we have - * an IElementFilter on P (in addition to the filter that is removing the - * Literals from the scan). - * - * FIXME Striped scan using multiple threads (or implement parallel iterator - * on B+Tree). This will always do better unless all of the Vs happen to be - * in the same leaf. - * - * TODO Use the chunk parallelism? Explicit for(x : chunk)? This could make - * it easier to collect the edges into an array (but that is not required - * for powergraph). + * @throws ExecutionException + * @throws InterruptedException */ - private void scatterEdges(final EdgesEnum scatterEdges, - final boolean pushDownApply) { + private long scatterEdges(final AbstractTripleStore kb, + final EdgesEnum scatterEdges, final boolean pushDownApply) + throws InterruptedException, ExecutionException, Exception { if (scatterEdges == null) throw new IllegalArgumentException(); - // Compact, ordered frontier. No duplicates! - final IV[] f = getCompactFrontier(); + class ScatterVertexTaskFactory implements VertexTaskFactory<Long> { - final IGASContext<VS, ES, ST> ctx = getGASContext(); + public Callable<Long> newVertexTask(IV u) { - // For all vertices in the frontier. - for (IV u : f) { + return new ScatterTask(kb, u) { + @Override + protected boolean pushDownApply() { + return pushDownApply; + } - if (pushDownApply) { + @Override + protected EdgesEnum getEdgesEnum() { + return scatterEdges; + } + }; + }; + } + return newFrontierStrategy(new ScatterVertexTaskFactory()).call(); + + } + + /** + * @param gatherEdges + * The edges to be gathered. + * @param pushDownApply + * When <code>true</code>, the APPLY() will be done during the + * GATHER. + * + * @throws ExecutionException + * @throws InterruptedException + */ + private long gatherEdges(final AbstractTripleStore kb, + final EdgesEnum gatherEdges, final boolean pushDownApply) + throws InterruptedException, ExecutionException, Exception { + + if (gatherEdges == null) + throw new IllegalArgumentException(); + + class GatherVertexTaskFactory implements VertexTaskFactory<Long> { + + public Callable<Long> newVertexTask(final IV u) { + + return new GatherTask(kb, u) { + @Override + protected boolean pushDownApply() { + return pushDownApply; + } + + @Override + protected EdgesEnum getEdgesEnum() { + return gatherEdges; + } + }; + }; + } + + return newFrontierStrategy(new GatherVertexTaskFactory()).call(); + + } + + /** + * Base class for SCATTER or GATHER of edges for a vertex. + * <p> + * Note: An abstract task pattern is used to factor out parameters that are + * constants within the scope of the scatter for each vertex in the + * frontier. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + abstract private class VertexEdgesTask implements Callable<Long> { + + protected final AbstractTripleStore kb; + protected final IV u; + + public VertexEdgesTask(final AbstractTripleStore kb, final IV u) { + + this.kb = kb; + + this.u = u; + + } + + abstract protected boolean pushDownApply(); + + abstract protected EdgesEnum getEdgesEnum(); + + } + + /** + * Scatter for the edges of a single vertex. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + abstract private class ScatterTask extends VertexEdgesTask { + + public ScatterTask(final AbstractTripleStore kb, final IV u) { + + super(kb, u); + + } + + /** + * Execute the scatter for the vertex. + * + * @return The #of visited edges. 
+ */ + public Long call() throws Exception { + + final boolean TRACE = log.isTraceEnabled(); + + if (pushDownApply()) { + /* * Run the APPLY as part of the SCATTER. * @@ -621,15 +1081,17 @@ if (!program.isChanged(ctx, u)) { // Unchanged. Do not scatter. - continue; + return 0L; } - + /* * Visit the (in|out)-edges of that vertex. */ - final IChunkedIterator<ISPO> eitr = getEdges(u, scatterEdges); + long nedges = 0L; + final IChunkedIterator<ISPO> eitr = getEdges(kb, u, getEdgesEnum()); + try { while (eitr.hasNext()) { @@ -637,7 +1099,9 @@ // edge final ISPO e = eitr.next(); - if (log.isTraceEnabled()) // TODO Batch resolve if @ TRACE + nedges++; + + if (TRACE) // TODO Batch resolve if @ TRACE log.trace("e=" + kb.toString(e)); program.scatter(ctx, u, e); @@ -650,32 +1114,33 @@ } + return nedges; + } - } // scatterOutEdges() - + } // ScatterTask + /** - * @param gatherEdges - * The edges to be gathered. - * @param pushDownApply - * When <code>true</code>, the APPLY() will be done during the - * GATHER. + * Gather for the edges of a single vertex. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> */ - private void gatherEdges(final EdgesEnum gatherEdges, final boolean pushDownApply) { + abstract private class GatherTask extends VertexEdgesTask { - if (gatherEdges == null) - throw new IllegalArgumentException(); + public GatherTask(final AbstractTripleStore kb, final IV u) { - // Compact, ordered frontier. No duplicates! - final IV[] f = getCompactFrontier(); + super(kb, u); + + } - final IGASContext<VS, ES, ST> ctx = getGASContext(); + @Override + public Long call() throws Exception { + + long nedges = 0; + + final IChunkedIterator<ISPO> eitr = getEdges(kb, u, getEdgesEnum()); - // For all vertices in the frontier. - for (IV u : f) { - - final IChunkedIterator<ISPO> eitr = getEdges(u, gatherEdges); - try { /* @@ -710,24 +1175,8 @@ } - // apply() is documented as having a possible null [sum]. -// if (first) { -// -// /* -// * No in-edges (or no out-edges, as specified) -// * -// * TODO The iterator did not visit any edges so the sum is -// * null (undefined). Should we call apply anyway in this -// * case? If so, document that it needs to handle a [null] -// * accumulant! -// */ -// -// continue; -// -// } + if (pushDownApply()) { - if (pushDownApply) { - /* * Run the APPLY as part of the GATHER. * @@ -746,10 +1195,12 @@ } + return nedges; + } - } - + } // GatherTask + /** * {@inheritDoc} * <p> @@ -767,8 +1218,6 @@ @Override public <T> T reduce(IReducer<VS, ES, ST, T> op) { - final IGASContext<VS, ES, ST> ctx = getGASContext(); - for (IV v : vertexState.keySet()) { op.visit(ctx, v); Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASStats.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASStats.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/GASStats.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -0,0 +1,89 @@ +package com.bigdata.rdf.graph.impl; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.bigdata.rdf.graph.GASUtil; +import com.bigdata.rdf.graph.IGASStats; + +/** + * FIXME Refactor to a pure interface - see RuleStats. + * + * FIXME Collect the details within round statistics and then lift the + * formatting of the statistics into this class (for the details within round + * statistics). 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class GASStats implements IGASStats { + + private final AtomicLong nrounds = new AtomicLong(); + private final AtomicLong frontierSize = new AtomicLong(); + private final AtomicLong nedges = new AtomicLong(); + private final AtomicLong elapsedNanos = new AtomicLong(); + + public void add(final long frontierSize, final long nedges, + final long elapsedNanos) { + + this.nrounds.incrementAndGet(); + + this.frontierSize.addAndGet(frontierSize); + + this.nedges.addAndGet(nedges); + + this.elapsedNanos.addAndGet(elapsedNanos); + + } + + public void add(final GASStats o) { + + nrounds.addAndGet(o.getNRounds()); + + frontierSize.addAndGet(o.getFrontierSize()); + + nedges.addAndGet(o.getNEdges()); + + elapsedNanos.addAndGet(o.getElapsedNanos()); + + } + + public long getNRounds() { + return nrounds.get(); + } + + /** + * The cumulative size of the frontier across the iterations. + */ + public long getFrontierSize() { + return frontierSize.get(); + } + + /** + * The number of traversed edges across the iterations. + */ + public long getNEdges() { + return nedges.get(); + } + + /** + * The elapsed nanoseconds across the iterations. + */ + public long getElapsedNanos() { + return elapsedNanos.get(); + } + + /** + * Return a useful summary of the collected statistics. + */ + @Override + public String toString() { + + return "nrounds=" + getNRounds()// + + ": fontierSize=" + getFrontierSize() // + + ", ms=" + TimeUnit.NANOSECONDS.toMillis(getElapsedNanos())// + + ", edges=" + getNEdges()// + + ", teps=" + GASUtil.getTEPS(getNEdges(), getElapsedNanos())// + ; + } + +} Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -23,6 +23,7 @@ */ package com.bigdata.rdf.graph.analytics; +import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.rdf.graph.AbstractGraphTestCase; import com.bigdata.rdf.graph.IGASEngine; @@ -52,7 +53,8 @@ final SmallGraphProblem p = setupSmallGraphProblem(); final IGASEngine<BFS.VS, BFS.ES, Void> gasEngine = new GASEngine<BFS.VS, BFS.ES, Void>( - sail.getDatabase(), new BFS()); + sail.getDatabase().getIndexManager(), sail.getDatabase() + .getNamespace(), ITx.READ_COMMITTED, new BFS()); // Initialize the froniter. 
gasEngine.init(p.mike.getIV()); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -23,6 +23,7 @@ */ package com.bigdata.rdf.graph.analytics; +import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.rdf.graph.AbstractGraphTestCase; import com.bigdata.rdf.graph.IGASEngine; @@ -50,7 +51,8 @@ final SmallGraphProblem p = setupSmallGraphProblem(); final IGASEngine<SSSP.VS, SSSP.ES, Integer> gasEngine = new GASEngine<SSSP.VS, SSSP.ES, Integer>( - sail.getDatabase(), new SSSP()); + sail.getDatabase().getIndexManager(), sail.getDatabase() + .getNamespace(), ITx.READ_COMMITTED, new SSSP()); // Initialize the froniter. gasEngine.init(p.mike.getIV()); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/PerformanceTest.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -1,16 +1,26 @@ package com.bigdata.rdf.graph.impl; +import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; import java.util.Properties; import java.util.Random; import java.util.concurrent.Callable; +import org.apache.log4j.Logger; +import org.openrdf.rio.RDFFormat; + +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.rdf.graph.IGASEngine; import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.sail.BigdataSail; import com.bigdata.rdf.store.AbstractTripleStore; /** @@ -24,6 +34,8 @@ */ abstract public class PerformanceTest<VS, ES, ST> implements Callable<Void> { + private static final Logger log = Logger.getLogger(PerformanceTest.class); + private final String[] args; private final Random r; @@ -67,51 +79,240 @@ */ abstract protected IGASProgram<VS, ES, ST> newGASProgram(); - /** - * Run the test. - */ - public Void call() throws Exception { + private Properties getProperties(final String resource) throws IOException { - // The property file (for an existing Journal). - final File propertyFile = new File("RWStore.properties"); + if (log.isInfoEnabled()) + log.info("Reading properties: " + resource); - // The namespace of the KB. - final String namespace = "kb"; + InputStream is = null; + try { - final boolean quiet = false; + // try the classpath + is = getClass().getResourceAsStream(resource); - final Properties properties = new Properties(); - { - if (!quiet) - System.out.println("Reading properties: " + propertyFile); - final InputStream is = new FileInputStream(propertyFile); + if (is != null) { + + } else { + + // try file system. 
+ final File file = new File(resource); + + if (file.exists()) { + + is = new FileInputStream(file); + + } else { + + throw new IOException("Could not locate resource: " + resource); + + } + + } + + /* + * Obtain a buffered reader on the input stream. + */ + + final Properties properties = new Properties(); + + final Reader reader = new BufferedReader(new InputStreamReader(is)); + try { - properties.load(is); + + properties.load(reader); + } finally { - if (is != null) { + + try { + + reader.close(); + + } catch (Throwable t) { + + log.error(t); + + } + + } + + return properties; + + } finally { + + if (is != null) { + + try { + is.close(); + + } catch (Throwable t) { + + log.error(t); + } + } + } + + } + + /** + * Run the test. + */ + public Void call() throws Exception { + /* + * The property file + * + * TODO Use different files for MemStore and RWStore (command line arg) + */ + final String propertyFile = "bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties"; + + final Properties properties = getProperties(propertyFile); + + final BufferMode bufferMode = BufferMode.valueOf(properties + .getProperty(Journal.Options.BUFFER_MODE, + Journal.Options.DEFAULT_BUFFER_MODE)); + + final boolean isTransient = !bufferMode.isStable(); + + final boolean isTemporary; + if (isTransient) { + + isTemporary = true; + + } else { + + final String fileStr = properties.getProperty(Journal.Options.FILE); + + if (fileStr == null) { + + /* + * We will use a temporary file that we create here. The journal + * will be destroyed below. + */ + isTemporary = true; + + final File tmpFile = File.createTempFile( + PerformanceTest.class.getSimpleName(), + Journal.Options.JNL); + + // Set this on the Properties so it will be used by the jnl. + properties.setProperty(Journal.Options.FILE, + tmpFile.getAbsolutePath()); + + } else { + + // real file is named. + isTemporary = false; + + } + + } + + // The effective KB name. + final String namespace = properties.getProperty( + BigdataSail.Options.NAMESPACE, + BigdataSail.Options.DEFAULT_NAMESPACE); + + /* + * TODO Could start NSS and use SPARQL UPDATE "LOAD" to load the data. + * That exposes the SPARQL end point for other purposes during the test. + * Is this useful? It could also let us run the GASEngine on a remote + * service (submit a callable to an HA server or define a REST API for + * submitting these GAS algorithms). + * + * TODO we need a safe pattern for when we load the data the journal is + * destroyed afterwards and when the journal is pre-existing and we + * neither load the data nor destroy the journal. This has to do with + * the effective BufferMode (if transient) and whether the file is + * specified and whether a temporary file is created (CREATE_TEMP_FILE). + * If we do our own file create if the effective buffer mode is + * non-transient, then we can get all this information. + */ final Journal jnl = new Journal(properties); try { - final long timestamp = jnl.getLastCommitTime(); + // Locate/create KB. + { + final AbstractTripleStore kb; + if (isTemporary) { - final AbstractTripleStore kb = (AbstractTripleStore) jnl - .getResourceLocator().locate(namespace, timestamp); + kb = BigdataSail.createLTS(jnl, properties); - if (kb == null) - throw new RuntimeException("No such KB: " + namespace); + } else { + final AbstractTripleStore tmp = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + ITx.UNISOLATED); + + if (tmp == null) { + + // create. 
+ kb = BigdataSail.createLTS(jnl, properties); + + } else { + + kb = tmp; + + } + + } + } + + /* + * Load data sets. + */ + if (isTemporary) { + + final String path = "bigdata-rdf/src/resources/data/foaf"; + final String dataFile[] = new String[] {// + path + "/data-0.nq.gz",// + path + "/data-1.nq.gz",// + path + "/data-2.nq.gz",// + path + "/data-3.nq.gz",// + }; + final String baseUrl[] = new String[dataFile.length]; + for (int i = 0; i < dataFile.length; i++) { + baseUrl[i] = "file:" + dataFile[i]; + } + final RDFFormat[] rdfFormat = new RDFFormat[] {// + RDFFormat.NQUADS,// + RDFFormat.NQUADS,// + RDFFormat.NQUADS,// + RDFFormat.NQUADS,// + }; + + // Load data using the unisolated view. + final AbstractTripleStore kb = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, ITx.UNISOLATED); + + kb.getDataLoader().loadData(dataFile, baseUrl, rdfFormat); + + } + final IGASEngine<VS, ES, ST> gasEngine = new GASEngine<VS, ES, ST>( - kb, newGASProgram()); + jnl, namespace, ITx.READ_COMMITTED, newGASProgram()); @SuppressWarnings("rawtypes") - final IV[] samples = getRandomSamples(kb); + final IV[] samples; + { + /* + * Use a read-only view (sampling depends on access to the BTree + * rather than the ReadCommittedIndex). + */ + final AbstractTripleStore kb = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + jnl.getLastCommitTime()); + samples = getRandomSamples(kb); + + } + + final GASStats total = new GASStats(); + for (int i = 0; i < samples.length; i++) { @SuppressWarnings("rawtypes") @@ -119,10 +320,14 @@ gasEngine.init(startingVertex); - gasEngine.call(); + // TODO Pure interface for this. + total.add((GASStats)gasEngine.call()); } + // Total over all sampled vertices. + System.out.println("TOTAL: " + total); + // Done. return null; Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/RWStore.properties 2013-08-21 18:34:53 UTC (rev 7305) @@ -0,0 +1,35 @@ +# +# Note: These options are applied when the journal and the triple store are +# first created. + +## +## Journal options. +## + +# The backing file. This contains all your data. You want to put this someplace +# safe. The default locator will wind up in the directory from which you start +# your servlet container. +com.bigdata.journal.AbstractJournal.file=bigdata.jnl + +# The persistence engine. Use 'Disk' for the WORM or 'DiskRW' for the RWStore. +com.bigdata.journal.AbstractJournal.bufferMode=DiskRW +#com.bigdata.journal.AbstractJournal.bufferMode=MemStore + +com.bigdata.btree.writeRetentionQueue.capacity=4000 +com.bigdata.btree.BTree.branchingFactor=128 + +# 200M initial extent. +com.bigdata.journal.AbstractJournal.initialExtent=209715200 +com.bigdata.journal.AbstractJournal.maximumExtent=209715200 + +## +## Setup for QUADS mode without the full text index. 
+## +com.bigdata.rdf.sail.truthMaintenance=false +com.bigdata.rdf.store.AbstractTripleStore.quads=true +com.bigdata.rdf.store.AbstractTripleStore.statementIdentifiers=false +com.bigdata.rdf.store.AbstractTripleStore.textIndex=false +com.bigdata.rdf.store.AbstractTripleStore.axiomsClass=com.bigdata.rdf.axioms.NoAxioms +#com.bigdata.rdf.store.AbstractTripleStore.inlineDateTimes=true + +com.bigdata.rdf.rio.RDFParserOptions.stopAtFirstError=false \ No newline at end of file Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java 2013-08-21 16:27:40 UTC (rev 7304) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/TestGather.java 2013-08-21 18:34:53 UTC (rev 7305) @@ -27,6 +27,7 @@ import java.util.LinkedHashSet; import java.util.Set; +import com.bigdata.journal.ITx; import com.bigdata.rdf.graph.AbstractGraphTestCase; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; @@ -174,7 +175,7 @@ }; - public void testGather_inEdges() { + public void testGather_inEdges() throws Exception { final SmallGraphProblem p = setupSmallGraphProblem(); @@ -235,18 +236,21 @@ * Start on a known vertex. Do one iteration. Verify that the GATHER * populated the data structures on the mock object with the appropriate * collections. + * @throws Exception */ protected void doGatherTest(final EdgesEnum gatherEdges, - final Set<ISPO> expected, final IV startingVertex) { + final Set<ISPO> expected, final IV startingVertex) throws Exception { final IGASEngine<Set<ISPO>, Set<ISPO>, Set<ISPO>> gasEngine = new GASEngine<Set<ISPO>, Set<ISPO>, Set<ISPO>>( - sail.getDatabase(), new MockGASProgram(gatherEdges)); + sail.getDatabase().getIndexManag... [truncated message content] |
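[Annotation] The revision above replaces the sequential for-loops in scatterEdges()/gatherEdges() with a VertexTaskFactory that a pluggable strategy maps across the compact frontier: RunInCallersThreadFrontierStrategy when nparallel == 1, otherwise LatchedExecutorFrontierStrategy with bounded parallelism, either path returning the summed edge count. Below is a minimal, self-contained sketch of that mapping pattern under simplifying assumptions, not the bigdata implementation: vertices are reduced to plain longs rather than IVs, and an ordinary fixed thread pool stands in for bigdata's LatchedExecutor.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative stand-ins for the bigdata types: only the shape of the
// pattern matters here, not the IV/ISPO machinery.
interface VertexTaskFactory<T> {
    Callable<T> newVertexTask(long u); // [u] stands in for an IV vertex.
}

public class FrontierStrategyDemo {

    // Caller's-thread strategy: chosen when nparallel == 1.
    static long runInCallersThread(final long[] frontier,
            final VertexTaskFactory<Long> factory) throws Exception {
        long nedges = 0L;
        for (long u : frontier)
            nedges += factory.newVertexTask(u).call(); // sum visited edges
        return nedges;
    }

    // Executor strategy: one task per frontier vertex, futures summed on
    // completion; a plain pool stands in for bigdata's LatchedExecutor.
    static long runOnExecutor(final long[] frontier,
            final VertexTaskFactory<Long> factory, final ExecutorService es)
            throws Exception {
        final List<Future<Long>> futures = new ArrayList<Future<Long>>(
                frontier.length);
        for (long u : frontier)
            futures.add(es.submit(factory.newVertexTask(u)));
        long nedges = 0L;
        try {
            for (Future<Long> ft : futures)
                nedges += ft.get(); // propagates the first failure
        } finally {
            for (Future<Long> ft : futures)
                ft.cancel(true/* mayInterruptIfRunning */); // an error cancels the rest
        }
        return nedges;
    }

    public static void main(final String[] args) throws Exception {
        final long[] frontier = { 1, 2, 3 };
        final VertexTaskFactory<Long> factory = new VertexTaskFactory<Long>() {
            public Callable<Long> newVertexTask(final long u) {
                return new Callable<Long>() {
                    public Long call() {
                        return 1L; // pretend each vertex scatters one edge
                    }
                };
            }
        };
        System.out.println(runInCallersThread(frontier, factory)); // 3
        final ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            System.out.println(runOnExecutor(frontier, factory, es)); // 3
        } finally {
            es.shutdown();
        }
    }
}

Either strategy yields the per-round traversed-edge count that GASStats accumulates alongside frontier size and elapsed nanos; the teps figure it reports is simply that edge count divided by the elapsed time in seconds.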
From: <tho...@us...> - 2013-08-21 16:27:47
Revision: 7304 http://bigdata.svn.sourceforge.net/bigdata/?rev=7304&view=rev Author: thompsonbry Date: 2013-08-21 16:27:40 +0000 (Wed, 21 Aug 2013) Log Message: ----------- Bug fix to the NQuadsParser. It was ignoring the configuration option to continue after an error. Also see #554. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/nquads/NQuadsParser.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/nquads/NQuadsParser.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/nquads/NQuadsParser.java 2013-08-21 15:43:45 UTC (rev 7303) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/rio/nquads/NQuadsParser.java 2013-08-21 16:27:40 UTC (rev 7304) @@ -51,6 +51,7 @@ import org.semanticweb.yars.nx.parser.NxParser; import com.bigdata.rdf.internal.ILexiconConfiguration; +import com.bigdata.util.InnerCause; /** * A wrapper for an {@link NxParser} which implements the {@link RDFParser} @@ -204,6 +205,9 @@ * corresponding character sequences. */ + /* + * Note: Will throw NPE if the Literal could not be parsed. + */ final String label = lit.getUnescapedData(); final String languageTag = lit.getLanguageTag(); @@ -314,6 +318,8 @@ while (parser.hasNext()) { + try { + final Node[] nodes = parser.next(); if (nodes.length != 3 && nodes.length != 4) @@ -365,11 +371,56 @@ final Statement stmt = f.createStatement(s, p, o, c); handler.handleStatement(stmt); + + } catch(Throwable t) { + /* + * Conditionally wrap and rethrow the exception unless + * stopAtFirstError:=false. + */ + launderThrowable(t); + + // Do not stop at the first error. + log.warn("Continuing: " + t); + + } + } handler.endRDF(); } + /** + * Conditionally wrap and rethrow the exception unless + * <code>stopAtFirstError:=false</code>. + */ + private void launderThrowable(final Throwable t) { + + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + + // Do not intercept an interrupt. + throw new RuntimeException(t); + + } + + if (!getParserConfig().stopAtFirstError()) { + + log.warn("Continuing: " + t, t); + + return; + + } + + if (t instanceof RuntimeException) { + + // No need to wrap this. + throw (RuntimeException) t; + + } + + throw new RuntimeException(t); + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
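[Annotation] The fix above routes each per-statement failure through launderThrowable(), which refuses to swallow interrupts, rethrows (wrapping checked exceptions) when stopAtFirstError is set, and otherwise logs and lets the parse loop continue with the next statement. Here is a compact sketch of that pattern, with a plain boolean standing in for the Sesame ParserConfig flag and a hand-rolled cause-chain walk standing in for bigdata's InnerCause.isInnerCause():

// A compact sketch of the launder-and-maybe-continue pattern; the static
// flag below is a stand-in for getParserConfig().stopAtFirstError().
public class LaunderDemo {

    static boolean stopAtFirstError = false;

    // Stand-in for InnerCause.isInnerCause(t, InterruptedException.class).
    static boolean isInterrupt(final Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause())
            if (c instanceof InterruptedException)
                return true;
        return false;
    }

    static void launderThrowable(final Throwable t) {
        if (isInterrupt(t))
            throw new RuntimeException(t); // never intercept an interrupt
        if (!stopAtFirstError) {
            System.err.println("Continuing: " + t); // log and keep parsing
            return;
        }
        if (t instanceof RuntimeException)
            throw (RuntimeException) t; // no need to wrap
        throw new RuntimeException(t); // wrap checked exceptions
    }

    public static void main(final String[] args) {
        try {
            throw new IllegalStateException("bad statement");
        } catch (Throwable t) {
            launderThrowable(t); // logs and returns: stopAtFirstError == false
        }
    }
}

Checking for an interrupt first matters: even in continue-on-error mode, a cancelled parser thread must stop rather than log-and-continue.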
From: <mar...@us...> - 2013-08-21 15:43:51
Revision: 7303 http://bigdata.svn.sourceforge.net/bigdata/?rev=7303&view=rev Author: martyncutcher Date: 2013-08-21 15:43:45 +0000 (Wed, 21 Aug 2013) Log Message: ----------- update to use ha.IndexManagerCallable Modified Paths: -------------- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-08-21 15:42:44 UTC (rev 7302) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-08-21 15:43:45 UTC (rev 7303) @@ -41,6 +41,7 @@ import com.bigdata.ha.HACommitGlue; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IndexManagerCallable; import com.bigdata.ha.msg.IHA2PhaseCommitMessage; import com.bigdata.ha.msg.IHA2PhasePrepareMessage; import com.bigdata.ha.msg.IHANotifyReleaseTimeRequest; @@ -50,7 +51,6 @@ import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.ABC; import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.LargeLoadTask; import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest; -import com.bigdata.journal.jini.ha.HAJournalTest.IndexManagerCallable; import com.bigdata.journal.jini.ha.HAJournalTest.SpuriousTestException; import com.bigdata.quorum.zk.ZKQuorum; import com.bigdata.quorum.zk.ZKQuorumImpl; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2013-08-21 15:42:59
Revision: 7302 http://bigdata.svn.sourceforge.net/bigdata/?rev=7302&view=rev Author: martyncutcher Date: 2013-08-21 15:42:44 +0000 (Wed, 21 Aug 2013) Log Message: ----------- Move submit Callable to HAGlue interface from HAGlueTest Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/HAGlue.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java Added Paths: ----------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/HAGlue.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/HAGlue.java 2013-08-20 18:54:28 UTC (rev 7301) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/HAGlue.java 2013-08-21 15:42:44 UTC (rev 7302) @@ -25,9 +25,11 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.Serializable; import java.rmi.Remote; import java.security.DigestException; import java.security.NoSuchAlgorithmException; +import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -46,6 +48,7 @@ import com.bigdata.ha.msg.IHASnapshotRequest; import com.bigdata.ha.msg.IHASnapshotResponse; import com.bigdata.journal.AbstractJournal; +import com.bigdata.journal.IIndexManager; import com.bigdata.journal.jini.ha.HAJournalServer; import com.bigdata.quorum.AsynchronousQuorumCloseException; import com.bigdata.quorum.QuorumException; @@ -296,4 +299,50 @@ Future<Void> rebuildFromLeader(IHARemoteRebuildRequest req) throws IOException; + + /** + * Run the caller's task on the service. + * + * @param callable + * The task to run on the service. + * @param asyncFuture + * <code>true</code> if the task will execute asynchronously + * and return a {@link Future} for the computation that may + * be used to inspect and/or cancel the computation. + * <code>false</code> if the task will execute synchronously + * and return a thick {@link Future}. + */ + public <T> Future<T> submit(IIndexManagerCallable<T> callable, + boolean asyncFuture) throws IOException; + + + public interface IIndexManagerCallable<T> extends Serializable, Callable<T> { + + /** + * Invoked before the task is executed to provide a reference to the + * {@link IIndexManager} on which it is executing. + * + * @param indexManager + * The index manager on the service. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code> + * @throws IllegalStateException + * if {@link #setIndexManager(IIndexManager)} has already been + * invoked and was set with a different value. + */ + void setIndexManager(IIndexManager indexManager); + + /** + * Return the {@link IIndexManager}. + * + * @return The data service and never <code>null</code>. + * + * @throws IllegalStateException + * if {@link #setIndexManager(IIndexManager)} has not been invoked. 
+ */ + IIndexManager getIndexManager(); + + } + } Added: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java (rev 0) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java 2013-08-21 15:42:44 UTC (rev 7302) @@ -0,0 +1,38 @@ +package com.bigdata.ha; + +import org.apache.log4j.Logger; + +import com.bigdata.ha.HAGlue.IIndexManagerCallable; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.jini.ha.HAJournal; + +@SuppressWarnings("serial") +public abstract class IndexManagerCallable<T> implements IIndexManagerCallable<T> { + protected static final Logger log = Logger.getLogger(HAJournal.class); + + private transient IIndexManager indexManager; + + public IndexManagerCallable() { + + } + + public void setIndexManager(IIndexManager indexManager) { + this.indexManager = indexManager; + } + + /** + * Return the {@link IIndexManager}. + * + * @return The data service and never <code>null</code>. + * + * @throws IllegalStateException + * if {@link #setIndexManager(IIndexManager)} has not been invoked. + */ + public IIndexManager getIndexManager() { + if (indexManager == null) + throw new IllegalStateException(); + + return indexManager; + } +} + Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-08-20 18:54:28 UTC (rev 7301) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-08-21 15:42:44 UTC (rev 7302) @@ -105,6 +105,7 @@ import com.bigdata.ha.PrepareResponse; import com.bigdata.ha.QuorumService; import com.bigdata.ha.RunState; +import com.bigdata.ha.HAGlue.IIndexManagerCallable; import com.bigdata.ha.msg.HANotifyReleaseTimeResponse; import com.bigdata.ha.msg.HAReadResponse; import com.bigdata.ha.msg.HARootBlockRequest; @@ -7733,6 +7734,20 @@ } + @Override + public <T> Future<T> submit(final IIndexManagerCallable<T> callable, + final boolean asyncFuture) throws IOException { + + callable.setIndexManager(getIndexManager()); + + final Future<T> ft = getIndexManager().getExecutorService().submit( + callable); + + return getProxy(ft, asyncFuture); + + } + + }; /** Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-08-20 18:54:28 UTC (rev 7301) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2013-08-21 15:42:44 UTC (rev 7302) @@ -244,7 +244,7 @@ /** * The zpath of the logical service. */ - private String logicalServiceZPath = null; + protected String logicalServiceZPath = null; @Override protected void setUp() throws Exception { @@ -305,7 +305,7 @@ null/* serviceDiscoveryListener */, cacheMissTimeout); // Setup quorum client. 
- quorum = newQuourm(); + quorum = newQuorum(); } @@ -1076,7 +1076,7 @@ * @throws InterruptedException * @throws KeeperException */ - protected Quorum<HAGlue, QuorumClient<HAGlue>> newQuourm() + protected Quorum<HAGlue, QuorumClient<HAGlue>> newQuorum() throws ConfigurationException, InterruptedException, KeeperException { Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-08-20 18:54:28 UTC (rev 7301) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-08-21 15:42:44 UTC (rev 7302) @@ -148,65 +148,6 @@ log.warn("THREAD DUMP\n" + sb.toString()); } - public interface IIndexManagerCallable<T> extends Serializable, Callable<T> { - - /** - * Invoked before the task is executed to provide a reference to the - * {@link IIndexManager} on which it is executing. - * - * @param indexManager - * The index manager on the service. - * - * @throws IllegalArgumentException - * if the argument is <code>null</code> - * @throws IllegalStateException - * if {@link #setIndexManager(IIndexManager)} has already been - * invoked and was set with a different value. - */ - void setIndexManager(IIndexManager indexManager); - - /** - * Return the {@link IIndexManager}. - * - * @return The data service and never <code>null</code>. - * - * @throws IllegalStateException - * if {@link #setIndexManager(IIndexManager)} has not been invoked. - */ - IIndexManager getIndexManager(); - - } - - @SuppressWarnings("serial") - static public abstract class IndexManagerCallable<T> implements IIndexManagerCallable<T> { - private static final Logger log = Logger.getLogger(HAJournal.class); - - private transient IIndexManager indexManager; - - public IndexManagerCallable() { - - } - - public void setIndexManager(IIndexManager indexManager) { - this.indexManager = indexManager; - } - - /** - * Return the {@link IIndexManager}. - * - * @return The data service and never <code>null</code>. - * - * @throws IllegalStateException - * if {@link #setIndexManager(IIndexManager)} has not been invoked. - */ - public IIndexManager getIndexManager() { - if (indexManager == null) - throw new IllegalStateException(); - - return indexManager; - } - } - /** * A {@link Remote} interface for new methods published by the service. */ @@ -308,21 +249,6 @@ */ public void dumpThreads() throws IOException; - /** - * Run the caller's task on the service. - * - * @param callable - * The task to run on the service. - * @param asyncFuture - * <code>true</code> if the task will execute asynchronously - * and return a {@link Future} for the computation that may - * be used to inspect and/or cancel the computation. - * <code>false</code> if the task will execute synchronously - * and return a thick {@link Future}. - */ - public <T> Future<T> submit(IIndexManagerCallable<T> callable, - boolean asyncFuture) throws IOException; - } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
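[Annotation] The IIndexManagerCallable promoted into HAGlue by this revision is a small dependency-injection pattern for remote tasks: a Serializable Callable is shipped to the service, which calls setIndexManager() with its local IIndexManager before handing the task to its executor, so the task never needs a remote reference baked in. A rough sketch of the shape under hypothetical names (ResourceManager stands in for IIndexManager; the demo runs in-process and skips serialization and RMI):

import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for IIndexManager: just enough surface for the demo.
interface ResourceManager {
    String name();
}

// The task travels as data; the receiving service injects its local manager.
interface ResourceManagerCallable<T> extends Serializable, Callable<T> {
    void setResourceManager(ResourceManager rm);
}

public class SubmitDemo {

    // Service side: inject the local manager, then run on the local executor.
    static <T> Future<T> submit(final ResourceManagerCallable<T> task,
            final ResourceManager local, final ExecutorService es) {
        task.setResourceManager(local); // pre-execution injection
        return es.submit(task);
    }

    public static void main(final String[] args) throws Exception {
        final ResourceManager local = new ResourceManager() {
            public String name() {
                return "journal-1";
            }
        };
        final ResourceManagerCallable<String> task = new ResourceManagerCallable<String>() {
            private transient ResourceManager rm; // not serialized; set on arrival
            public void setResourceManager(final ResourceManager rm) {
                this.rm = rm;
            }
            public String call() {
                return "ran on " + rm.name();
            }
        };
        final ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            System.out.println(submit(task, local, es).get()); // ran on journal-1
        } finally {
            es.shutdown();
        }
    }
}

The real submit() additionally honors an asyncFuture flag, returning either a proxy for the still-running computation or a thick Future for a synchronously executed task; that RMI detail is elided here.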