From: <tho...@us...> - 2010-09-14 19:14:36
|
Revision: 3549 http://bigdata.svn.sourceforge.net/bigdata/?rev=3549&view=rev Author: thompsonbry Date: 2010-09-14 19:14:30 +0000 (Tue, 14 Sep 2010) Log Message: ----------- Modified LoadStats and ClosureStats to use CATs in preparation for a concurrent data loader. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/ClosureStats.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/LoadStats.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TaskATest.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TestMetrics.java Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/ClosureStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/ClosureStats.java 2010-09-14 19:13:44 UTC (rev 3548) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/ClosureStats.java 2010-09-14 19:14:30 UTC (rev 3549) @@ -24,6 +24,8 @@ */ package com.bigdata.rdf.inf; +import com.bigdata.counters.CAT; + /** * Statistics collected when performing inference. * @@ -38,13 +40,13 @@ * change in the #of statements in the database across the closure * operation. */ - public long mutationCount; + public final CAT mutationCount = new CAT(); /** * Time to compute the entailments and store them within the database * (milliseconds). */ - public long elapsed; + public final CAT elapsed = new CAT(); public ClosureStats() { @@ -55,26 +57,26 @@ * @param mutationCount * @param elapsed */ - public ClosureStats(long mutationCount,long elapsed) { + public ClosureStats(final long mutationCount,final long elapsed) { - this.mutationCount = mutationCount; + this.mutationCount.set(mutationCount); - this.elapsed = elapsed; + this.elapsed.set( elapsed); } - public synchronized void add(ClosureStats o) { + public void add(final ClosureStats o) { - this.mutationCount += o.mutationCount; + this.mutationCount.add( o.mutationCount.get()); - this.elapsed += o.elapsed; + this.elapsed.add(o.elapsed.get()); } public String toString() { - return getClass().getSimpleName() + "{mutationCount=" + mutationCount - + ", elapsed=" + elapsed + "ms}"; + return getClass().getSimpleName() + "{mutationCount=" + mutationCount.estimate_get() + + ", elapsed=" + elapsed.estimate_get() + "ms}"; } Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-09-14 19:13:44 UTC (rev 3548) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-09-14 19:14:30 UTC (rev 3549) @@ -440,7 +440,7 @@ final long elapsed = System.currentTimeMillis() - begin; - stats.elapsed += elapsed; + stats.elapsed.add(elapsed); if (INFO) log.info("Computed closure in " + elapsed + "ms"); Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java 2010-09-14 19:13:44 UTC (rev 3548) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java 2010-09-14 19:14:30 UTC (rev 3549) @@ -148,7 +148,7 @@ } // Note: IFF the task succeeds! - toldTriples.addAndGet(loadStats.toldTriples); + toldTriples.addAndGet(loadStats.toldTriples.get()); } @@ -194,9 +194,11 @@ final long now = System.currentTimeMillis(); - stats.toldTriples = nstmts; + stats.toldTriples.set(nstmts); - stats.totalTime = stats.loadTime = now - begin; + stats.totalTime.set( now - begin ); + + stats.loadTime.set( now - begin ); /* * This reports the load rate for the file, but this will only Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/LoadStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/LoadStats.java 2010-09-14 19:13:44 UTC (rev 3548) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/LoadStats.java 2010-09-14 19:14:30 UTC (rev 3549) @@ -24,6 +24,7 @@ */ package com.bigdata.rdf.rio; +import com.bigdata.counters.CAT; import com.bigdata.rdf.inf.ClosureStats; /** @@ -34,12 +35,12 @@ */ public class LoadStats { - public long toldTriples; - public long loadTime; - public long commitTime; - public long totalTime; + public final CAT toldTriples = new CAT(); + public final CAT loadTime = new CAT(); + public final CAT commitTime = new CAT(); + public final CAT totalTime = new CAT(); - private transient long lastReportTime = 0l; + private transient volatile long lastReportTime = 0l; /** * The internal with which this class will log on {@link System#out} in @@ -55,19 +56,19 @@ public long triplesPerSecond() { - return ((long) (((double) toldTriples) / ((double) totalTime) * 1000d)); + return ((long) (((double) toldTriples.estimate_get()) / ((double) totalTime.estimate_get()) * 1000d)); } public void add(final LoadStats stats) { - toldTriples += stats.toldTriples; + toldTriples.add(stats.toldTriples.get()); - loadTime += stats.loadTime; + loadTime.add(stats.loadTime.get()); - commitTime += stats.commitTime; + commitTime.add(stats.commitTime.get()); - totalTime += stats.totalTime; + totalTime.add(stats.totalTime.get()); if (stats.closureStats != null) { @@ -82,7 +83,7 @@ if (lastReportTime == 0L) { - if (loadTime >= REPORT_INTERVAL) { + if (loadTime.estimate_get() >= REPORT_INTERVAL) { System.out.println("loading: " + toString()); @@ -111,14 +112,14 @@ return toldTriples + " stmts added in " - + ((double) loadTime) + + ((double) loadTime.estimate_get()) / 1000d + " secs, rate= " + triplesPerSecond() + ", commitLatency=" - + commitTime + + commitTime.estimate_get() + "ms" - + (closureStats.elapsed!=0L? "\n"+closureStats.toString() : ""); + + (closureStats.elapsed.estimate_get()!=0L? "\n"+closureStats.toString() : ""); } Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2010-09-14 19:13:44 UTC (rev 3548) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2010-09-14 19:14:30 UTC (rev 3549) @@ -230,8 +230,8 @@ if (buffer != null) { - if(log.isInfoEnabled()) - log.info("Flushing the buffer."); + if(log.isDebugEnabled()) + log.debug("Flushing the buffer."); buffer.flush(); @@ -610,7 +610,7 @@ database.commit(); - totals.commitTime += System.currentTimeMillis() - beginCommit; + totals.commitTime.add(System.currentTimeMillis() - beginCommit); if (log.isInfoEnabled()) log.info("commit: latency="+totals.commitTime+"ms"); @@ -838,8 +838,8 @@ if (file.isDirectory()) { - if (log.isInfoEnabled()) - log.info("loading directory: " + file); + if (log.isDebugEnabled()) + log.debug("loading directory: " + file); // final LoadStats loadStats = new LoadStats(); @@ -1007,9 +1007,9 @@ final long nstmts = loader.getStatementsAdded(); - stats.toldTriples = nstmts; + stats.toldTriples.set( nstmts ); - stats.loadTime = System.currentTimeMillis() - begin; + stats.loadTime.set(System.currentTimeMillis() - begin); if (closureEnum == ClosureEnum.Incremental || (endOfBatch && closureEnum == ClosureEnum.Batch)) { @@ -1037,20 +1037,24 @@ database.commit(); - stats.commitTime = System.currentTimeMillis() - beginCommit; + stats.commitTime.set(System.currentTimeMillis() - beginCommit); if (log.isInfoEnabled()) log.info("commit: latency=" + stats.commitTime + "ms"); } - stats.totalTime = System.currentTimeMillis() - begin; + stats.totalTime.set(System.currentTimeMillis() - begin); + // aggregate stats + totals.add(stats); + if (log.isInfoEnabled()) { - log.info(stats.toString()); + log.info("file:: " + stats + "; totals:: " + totals); if (buffer != null && buffer.getDatabase() instanceof AbstractLocalTripleStore) { - log.info(((AbstractLocalTripleStore) buffer.getDatabase()) + if(log.isDebugEnabled()) + log.debug(((AbstractLocalTripleStore) buffer.getDatabase()) .getLocalBTreeBytesWritten(new StringBuilder()) .toString()); } @@ -1060,6 +1064,9 @@ } catch ( Exception ex ) { + // aggregate stats even for exceptions. + totals.add(stats); + /* * Note: discard anything in the buffer in case auto-flush is * disabled. This prevents the buffer from retaining data after a @@ -1096,11 +1103,11 @@ throw ex2; - } finally { +// } finally { +// +// // aggregate regardless of the outcome. +// totals.add(stats); - // aggregate regardless of the outcome. - totals.add(stats); - } } @@ -1436,8 +1443,8 @@ || (name.endsWith(".gz") && RDFFormat.forFileName(name .substring(0, name.length() - 3)) != null); - if (log.isInfoEnabled()) - log.info("dir=" + dir + ", name=" + name + " : isRDF=" + isRDF); + if (log.isDebugEnabled()) + log.debug("dir=" + dir + ", name=" + name + " : isRDF=" + isRDF); return isRDF; Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TaskATest.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TaskATest.java 2010-09-14 19:13:44 UTC (rev 3548) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TaskATest.java 2010-09-14 19:14:30 UTC (rev 3549) @@ -475,23 +475,23 @@ // Explicit + (Entailments = Axioms + Inferred) - final long totalTriples = loadStats[run].toldTriples - + (closureStats!=null?closureStats.mutationCount : 0); + final long totalTriples = loadStats[run].toldTriples.get() + + (closureStats!=null?closureStats.mutationCount.get() : 0); // loadTime + closureTime + commitTime. - final long totalTime = loadStats[run].loadTime - + (closureStats != null ? closureStats.elapsed : 0) - + loadStats[run].commitTime; + final long totalTime = loadStats[run].loadTime.get() + + (closureStats != null ? closureStats.elapsed.get() : 0) + + loadStats[run].commitTime.get(); System.out.println( all_sources[ run * 3 ]+ ", " + ( errors[ run ] == null ? "Ok" +", "+loadStats[run].toldTriples - +", "+loadStats[run].loadTime/1000 - +", "+tps(loadStats[run].toldTriples,loadStats[run].loadTime) - +", "+(closureStats!=null?closureStats.mutationCount:"") - +", "+(closureStats!=null?closureStats.elapsed/1000:"") - +", "+(closureStats!=null?tps(closureStats.mutationCount,closureStats.elapsed):"") + +", "+loadStats[run].loadTime.get()/1000 + +", "+tps(loadStats[run].toldTriples.get(),loadStats[run].loadTime.get()) + +", "+(closureStats!=null?closureStats.mutationCount.get():"") + +", "+(closureStats!=null?closureStats.elapsed.get()/1000:"") + +", "+(closureStats!=null?tps(closureStats.mutationCount.get(),closureStats.elapsed.get()):"") +", "+loadStats[run].commitTime +", "+tps(totalTriples,totalTime) Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TestMetrics.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TestMetrics.java 2010-09-14 19:13:44 UTC (rev 3548) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TestMetrics.java 2010-09-14 19:14:30 UTC (rev 3549) @@ -1085,7 +1085,7 @@ /* * #of explicit statements loaded. */ - toldTriples = loadStats.toldTriples; + toldTriples = loadStats.toldTriples.get(); /* * This is the elapsed time for the entire transaction in which the file @@ -1093,19 +1093,19 @@ * store, and the time required to perform the transaction commit. */ // transactionTime = System.currentTimeMillis() - begin; - transactionTime = loadStats.totalTime; + transactionTime = loadStats.totalTime.get(); /* * This is the time required to load the triples exclusive of the * startup and commit time for the transaction. */ - loadTime = loadStats.loadTime; + loadTime = loadStats.loadTime.get(); /* * A pragmatic estimate of the commit time that assumes the transaction * start time is zero. */ - commitTime = loadStats.commitTime; + commitTime = loadStats.commitTime.get(); // commitTime = transactionTime - loadTime; /* @@ -1132,7 +1132,7 @@ statementsAdded = statementCount1 - statementCount0; // inferencesAdded = inferenceCount1 - inferenceCount0; - inferencesAdded = loadStats.closureStats.mutationCount; + inferencesAdded = loadStats.closureStats.mutationCount.get(); // int explicitAdded = statementsAdded - inferencesAdded; proofsAdded = proofCount1 - proofCount0; urisAdded = uriCount1 - uriCount0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |