From: <tho...@us...> - 2010-10-01 15:36:16
Revision: 3713 http://bigdata.svn.sourceforge.net/bigdata/?rev=3713&view=rev Author: thompsonbry Date: 2010-10-01 15:36:09 +0000 (Fri, 01 Oct 2010) Log Message: ----------- Added some interfaces for reporting on those aspects of a B+Tree which feed into cost estimates for query plans. Integrated the named graph decision tree logic with the B+Tree cost model in QueryEngine. I've only done this for standalone at this point. We will have to override this for scale-out to use an approximation of the mixture of cost models for a BTree on the Journal and ~3 index segment files. That can be overridden in FederatedQueryEngine. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/BTreeCostModel.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/DiskCostModel.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/IndexSegmentCostModel.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeStatistics.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeUtilizationReport.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeStatistics.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeUtilizationReport.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -453,15 +453,32 @@ sb.append(','); sb.append(t.getClass().getSimpleName()); } - sb.append(")["); - final Integer id = (Integer) annotations.get(Annotations.BOP_ID); - if (id != null) - sb.append("Annotations.BOP_ID=" + id); - sb.append("]"); + sb.append(")"); + annotationsToString(sb); return sb.toString(); } + protected void annotationsToString(final StringBuilder sb) { + final Map<String,Object> annotations = annotations(); + if (!annotations.isEmpty()) { + sb.append("["); + boolean first = true; + for (Map.Entry<String, Object> e : annotations.entrySet()) { + if (!first) + sb.append(", "); + if (e.getValue() != null && e.getValue().getClass().isArray()) { + sb.append(e.getKey() + "=" + + Arrays.toString((Object[]) e.getValue())); + } else { + sb.append(e.getKey() + "=" + e.getValue()); + } + first = false; + } + sb.append("]"); + } + } + final public BOpEvaluationContext getEvaluationContext() { return getProperty(Annotations.EVALUATION_CONTEXT, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -31,8 +31,6 @@ 
import java.util.Iterator; import java.util.Map; -import cern.colt.Arrays; - import com.bigdata.bop.AbstractAccessPathOp; import com.bigdata.bop.ArrayBindingSet; import com.bigdata.bop.BOp; @@ -44,8 +42,6 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; -import com.bigdata.rdf.internal.IV; -import com.bigdata.rdf.spo.SPOPredicate; import com.bigdata.relation.accesspath.ElementFilter; import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.relation.rule.ISolutionExpander; @@ -553,28 +549,7 @@ } sb.append(")"); - - final Map<String,Object> annotations = annotations(); - if (!annotations.isEmpty()) { - sb.append("["); - boolean first = true; - for (Map.Entry<String, Object> e : annotations.entrySet()) { - if (!first) - sb.append(", "); - // @todo remove relation name hack when making relation name a scalar. - if (Annotations.RELATION_NAME.equals(e.getKey()) - && e.getValue() != null - && e.getValue().getClass().isArray()) { - sb.append(e.getKey() + "=" - + Arrays.toString((String[]) e.getValue())); - } else { - sb.append(e.getKey() + "=" + e.getValue()); - } - first = false; - } - sb.append("]"); - } - + annotationsToString(sb); return sb.toString(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/BTreeCostModel.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/BTreeCostModel.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/BTreeCostModel.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -26,9 +26,13 @@ */ package com.bigdata.bop.cost; +import java.text.NumberFormat; + import com.bigdata.btree.AbstractBTree; import com.bigdata.btree.BTree; -import com.bigdata.journal.IIndexManager; +import com.bigdata.btree.BTreeUtilizationReport; +import com.bigdata.btree.IBTreeStatistics; +import com.bigdata.btree.IBTreeUtilizationReport; import com.bigdata.journal.Journal; /** @@ -52,43 +56,71 @@ */ public class BTreeCostModel { + private final DiskCostModel diskCostModel; + /** + * + * @param diskCostModel + * The cost model for the disk on which the {@link Journal} + * backing the {@link BTree} is located. + */ + public BTreeCostModel(final DiskCostModel diskCostModel) { + + if (diskCostModel == null) + throw new IllegalArgumentException(); + + this.diskCostModel = diskCostModel; + + } + + /** + * Compute the height of the B+Tree from its entry count and branching + * factor (this can also be used to find the minimum height at which there + * could exist a given range count). + */ + static public int estimateHeight(final int entryCount, + final int branchingFactor) { + + if (entryCount < branchingFactor) + return 0; + + final double logm = Math.log(branchingFactor); + + final double logn = Math.log(entryCount); + + final double h = (logn / logm) - 1; + + return (int) Math.ceil(h); + + } + + /** * Return the estimated cost of a range scan of the index. * - * @param diskCostModel - * The cost model for the disk. * @param rangeCount * The range count for the scan. - * @param btree - * The index. + * @param m + * The B+Tree branching factor. + * @param h + * The B+Tree height. + * @param leafUtilization + * The leaf utilization percentage [0:100]. * * @return The estimated cost (milliseconds). - * - * @todo how to get the right view onto the BTree without locking? or raise - * the cost model into the {@link IIndexManager}? 
*/ - public double rangeScan(final DiskCostModel diskCostModel, - final int rangeCount, final BTree btree) { + public double rangeScan(final long rangeCount, final int m, final int h, + final int leafUtilization) { if (rangeCount == 0) return 0d; - // double height = (Math.log(branchingFactor) / Math.log(entryCount)) - - // 1; - - final int m = btree.getBranchingFactor(); - - final int entryCount = btree.getEntryCount(); - - final int height = btree.getHeight(); - // average seek time to a leaf. - final double averageSeekTime = Math.max(0, (height - 1)) + final double averageSeekTime = Math.max(0, (h - 1)) * diskCostModel.seekTime; // the percentage of the leaves which are full. // final double leafFillRate = .70d; - final double leafFillRate = ((double) btree.getUtilization()[1]) / 100; + final double leafFillRate = ((double) leafUtilization) / 100; /* * The expected #of leaves to visit for that range scan. @@ -96,7 +128,7 @@ * Note: There is an edge condition when the root leaf is empty * (fillRate is zero). */ - final double expectedLeafCount = Math.ceil((rangeCount / m) + final double expectedLeafCount = Math.ceil((((double) rangeCount) / m) * Math.min(1, (1 / leafFillRate))); /* @@ -110,4 +142,140 @@ } + /** + * Prints out some tables based on different disk cost models, branching + * factors, and range scans. To validate this, you can do a scatter plot of + * the rangeCount and cost columns and observe the log linear curve of the + * B+Tree. + * + * @param args + * ignored. + * + * @see <a + * href="src/resources/architectures/query-cost-model.xls">query-cost-model.xls</a> + */ + public static void main(String[] args) { + + final DiskCostModel[] diskCostModels = new DiskCostModel[] { DiskCostModel.DEFAULT }; + + final int[] branchingFactors = new int[] { 32, 64, 128, 256, 512, 1024 }; + + final int[] heights = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + + final int[] rangeCounts = new int[] { 1, 10, 100, 1000, 2000, 5000, 10000, 2000, 50000, 100000 }; + + final int leafUtilization = 65; // average percent "full" per leaf. + + System.out.println("seekTime\txferRate\tleafUtil\tm\theight\trangeCount\tcost(ms)"); + + final NumberFormat millisFormat = NumberFormat.getIntegerInstance(); + millisFormat.setGroupingUsed(true); + + final NumberFormat percentFormat = NumberFormat.getPercentInstance(); + percentFormat.setMinimumFractionDigits(0); + + final StringBuilder sb = new StringBuilder(); + + for (DiskCostModel diskCostModel : diskCostModels) { + + final BTreeCostModel btreeCostModel = new BTreeCostModel( + diskCostModel); + + for (int m : branchingFactors) { + + for (int h : heights) { + + for (int rangeCount : rangeCounts) { + + final int estimatedHeight = estimateHeight(rangeCount, + m); + + if (estimatedHeight > h) { + /* + * Skip range counts which are too large for the + * current B+Tree height. + */ + break; + } + + final double cost = btreeCostModel.rangeScan( + rangeCount, m, h, leafUtilization); + + sb.setLength(0); // reset. 
+ sb.append(millisFormat.format(diskCostModel.seekTime)); + sb.append('\t'); + sb.append(millisFormat + .format(diskCostModel.transferRate)); + sb.append('\t'); + sb.append(percentFormat.format(leafUtilization / 100d)); + sb.append('\t'); + sb.append(m); + sb.append('\t'); + sb.append(h); + sb.append('\t'); + sb.append(rangeCount); + sb.append('\t'); + sb.append(millisFormat.format(cost)); + System.out.println(sb); + + } + + } + + } + + } + + } + + private static class MockBTreeStatistics implements IBTreeStatistics { + + private final int m; + + private final int entryCount; + + private final int height; + + private final int leafCount; + + private final int nodeCount; + + private final IBTreeUtilizationReport utilReport; + + public MockBTreeStatistics(final int m, final int entryCount, + final int height, final int leafCount, final int nodeCount) { + this.m = m; + this.entryCount = entryCount; + this.height = height; + this.leafCount = leafCount; + this.nodeCount = nodeCount; + this.utilReport = new BTreeUtilizationReport(this); + } + + public int getBranchingFactor() { + return m; + } + + public int getEntryCount() { + return entryCount; + } + + public int getHeight() { + return height; + } + + public int getLeafCount() { + return leafCount; + } + + public int getNodeCount() { + return nodeCount; + } + + public IBTreeUtilizationReport getUtilization() { + return utilReport; + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/DiskCostModel.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/DiskCostModel.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/DiskCostModel.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -47,16 +47,24 @@ */ final public double seekTime; + /** + * The average disk transfer rate (megabytes per second). + */ final public double transferRate; /** * * @param seekTime + * The average disk seek time (milliseconds). * @param transferRate + * The average disk transfer rate (megabytes per second). */ public DiskCostModel(double seekTime, double transferRate) { + this.seekTime = seekTime; + this.transferRate = transferRate; + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/IndexSegmentCostModel.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/IndexSegmentCostModel.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/IndexSegmentCostModel.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -37,15 +37,27 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo */ public class IndexSegmentCostModel { + private final DiskCostModel diskCostModel; + /** * * @param diskCostModel * The disk cost model. + */ + public IndexSegmentCostModel(final DiskCostModel diskCostModel) { + + if (diskCostModel == null) + throw new IllegalArgumentException(); + + this.diskCostModel = diskCostModel; + + } + + /** + * * @param rangeCount * The range count for the index scan. * @param branchingFactor @@ -58,8 +70,7 @@ * * @return The estimated time for the range scan (milliseconds). 
*/ - public double rangeScan(final DiskCostModel diskCostModel, - final int rangeCount, final int branchingFactor, + public double rangeScan(final int rangeCount, final int branchingFactor, final int averageBytesPerLeaf, final int xferBufferSize) { if (rangeCount == 0) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -42,12 +42,24 @@ import org.apache.log4j.Logger; import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContextBase; +import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.ap.Predicate; +import com.bigdata.bop.cost.BTreeCostModel; +import com.bigdata.bop.cost.DiskCostModel; +import com.bigdata.btree.AbstractBTree; import com.bigdata.btree.BTree; +import com.bigdata.btree.IBTreeStatistics; import com.bigdata.btree.IndexSegment; +import com.bigdata.btree.UnisolatedReadWriteIndex; import com.bigdata.btree.view.FusedView; import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.Journal; +import com.bigdata.journal.NoSuchIndexException; +import com.bigdata.journal.TimestampUtility; +import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.resources.IndexManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; @@ -771,4 +783,78 @@ // NOP } + /* + * Cost models. + */ + + /** + * The cost model associated with the disk on which the indices are stored. + * For a {@link Journal}, this is just the cost model of the backing disk. + * For the federation, this should be an average cost model. + * + * @todo This is not parameterized. A simple cost model is always assumed. + * The correct cost model is necessary in order to get the tradeoff + * point right for SCAN+FILTER versus SUBQUERY on SSD or RAID arrays + * with lots of spindles versus normal disk. + * + * @todo In a shared disk deployment, we might introduce one cost model for + * local SSD used to cache journals, one for local non-SSD disks used + * to cache index segments, and one for remote storage used to + * materialize historical journals and index segments for query. + * + * @todo In a federation, this should be reported out as metadata for the + * federation. Perhaps as a Jini attribute. + */ + private static final DiskCostModel diskCostModel = DiskCostModel.DEFAULT; + + /** + * Return an estimate of the cost of a scan on the predicate. + * + * @param pred + * The predicate. + * + * @return The estimated cost of a scan on that predicate. + * + * @todo This tunnels through to the {@link AbstractBTree} class and is thus + * specific to standalone and also may run into trouble once we + * support unisolated access paths for reads or mutation since it may + * encounter an {@link UnisolatedReadWriteIndex} instead of an + * {@link AbstractBTree}. 
+ */ + public <E> double estimateCost(final BOpContextBase context, + final Predicate<E> pred) { + + final IRelation<E> r = context.getRelation(pred); + + final IAccessPath<E> ap = context.getAccessPath(r, pred); + + final long rangeCount = ap.rangeCount(false/* exact */); + + /* + * Drill through to the actual index object on the local index manager + * (standalone only). This avoids the UnisolatedReadWriteIndex class + * used to protect the index from concurrent modifications. + */ + final String name = pred.getOnlyRelationName() + "." + + pred.getKeyOrder(); + + final long timestamp = (Long) pred + .getRequiredProperty(BOp.Annotations.TIMESTAMP); + + final AbstractBTree index = (AbstractBTree) getIndexManager().getIndex( + name, timestamp); + + if (index == null) + throw new NoSuchIndexException(name + ", timestamp=" + + TimestampUtility.toString(timestamp)); + + final IBTreeStatistics stats = index.getStatistics(); + + final double cost = new BTreeCostModel(diskCostModel).rangeScan( + rangeCount, stats.getBranchingFactor(), stats.getHeight(), + stats.getUtilization().getLeafUtilization()); + + return cost; + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -28,6 +28,7 @@ package com.bigdata.bop.engine; import java.io.Serializable; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -47,17 +48,24 @@ import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.Constant; +import com.bigdata.bop.HashBindingSet; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; import com.bigdata.bop.IVariable; +import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.bset.StartOp; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.rdf.join.DataSetJoin; import com.bigdata.bop.solutions.SliceOp; import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.internal.TermId; +import com.bigdata.rdf.internal.VTE; import com.bigdata.rdf.lexicon.LexiconRelation; import com.bigdata.rdf.model.BigdataURI; import com.bigdata.rdf.sail.BigdataEvaluationStrategyImpl; @@ -68,6 +76,7 @@ import com.bigdata.rdf.spo.NamedGraphSolutionExpander; import com.bigdata.rdf.store.AbstractTripleStore; import com.bigdata.rdf.store.IRawTripleStore; +import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.ElementFilter; import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.relation.rule.IProgram; @@ -76,6 +85,7 @@ import com.bigdata.relation.rule.eval.DefaultEvaluationPlan2; import com.bigdata.relation.rule.eval.IRangeCountFactory; import com.bigdata.relation.rule.eval.RuleState; +import com.bigdata.striterator.IKeyOrder; /** * Utility class converts {@link IRule}s to {@link BOp}s. @@ -168,16 +178,23 @@ * The estimated cost of a SCAN + FILTER approach to a default graph or * named graph query. 
*/ - String COST_SCAN = Rule2BOpUtility.class.getName() + ".costScan"; + String COST_SCAN = Rule2BOpUtility.class.getName() + ".cost.scan"; /** * The estimated cost of a SUBQUERY approach to a default graph or named * graph query. */ String COST_SUBQUERY = Rule2BOpUtility.class.getName() - + ".costSubquery"; + + ".cost.subquery"; /** + * The #of samples used when estimating the cost of a SUBQUERY approach + * to a default graph or named graph query. + */ + String COST_SUBQUERY_SAMPLE_COUNT = Rule2BOpUtility.class.getName() + + ".cost.subquerySampleCount"; + + /** * The #of known graphs in the {@link Dataset} for a default graph or * named graph query. */ @@ -254,7 +271,14 @@ // evaluation plan order. final int[] order = plan.getOrder(); - // variables to be retained for each join. + // the #of variables in each tail of the rule. + final int[] nvars = new int[rule.getTailCount()]; + + // the index assigned to each tail of the rule. + final IKeyOrder[] keyOrder = computeKeyOrderForEachTail(rule, context, + order, nvars); + + // the variables to be retained for each join. final IVariable[][] selectVars = RuleState .computeRequiredVarsForEachTail(rule, order); @@ -304,6 +328,9 @@ // assign a bop id to the predicate Predicate<?> pred = (Predicate<?>) rule.getTail(order[i]).setBOpId( bopId++); + + // decorate the predicate with the assigned index. + pred = pred.setKeyOrder(keyOrder[order[i]]); /* * Collect all the constraints for this predicate based on which @@ -361,6 +388,9 @@ final boolean quads = pred.getProperty(Annotations.QUADS, Annotations.DEFAULT_QUADS); + // pull of the Sesame dataset before we strip the annotations. + final Dataset dataset = (Dataset) pred.getProperty(Annotations.DATASET); + // strip off annotations that we do not want to propagate. pred = pred.clearAnnotations(ANNS_TO_CLEAR_FROM_PREDICATE); @@ -380,12 +410,12 @@ switch (scope) { case NAMED_CONTEXTS: - left = namedGraphJoin(queryEngine, left, anns, pred, - cvar); + left = namedGraphJoin(queryEngine, context, left, anns, + pred, dataset, cvar); break; case DEFAULT_CONTEXTS: - left = defaultGraphJoin(queryEngine, left, anns, pred, - cvar); + left = defaultGraphJoin(queryEngine, context, left, + anns, pred, dataset, cvar); break; default: throw new AssertionError(); @@ -468,11 +498,10 @@ * @return */ private static PipelineOp namedGraphJoin(final QueryEngine queryEngine, - final PipelineOp left, final List<NV> anns, Predicate pred, + final BOpContextBase context, final PipelineOp left, + final List<NV> anns, Predicate pred, final Dataset dataset, final org.openrdf.query.algebra.Var cvar) { - final Dataset dataset = (Dataset) pred.getProperty(Annotations.DATASET); - final boolean scaleOut = queryEngine.isScaleOut(); if (scaleOut) { anns.add(new NV(Predicate.Annotations.EVALUATION_CONTEXT, @@ -482,8 +511,7 @@ BOpEvaluationContext.ANY)); } - final DataSetSummary summary = new DataSetSummary(dataset - .getNamedGraphs()); + final DataSetSummary summary = new DataSetSummary(dataset.getNamedGraphs()); anns.add(new NV(Annotations.NKNOWN, summary.nknown)); @@ -544,18 +572,41 @@ } else { /* - * Estimate cost of SCAN with C unbound) + * Estimate cost of SCAN with C unbound. */ - final double scanCost = getScanCost(pred); + final double scanCost = queryEngine.estimateCost(context, pred); anns.add(new NV(Annotations.COST_SCAN, scanCost)); /* - * Estimate cost of SUBQUERY with C bound. + * Estimate cost of SUBQUERY with C bound (sampling). 
+ * + * @todo This should randomly sample in case there is bias in the + * order in which the URIs are presented here. However, the only + * thing which would be likely to create a strong bias is if someone + * sorted them on the IVs or if the URIs were in the same ordering + * in which their IVs were assigned AND the data were somehow + * correlated with that order. I rate the latter as pretty unlikely + * and the former is not true, so this sampling approach should be + * pretty good. + * + * @todo parameter for the #of samples to take. */ - final double subqueryCost = getSubqueryCost(pred); + double subqueryCost = 0d; + final int limit = 100; + int nsamples = 0; + for (URI uri : summary.graphs) { + if (nsamples == limit) + break; + final IV graph = ((BigdataURI) uri).getIV(); + subqueryCost += queryEngine.estimateCost(context, pred.asBound( + (IVariable) pred.get(3), new Constant(graph))); + nsamples++; + } + subqueryCost /= nsamples; anns.add(new NV(Annotations.COST_SUBQUERY, subqueryCost)); + anns.add(new NV(Annotations.COST_SUBQUERY_SAMPLE_COUNT, nsamples)); if (scanCost < subqueryCost * summary.nknown) { @@ -628,34 +679,6 @@ } /** - * - * @param pred - * @return - * - * FIXME Cost models have been implemented, but are not yet hooked in. - */ - static double getScanCost(Predicate pred) { - /* - * @todo Scan is more expensive on the Journal so this is set to ONE (1) - * and subquery is set to ZERO (0). This will get replaced by the actual - * computed costs shortly. - */ - return 1d; - } - - /** - * - * @param pred - * @return - * - * FIXME Cost models have been implemented, but are not yet hooked - * in. - */ - static double getSubqueryCost(Predicate pred) { - return 0d; - } - - /** * Generate a default graph join (quads mode). * * @param queryEngine @@ -669,7 +692,8 @@ * remote access paths with other remote access paths. */ private static PipelineOp defaultGraphJoin(final QueryEngine queryEngine, - final PipelineOp left, final List<NV> anns, final Predicate pred, + final BOpContextBase context, final PipelineOp left, + final List<NV> anns, final Predicate pred, final Dataset dataset, final org.openrdf.query.algebra.Var cvar) { // @todo decision of local vs remote ap. @@ -878,4 +902,106 @@ } // DataSetSummary + /** + * Return an array indicating the {@link IKeyOrder} that will be used when + * reading on each of the tail predicates. The array is formed using a + * private {@link IBindingSet} and propagating fake bindings to each + * predicate in turn using the given evaluation order. + * + * @param order + * The evaluation order. + * @param nvars + * The #of unbound variables for each tail predicate is assigned + * by side-effect. + * + * @return An array of the {@link IKeyOrder}s for each tail predicate. The + * array is correlated with the predicates index in the tail of the + * rule NOT its evaluation order. 
+ */ + @SuppressWarnings("unchecked") + static private IKeyOrder[] computeKeyOrderForEachTail(final IRule rule, + final BOpContextBase context, final int[] order, final int[] nvars) { + + if (order == null) + throw new IllegalArgumentException(); + + if (order.length != rule.getTailCount()) + throw new IllegalArgumentException(); + + final int tailCount = rule.getTailCount(); + + final IKeyOrder[] a = new IKeyOrder[tailCount]; + + final IBindingSet bindingSet = new HashBindingSet(); + + for (int orderIndex = 0; orderIndex < tailCount; orderIndex++) { + + final int tailIndex = order[orderIndex]; + + final IPredicate pred = rule.getTail(tailIndex); + + final IRelation rel = context.getRelation(pred); + + final IPredicate asBound = pred.asBound(bindingSet); + + final IKeyOrder keyOrder = context.getAccessPath( + rel, asBound).getKeyOrder(); + + if (log.isDebugEnabled()) + log.debug("keyOrder=" + keyOrder + ", orderIndex=" + orderIndex + + ", tailIndex=" + orderIndex + ", pred=" + pred + + ", bindingSet=" + bindingSet + ", rule=" + rule); + + // save results. + a[tailIndex] = keyOrder; + nvars[tailIndex] = keyOrder == null ? asBound.getVariableCount() + : asBound.getVariableCount((IKeyOrder) keyOrder); + + final int arity = pred.arity(); + + for (int j = 0; j < arity; j++) { + + final IVariableOrConstant<?> t = pred.get(j); + + if (t.isVar()) { + + final Var<?> var = (Var<?>) t; + + if (log.isDebugEnabled()) { + + log.debug("Propagating binding: pred=" + pred + + ", var=" + var + ", bindingSet=" + + bindingSet); + + } + + bindingSet.set(var, fakeTermId); + + } + + } + + } + + if (log.isDebugEnabled()) { + + log.debug("keyOrder[]=" + Arrays.toString(a) + ", nvars=" + + Arrays.toString(nvars) + ", rule=" + rule); + + } + + return a; + + } + + /** + * A fake value that is propagated when we compute the {@link IKeyOrder} for + * a series of joins based on an assigned join evaluation order. + * + * @todo This has to be of the appropriate data type or we run into class + * cast exceptions. + */ + final private static transient IConstant<IV> fakeTermId = new Constant<IV>( + new TermId(VTE.URI, -1L)); + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -215,11 +215,27 @@ this.resourceService = resourceService; if(fed instanceof JiniFederation<?>) { - // the proxy for this query engine when used as a query controller. - this.clientProxy = (IQueryClient) ((JiniFederation<?>)fed).getProxy(this, false/*enableDGC*/); + + /* + * The proxy for this query engine when used as a query controller. + * + * + * Should the data services expose their query engine in this + * manner? + * + * @todo We need to unexport the proxy as well when the service is + * shutdown. This should follow the same pattern as DataService -> + * DataServer. E.g., a QueryEngineServer class. + */ + + this.clientProxy = (IQueryClient) ((JiniFederation<?>) fed) + .getProxy(this, false/* enableDGC */); + } else { - // E.g., an EmbeddedFederation in the test suite. + + // E.g., an EmbeddedFederation in the test suite. 
this.clientProxy = this; + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java 2010-10-01 15:14:05 UTC (rev 3712) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -138,7 +138,7 @@ * @see KeyBuilder */ abstract public class AbstractBTree implements IIndex, IAutoboxBTree, - ILinearList { + ILinearList, IBTreeStatistics { /** * The index is already closed. @@ -787,8 +787,7 @@ // the % utilization in [0:1] for the whole tree (nodes + leaves). counterSet.addCounter("% utilization", new Instrument<Double>(){ protected void sample() { - final int[] tmp = getUtilization(); - setValue(tmp[2] / 100d); + setValue(getUtilization().getTotalUtilization() / 100d); } }); @@ -1617,10 +1616,11 @@ return true; } - - /** - * The branching factor for the btree. + + /* + * IBTreeStatistics */ + final public int getBranchingFactor() { return branchingFactor; @@ -1636,33 +1636,14 @@ */ final int minChildren; - /** - * The height of the btree. The height is the #of levels minus one. A btree - * with only a root leaf has <code>height := 0</code>. A btree with a - * root node and one level of leaves under it has <code>height := 1</code>. - * Note that all leaves of a btree are at the same height (this is what is - * means for the btree to be "balanced"). Also note that the height only - * changes when we split or join the root node (a btree maintains balance by - * growing and shrinking in levels from the top rather than the leaves). - */ abstract public int getHeight(); - /** - * The #of non-leaf nodes in the {@link AbstractBTree}. This is zero (0) - * for a new btree. - */ abstract public int getNodeCount(); - /** - * The #of leaf nodes in the {@link AbstractBTree}. This is one (1) for a - * new btree. - */ abstract public int getLeafCount(); /** - * The #of entries (aka values) in the {@link AbstractBTree}. This is zero - * (0) for a new B+Tree. Note that this value is tracked explicitly so it - * requires no IOs. + * {@inheritDoc} * * @todo this could be re-defined as the exact entry count if we tracked the * #of deleted index entries and subtracted that from the total #of @@ -1677,6 +1658,21 @@ abstract public int getEntryCount(); /** + * Return a statistics snapshot of the B+Tree. + */ + public IBTreeStatistics getStatistics() { + + return new BTreeStatistics(this); + + } + + public IBTreeUtilizationReport getUtilization() { + + return new BTreeUtilizationReport(this); + + } + + /** * The object responsible for (de-)serializing the nodes and leaves of the * {@link IIndex}. */ @@ -3197,47 +3193,6 @@ // abstract public ILeafCursor newLeafCursor(ILeafCursor leafCursor); /** - * Computes and returns the utilization of the tree. The utilization figures - * do not factor in the space requirements of nodes and leaves. - * - * @return An array whose elements are: - * <ul> - * <li>0 - the leaf utilization percentage [0:100]. The leaf - * utilization is computed as the #of values stored in the tree - * divided by the #of values that could be stored in the #of - * allocated leaves.</li> - * <li>1 - the node utilization percentage [0:100]. The node - * utilization is computed as the #of non-root nodes divided by the - * #of non-root nodes that could be addressed by the tree.</li> - * <li>2 - the total utilization percentage [0:100]. 
This is the - * average of the leaf utilization and the node utilization.</li> - * </ul> - */ - public int[] getUtilization() { - - final int nnodes = getNodeCount(); - - final int nleaves = getLeafCount(); - - final int nentries = getEntryCount(); - - final int numNonRootNodes = nnodes + nleaves - 1; - - final int branchingFactor = getBranchingFactor(); - - final int nodeUtilization = nnodes == 0 ? 100 : (100 * numNonRootNodes) - / (nnodes * branchingFactor); - - final int leafUtilization = (100 * nentries) - / (nleaves * branchingFactor); - - final int utilization = (nodeUtilization + leafUtilization) / 2; - - return new int[] { nodeUtilization, leafUtilization, utilization }; - - } - - /** * Recursive dump of the tree. * * @param out @@ -3256,7 +3211,7 @@ // True iff we will write out the node structure. final boolean info = level.toInt() <= Level.INFO.toInt(); - final int[] utils = getUtilization(); + final IBTreeUtilizationReport utils = getUtilization(); if (info) { @@ -3273,8 +3228,9 @@ out.println("height=" + height + ", branchingFactor=" + branchingFactor + ", #nodes=" + nnodes + ", #leaves=" + nleaves + ", #entries=" + nentries + ", nodeUtil=" - + utils[0] + "%, leafUtil=" + utils[1] + "%, utilization=" - + utils[2] + "%"); + + utils.getNodeUtilization() + "%, leafUtil=" + + utils.getLeafUtilization() + "%, utilization=" + + utils.getTotalUtilization() + "%"); } if (root != null) { Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeStatistics.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeStatistics.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeStatistics.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -0,0 +1,90 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 1, 2010 + */ + +package com.bigdata.btree; + +import java.io.Serializable; + +/** + * A snapshot of the B+Tree statistics. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class BTreeStatistics implements IBTreeStatistics, Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private final int m; + + private final int entryCount; + + private final int height; + + private final int leafCount; + + private final int nodeCount; + + private final IBTreeUtilizationReport utilReport; + + public BTreeStatistics(final AbstractBTree btree) { + this.m = btree.getBranchingFactor(); + this.entryCount = btree.getEntryCount(); + this.height = btree.getHeight(); + this.leafCount = btree.getLeafCount(); + this.nodeCount = btree.getNodeCount(); + this.utilReport = btree.getUtilization(); + } + + public int getBranchingFactor() { + return m; + } + + public int getEntryCount() { + return entryCount; + } + + public int getHeight() { + return height; + } + + public int getLeafCount() { + return leafCount; + } + + public int getNodeCount() { + return nodeCount; + } + + public IBTreeUtilizationReport getUtilization() { + return utilReport; + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeStatistics.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeUtilizationReport.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeUtilizationReport.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeUtilizationReport.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -0,0 +1,85 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 1, 2010 + */ + +package com.bigdata.btree; + +import java.io.Serializable; + +/** + * A btree utilization report. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class BTreeUtilizationReport implements IBTreeUtilizationReport, + Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private final int leafUtilization; + + private final int nodeUtilization; + + private final int totalUtilization; + + public BTreeUtilizationReport(final IBTreeStatistics stats) { + + final int nnodes = stats.getNodeCount(); + + final int nleaves = stats.getLeafCount(); + + final int nentries = stats.getEntryCount(); + + final int numNonRootNodes = nnodes + nleaves - 1; + + final int branchingFactor = stats.getBranchingFactor(); + + nodeUtilization = nnodes == 0 ? 
100 : (100 * numNonRootNodes) + / (nnodes * branchingFactor); + + leafUtilization = (100 * nentries) / (nleaves * branchingFactor); + + totalUtilization = (nodeUtilization + leafUtilization) / 2; + + } + + public int getLeafUtilization() { + return leafUtilization; + } + + public int getNodeUtilization() { + return nodeUtilization; + } + + public int getTotalUtilization() { + return totalUtilization; + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeUtilizationReport.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeStatistics.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeStatistics.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeStatistics.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -0,0 +1,83 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 1, 2010 + */ + +package com.bigdata.btree; + +/** + * Interface used to report out some statistics about a B+Tree. These statistics + * may be used in combination with a disk cost model to predict the cost + * (latency) associated with a variety of operations on the B+Tree. All values + * reported by this interface are tracked explicitly by the + * {@link AbstractBTree} and do not require DISK IO. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface IBTreeStatistics { + + /** + * The branching factor for the btree. + */ + int getBranchingFactor(); + + /** + * The height of the btree. The height is the #of levels minus one. A btree + * with only a root leaf has <code>height := 0</code>. A btree with a + * root node and one level of leaves under it has <code>height := 1</code>. + * Note that all leaves of a btree are at the same height (this is what is + * means for the btree to be "balanced"). Also note that the height only + * changes when we split or join the root node (a btree maintains balance by + * growing and shrinking in levels from the top rather than the leaves). + */ + int getHeight(); + + /** + * The #of non-leaf nodes in the {@link AbstractBTree}. This is zero (0) + * for a new btree. + */ + int getNodeCount(); + + /** + * The #of leaf nodes in the {@link AbstractBTree}. This is one (1) for a + * new btree. + */ + int getLeafCount(); + + /** + * The #of entries (aka tuples) in the {@link AbstractBTree}. This is zero + * (0) for a new B+Tree. When the B+Tree supports delete markers, this value + * also includes tuples which have been marked as deleted. 
+ */ + int getEntryCount(); + + /** + * Computes and returns the utilization of the tree. The utilization figures + * do not factor in the space requirements of nodes and leaves. + */ + IBTreeUtilizationReport getUtilization(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeStatistics.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeUtilizationReport.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeUtilizationReport.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeUtilizationReport.java 2010-10-01 15:36:09 UTC (rev 3713) @@ -0,0 +1,58 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 1, 2010 + */ + +package com.bigdata.btree; + +/** + * B+Tree utilization report. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface IBTreeUtilizationReport { + + /** + * The leaf utilization percentage [0:100]. The leaf utilization is computed + * as the #of values stored in the tree divided by the #of values that could + * be stored in the #of allocated leaves. + */ + int getLeafUtilization(); + + /** + * The node utilization percentage [0:100]. The node utilization is computed + * as the #of non-root nodes divided by the #of non-root nodes that could be + * addressed by the tree. + */ + int getNodeUtilization(); + + /** + * The total utilization percentage [0:100]. This is the average of the leaf + * utilization and the node utilization. + */ + int getTotalUtilization(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IBTreeUtilizationReport.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
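A quick orientation to the refactored API above: the scan estimate is now composed from a DiskCostModel plus an IBTreeStatistics snapshot rather than from the BTree object itself. A minimal sketch of the new call pattern follows; the statistics values (m=256, h=2, 65% leaf utilization, 50,000 tuples) are invented purely for illustration, while the class names and the rangeScan signature are taken from the diff above:

    import com.bigdata.bop.cost.BTreeCostModel;
    import com.bigdata.bop.cost.DiskCostModel;

    public class RangeScanCostExample {
        public static void main(String[] args) {
            // Estimated milliseconds for a range scan visiting 50,000 tuples.
            // In QueryEngine#estimateCost(...) these inputs come from the
            // access path range count and index.getStatistics().
            final BTreeCostModel model = new BTreeCostModel(DiskCostModel.DEFAULT);
            final double costMillis = model.rangeScan(50000L/* rangeCount */,
                    256/* m */, 2/* h */, 65/* leafUtilization */);
            System.out.println("estimated cost (ms): " + costMillis);
        }
    }

As the log message notes, scale-out will need to blend this with the IndexSegmentCostModel, since a shard view is typically a mutable BTree on the journal plus a few index segment files.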
From: <tho...@us...> - 2010-10-17 11:09:51
Revision: 3803 http://bigdata.svn.sourceforge.net/bigdata/?rev=3803&view=rev Author: thompsonbry Date: 2010-10-17 11:09:45 +0000 (Sun, 17 Oct 2010) Log Message: ----------- javadoc edits (spelling errors). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java 2010-10-15 12:17:18 UTC (rev 3802) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java 2010-10-17 11:09:45 UTC (rev 3803) @@ -65,7 +65,7 @@ * global release time - the earliest commit time from which a transaction may * read. Under dire circumstances (disk shortage) the transaction manager MAY * choose to abort transactions and advance the release time in order to permit - * the release of locked resources and the reclaimation of their space on local + * the release of locked resources and the reclamation of their space on local * disk. * </p> * <h2>Centralized transaction manager service</h2> @@ -87,8 +87,8 @@ * time, rendering views of earlier states of the database unavailable. * </p> * <p> - * The transaction identifier is the transaction <em>start time</em>. The - * transaction start time is choosen from among those distinct timestamps + * The transaction identifier codes the transaction <em>start time</em>. The + * transaction start time is chosen from among those distinct timestamps * available between the specified commit time and the next commit time on the * database. Successive read-write transactions must be assigned transaction * identifiers whose absolute value is strictly increasing - this requirement is @@ -100,10 +100,10 @@ * identifiers may be directly used as commit times when reading on a local * store. Read-write transaction identifiers must have their sign bit cleared in * order to read from their ground state (the commit point corresponding to - * their transaction start time) and unmodified transaction identifier is used - * to access their mutable view (the view comprised of the write set of the + * their transaction start time) but the unmodified transaction identifier is + * used to access their mutable view (the view comprised of the write set of the * transaction super imposed on the ground state such that writes, overwrites, - * and deletes are visible to the view). + * and deletes are visible in the view). * </p> * <p> * The symbolic value {@link ITx#READ_COMMITTED} and any <code>startTime</code> @@ -257,7 +257,7 @@ * operation isolated by a read-write transaction to execute with access to * the named resources (this applies only to distributed databases). The * declared resources are used in the commit phase of the read-write tx to - * impose a partial order on commits. That partial order guarentees that + * impose a partial order on commits. That partial order guarantees that * commits do not deadlock in contention for the same resources. 
* * @param tx Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-10-15 12:17:18 UTC (rev 3802) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-10-17 11:09:45 UTC (rev 3803) @@ -633,7 +633,7 @@ * * @throws RuntimeException * Wrapping {@link TimeoutException} if a timeout occurs - * awaiting a start time which would satisify the request for a + * awaiting a start time which would satisfy the request for a * read-only transaction (this can occur only for read-only * transactions which must contend for start times which will * read from the appropriate historical commit point). @@ -658,7 +658,7 @@ * Higher concurrency will require relaxing constraints on atomic * state transitions governed by [lock]. Perhaps by introducing * additional locks that are more specific. I don't want to relax - * those constaints until I have a better sense of what must be + * those constraints until I have a better sense of what must be * exclusive operations. */ @@ -1115,7 +1115,7 @@ * (commitTime-1) then compute and set the new releaseTime. * <p> * Note: This method was historically part of {@link #notifyCommit(long)}. - * It was moved into its own method so it can be overriden for some unit + * It was moved into its own method so it can be overridden for some unit * tests. * * @throws IllegalMonitorStateException @@ -1203,10 +1203,10 @@ * * @throws InterruptedException * if interrupted while awaiting a start time which would - * satisify the request. + * satisfy the request. * @throws InterruptedException * if a timeout occurs while awaiting a start time which would - * satisify the request. + * satisfy the request. */ protected long assignTransactionIdentifier(final long timestamp) throws InterruptedException, TimeoutException { @@ -1217,7 +1217,7 @@ * When timestamp is ZERO (0L), this simply returns the next * distinct timestamp (with its sign bit flipped). * - * Note: This is guarenteed to be a valid start time since it is LT + * Note: This is guaranteed to be a valid start time since it is LT * the next possible commit point for the database. * * Note: When we validate, we will read from [-startTime] and the @@ -1338,7 +1338,7 @@ /* * Note: If there is no successor of the desired commit point then - * we can just return the next timestamp. It is guarenteed to be GT + * we can just return the next timestamp. It is guaranteed to be GT * the desired commit time and LT the next commit point. [Note: this * case is in fact handled above so you should not get here.] */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
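The corrected javadoc above leans on a sign convention that is easy to misread: a read-write transaction identifier is a negated start time, the ground state is read from [-startTime], and the unmodified (negative) identifier addresses the transaction's mutable view. A tiny illustrative helper, assuming only that convention (the method itself is hypothetical and not part of this commit):

    /**
     * Return the commit time against which a transaction reads its ground
     * state. Read-only identifiers are already valid commit times; for a
     * read-write identifier the sign is flipped back, per the
     * "read from [-startTime]" note in AbstractTransactionService.
     */
    static long groundStateCommitTime(final long txId) {
        return txId < 0 ? -txId : txId;
    }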
From: <tho...@us...> - 2010-11-01 12:56:32
Revision: 3853 http://bigdata.svn.sourceforge.net/bigdata/?rev=3853&view=rev Author: thompsonbry Date: 2010-11-01 12:56:24 +0000 (Mon, 01 Nov 2010) Log Message: ----------- Modified RunningQuery to support aggregation of small chunks generated by an operator before they are passed along to the target operator (or mapped across shards in scale-out). This is a performance optimization, but I have not yet validated the impact on performance (I need to move the code over to a workstation to do this). Added toString() to Predicate$HashedPredicate. Fixed NPE in QueryLog which occurred when the BOpStats were not yet available for some operator. The NPE was being trapped so it was not causing query evaluation errors. Modified RunState to access RunningQuery.bopIndex using getBOpIndex(). Javadoc edits to AccessPath to note an issue that I want to explore in more depth. Javadoc clarification and bug fix for LogUtil. It was failing to handle XML configuration files specified using -Dlog4j.configuration. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-11-01 12:52:00 UTC (rev 3852) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-11-01 12:56:24 UTC (rev 3853) @@ -628,6 +628,12 @@ } + public String toString() { + + return super.toString() + "{pred=" + pred + ",hash=" + hash + "}"; + + } + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-01 12:52:00 UTC (rev 3852) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-01 12:56:24 UTC (rev 3853) @@ -257,8 +257,10 @@ stats.add(t); } } else { - // Just this operator. 
+ final BOpStats tmp = statsMap.get(bopId); + if (tmp != null) + stats.add(tmp); } final long unitsIn = stats.unitsIn.get(); final long unitsOut = stats.unitsOut.get(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-11-01 12:52:00 UTC (rev 3852) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-11-01 12:56:24 UTC (rev 3853) @@ -271,7 +271,7 @@ public RunState(final RunningQuery query) { this(query.getQuery(), query.getQueryId(), query.getDeadline(), - query.bopIndex); + query.getBOpIndex()); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-01 12:52:00 UTC (rev 3852) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-01 12:56:24 UTC (rev 3853) @@ -27,6 +27,7 @@ */ package com.bigdata.bop.engine; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -60,6 +61,7 @@ import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; +import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.IMultiSourceAsynchronousIterator; @@ -67,6 +69,7 @@ import com.bigdata.service.IBigdataFederation; import com.bigdata.striterator.ICloseableIterator; import com.bigdata.util.concurrent.Haltable; +import com.bigdata.util.concurrent.Memoizer; /** * Metadata about running queries. @@ -149,7 +152,7 @@ * An index from the {@link BOp.Annotations#BOP_ID} to the {@link BOp}. This * index is generated by the constructor. It is immutable and thread-safe. */ - protected final Map<Integer, BOp> bopIndex; + private final Map<Integer, BOp> bopIndex; /** * The run state of the query and the result of the computation iff it @@ -267,7 +270,7 @@ /** * Flag used to prevent retriggering of {@link #lifeCycleTearDownQuery()}. */ - final AtomicBoolean didQueryTearDown = new AtomicBoolean(false); + private final AtomicBoolean didQueryTearDown = new AtomicBoolean(false); // /** // * The chunks available for immediate processing (they must have been @@ -1217,41 +1220,50 @@ } /** - * A {@link FutureTask} which exposes the {@link ChunkTask} which is being - * evaluated. + * A {@link FutureTask} which conditionally schedules another task for the + * same (bopId, partitionId) once the wrapped {@link ChunkTask} is + * done. This is similar to the {@link Memoizer} pattern. This class + * coordinates with the {@link #operatorFutures}, which maintains a map of + * all currently running tasks which are consuming chunks. + * <p> + * The {@link ChunkTask} is wrapped by a {@link ChunkTaskWrapper} which is + * responsible for communicating the changes in the query's running state + * back to the {@link RunState} object on the query controller. 
*/ private class ChunkFutureTask extends FutureTask<Void> { - public final ChunkTask chunkTask; + private final ChunkTask t; public ChunkFutureTask(final ChunkTask chunkTask) { -// super(chunkTask, null/* result */); + /* + * Note: wraps chunk task to communicate run state changes back to + * the query controller. + */ + super(new ChunkTaskWrapper(chunkTask), null/* result */); - // Note: wraps chunk task to ensure source and sinks get closed. - super(new ChunkTaskWrapper(chunkTask), null/* result */); - - this.chunkTask = chunkTask; - + this.t = chunkTask; + } public void run() { - final ChunkTask t = chunkTask; - - super.run(); - - /* - * This task is done executing so remove its Future before we - * attempt to schedule another task for the same - * (bopId,partitionId). - */ - final ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures - .get(new BSBundle(t.bopId, t.partitionId)); - if (map != null) { - map.remove(this, this); - } - + super.run(); + + /* + * This task is done executing so remove its Future before we + * attempt to schedule another task for the same + * (bopId,partitionId). + */ + final ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures + .get(new BSBundle(t.bopId, t.partitionId)); + + if (map != null) { + + map.remove(this, this); + + } + // Schedule another task if any messages are waiting. RunningQuery.this.scheduleNext(new BSBundle( t.bopId, t.partitionId)); @@ -1262,11 +1274,12 @@ /** * Wraps the {@link ChunkTask} and handles various handshaking with the - * {@link RunningQuery} and the {@link RunState}. Since starting and - * stopping a {@link ChunkTask} requires handshaking with the query - * controller, it is important that these actions take place once the task - * has been submitted - otherwise they would be synchronous in the loop - * which consumes available chunks and generates new {@link ChunkTask}s. + * {@link RunningQuery} and the {@link RunState} on the query controller. + * Since starting and stopping a {@link ChunkTask} requires handshaking with + * the query controller (and thus can require RMI), it is important that + * these actions take place once the task has been submitted - otherwise + * they would be synchronous in the loop which consumes available chunks and + * generates new {@link ChunkTask}s. */ private class ChunkTaskWrapper implements Runnable { @@ -1608,7 +1621,8 @@ // .getChunkTimeout(), // BufferAnnotations.chunkTimeoutUnit); - return new HandleChunkBuffer(sinkId, sinkMessagesOut, /*b,*/ stats); + return new HandleChunkBuffer(RunningQuery.this, bopId, sinkId, op + .getChunkCapacity(), sinkMessagesOut, stats); } @@ -1647,108 +1661,208 @@ return null; } // call() + } // class ChunkTask + + /** + * Class traps {@link #add(IBindingSet[])} to handle the IBindingSet[] + * chunks as they are generated by the running operator task, invoking + * {@link RunningQuery#handleOutputChunk(BOp, int, IBlockingBuffer)} for + * each generated chunk to synchronously emit {@link IChunkMessage}s. + * <p> + * This use of this class significantly increases the parallelism and + * throughput of selective queries. If output chunks are not "handled" + * until the {@link ChunkTask} is complete then the total latency of + * selective queries is increased dramatically. 
+ */ + static private class HandleChunkBuffer implements + IBlockingBuffer<IBindingSet[]> { + + private final RunningQuery q; + + private final int bopId; + + private final int sinkId; + /** - * Class traps {@link #add(IBindingSet[])} to handle the IBindingSet[] - * chunks as they are generated by the running operator task, invoking - * {@link RunningQuery#handleOutputChunk(BOp, int, IBlockingBuffer)} for - * each generated chunk to synchronously emit {@link IChunkMessage}s. + * The target chunk size. When ZERO (0) chunks are output immediately as + * they are received (the internal buffer is not used). */ - private class HandleChunkBuffer implements IBlockingBuffer<IBindingSet[]> { + private final int chunkCapacity; + + private final AtomicInteger sinkMessagesOut; + + private final BOpStats stats; + + private volatile boolean open = true; + + /** + * An internal buffer which is used if chunkCapacity != ZERO. + */ + private IBindingSet[] chunk = null; + + /** + * The #of elements in the internal {@link #chunk} buffer. + */ + private int chunkSize = 0; + + /** + * + * @param q + * @param bopId + * @param sinkId + * @param chunkCapacity + * @param sinkMessagesOut + * @param stats + */ + public HandleChunkBuffer(final RunningQuery q, final int bopId, + final int sinkId, final int chunkCapacity, + final AtomicInteger sinkMessagesOut, final BOpStats stats) { + this.q = q; + this.bopId = bopId; + this.sinkId = sinkId; + this.chunkCapacity = chunkCapacity; + this.sinkMessagesOut = sinkMessagesOut; + this.stats = stats; + } + + /** + * Handle sink output, sending appropriate chunk message(s). This method + * MUST NOT block since that will deadlock the caller. + * <p> + * Note: This maps output over shards/nodes in s/o. + * <p> + * Note: This must be synchronized in case the caller is multi-threaded + * since it has a possible side effect on the internal buffer. + */ + public void add(final IBindingSet[] e) { - private final int sinkId; - private final AtomicInteger sinkMessagesOut; -// private final IBlockingBuffer<IBindingSet[]> sink; - private final BOpStats stats; - private volatile boolean open = true; - - public HandleChunkBuffer(final int sinkId, - final AtomicInteger sinkMessagesOut, -// final IBlockingBuffer<IBindingSet[]> b, - final BOpStats stats - ) { - this.sinkId = sinkId; - this.sinkMessagesOut = sinkMessagesOut; -// this.sink = b; - this.stats = stats; - } + if(!open) + throw new BufferClosedException(); - /** - * Handle sink output, sending appropriate chunk message(s). - * <p> - * Note: This maps output over shards/nodes in s/o. - */ - public void add(final IBindingSet[] e) { -// if (e.getClass().getComponentType() != null) { - stats.unitsOut.add(((Object[]) e).length); -// } else { -// stats.unitsOut.increment(); -// } - stats.chunksOut.increment(); + if (chunkCapacity != 0 && e.length < (chunkCapacity >> 1)) { /* - * FIXME Combine together when possible and final evict in - * flush(). The logic here MUST NOT block since that will - * deadlock the caller. The safest thing to do is to emit each - * chunk as it arrives, but that does not let us combine them - * together when an operator generates small chunks. + * The caller's array is significantly smaller than the target + * chunk size. Append the caller's array to the internal buffer + * and return immediately. The internal buffer will be copied + * through either in a subsequent add() or in flush(). 
*/ -// sink.add(e); - sinkMessagesOut.addAndGet(getChunkHandler().handleChunk( - RunningQuery.this, bopId, sinkId, e)); - } + synchronized (this) { - public long flush() { - return 0L; -// return sink.flush(); - } + if (chunk == null) + chunk = new IBindingSet[chunkCapacity]; - public void abort(Throwable cause) { - open = false; - RunningQuery.this.halt(cause); -// sink.abort(cause); - } + if (chunkSize + e.length > chunk.length) { - public void close() { -// sink.close(); - open = false; - } + // flush the buffer first. + outputBufferedChunk(); - public Future getFuture() { -// return sink.getFuture(); - return null; - } + } - public boolean isEmpty() { - return true; -// return sink.isEmpty(); - } + // copy the chunk into the buffer. + System.arraycopy(e/* src */, 0/* srcPos */, + chunk/* dest */, chunkSize/* destPos */, + e.length/* length */); - public boolean isOpen() { - return open && !RunningQuery.this.isDone(); -// return sink.isOpen(); - } + chunkSize += e.length; - public IAsynchronousIterator<IBindingSet[]> iterator() { - throw new UnsupportedOperationException(); -// return sink.iterator(); - } + return; - public void reset() { -// sink.reset(); - } + } - public void setFuture(Future future) { - throw new UnsupportedOperationException(); -// sink.setFuture(future); } - public int size() { - return 0; -// return sink.size(); + // output the caller's chunk immediately. + outputChunk(e); + + } + + /** + * Output a chunk, updating the counters. + * + * @param e + * The chunk. + */ + private void outputChunk(final IBindingSet[] e) { + + stats.unitsOut.add(((Object[]) e).length); + + stats.chunksOut.increment(); + + sinkMessagesOut.addAndGet(q.getChunkHandler().handleChunk(q, bopId, + sinkId, e)); + + } + + /** + * Output the internal buffer. + */ + synchronized // Note: has side-effect on internal buffer. + private void outputBufferedChunk() { + if (chunk == null || chunkSize == 0) + return; + if (chunkSize != chunk.length) { + // truncate the array. + chunk = Arrays.copyOf(chunk, chunkSize); } + outputChunk(chunk); + chunkSize = 0; + chunk = null; + } + + synchronized // Note: possible side-effect on internal buffer. 
+ public long flush() { + if (open) + outputBufferedChunk(); + return 0L; +// return sink.flush(); + } - } // class HandleChunkBuffer - - } // class ChunkTask + public void abort(Throwable cause) { + open = false; + q.halt(cause); +// sink.abort(cause); + } + + public void close() { +// sink.close(); + open = false; + } + + public Future getFuture() { +// return sink.getFuture(); + return null; + } + + public boolean isEmpty() { + return true; +// return sink.isEmpty(); + } + + public boolean isOpen() { + return open && !q.isDone(); +// return sink.isOpen(); + } + + public IAsynchronousIterator<IBindingSet[]> iterator() { + throw new UnsupportedOperationException(); +// return sink.iterator(); + } + + public void reset() { +// sink.reset(); + } + + public void setFuture(Future future) { + throw new UnsupportedOperationException(); +// sink.setFuture(future); + } + + public int size() { + return 0; +// return sink.size(); + } + + } // class HandleChunkBuffer // private static class BlockingBufferWithStats<E> extends BlockingBuffer<E> { // Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java 2010-11-01 12:52:00 UTC (rev 3852) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java 2010-11-01 12:56:24 UTC (rev 3853) @@ -1585,11 +1585,14 @@ /* * SWAG in case zero partition count is reported (I am not sure that * this code path is possible). + * + * @todo This is proven possible. Now figure out why. Maybe this is + * fromKey==toKey, in which case we can optimize that out. */ return new ScanCostReport(0L/* rangeCount */, partitionCount, 100/* millis */); - /* - * Should never be "zero" partition count. - */ +// /* +// * Should never be "zero" partition count. +// */ // throw new AssertionError(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java 2010-11-01 12:52:00 UTC (rev 3852) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java 2010-11-01 12:56:24 UTC (rev 3853) @@ -30,47 +30,84 @@ import org.apache.log4j.xml.DOMConfigurator; /** - * Utility class that provides a set of static convenience methods related - * to the initialization and configuration of the logging mechanism(s) - * employed by the components of the system. The methods of this class - * can be useful both in Jini configuration files, as well as in the - * system components themselves. + * Utility class that provides a set of static convenience methods related to + * the initialization and configuration of the logging mechanism(s) employed by + * the components of the system. The methods of this class can be useful both in + * Jini configuration files, as well as in the system components themselves. * <p> + * This class relies on the presence of either the + * <code>log4j.configuration</code> or the + * <code>log4j.primary.configuration</code> property and understands files with + * any of the following extensions {<code>.properties</code>, + * <code>.logging</code>, <code>.xml</code> . It will log a message on + * <em>stderr</em> if neither of those properties is defined. 
The class + * deliberately does not search the CLASSPATH for a log4j configuration in an + * effort to discourage the inadvertent use of hidden configuration files when + * deploying bigdata. + * <p> + * A watcher is setup on the log4j configuration if one is found. + * <p> * This class cannot be instantiated. */ public class LogUtil { + /** + * Examine the various log4j configuration properties and return the name of + * the log4j configuration resource if one was configured. + * + * @return The log4j configuration resource -or- <code>null</code> if the + * resource was not configured properly. + */ + static String getConfigPropertyValue() { + + final String log4jConfig = System + .getProperty("log4j.primary.configuration"); + + if (log4jConfig != null) + return log4jConfig; + + final String log4jDefaultConfig = System + .getProperty("log4j.configuration"); + + if (log4jDefaultConfig != null) + return log4jDefaultConfig; + + return null; + + } + // Static initialization block that retrieves and initializes // the log4j logger configuration for the given VM in which this // class resides. Note that this block is executed only once // during the life of the associated VM. static { - final String log4jConfig = - System.getProperty("log4j.primary.configuration"); + + final String log4jConfig = getConfigPropertyValue(); + if( log4jConfig != null && (log4jConfig.endsWith(".properties") || log4jConfig.endsWith(".logging"))) { + PropertyConfigurator.configureAndWatch(log4jConfig); + } else if ( log4jConfig != null && log4jConfig.endsWith(".xml") ) { + DOMConfigurator.configureAndWatch(log4jConfig); + } else { - final String log4jDefaultConfig = - System.getProperty("log4j.configuration"); - if (log4jDefaultConfig != null ) { - PropertyConfigurator.configureAndWatch(log4jDefaultConfig); - } else { - System.out.println - ("ERROR: could not initialize Log4J logging utility"); - System.out.println - (" set system property " - +"'-Dlog4j.configuration=" - +"file:bigdata/src/resources/logging/log4j.properties" - +"\n and/or \n" - +" set system property " - +"'-Dlog4j.primary.configuration=" - +"file:<installDir>/" - +"bigdata/src/resources/logging/log4j.properties'"); - } + + System.err.println("ERROR: " + LogUtil.class.getName() + + " : Could not initialize Log4J logging utility. " + + "Set system property " + +"'-Dlog4j.configuration=" + +"file:bigdata/src/resources/logging/log4j.properties" + +"\n and/or \n" + +" set system property " + +"'-Dlog4j.primary.configuration=" + +"file:<installDir>/" + +"bigdata/src/resources/logging/log4j.properties'"); + } + } public static Logger getLog4jLogger(String componentName) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
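The LogUtil fix above is easier to follow outside of the diff. The following is a minimal, self-contained sketch of the committed dispatch logic, assuming log4j 1.2 on the classpath; the class name Log4jBootstrap is illustrative, while the property names and the configureAndWatch() calls are taken from the patch:

import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.DOMConfigurator;

public class Log4jBootstrap {

    /**
     * Return the log4j configuration resource, preferring the "primary"
     * property over the standard one, or null if neither is set.
     */
    static String getConfigPropertyValue() {
        final String primary = System.getProperty("log4j.primary.configuration");
        if (primary != null)
            return primary;
        return System.getProperty("log4j.configuration");
    }

    static {
        final String config = getConfigPropertyValue();
        if (config != null
                && (config.endsWith(".properties") || config.endsWith(".logging"))) {
            // Properties syntax: configure and watch the file for changes.
            PropertyConfigurator.configureAndWatch(config);
        } else if (config != null && config.endsWith(".xml")) {
            // XML syntax: the case the pre-patch code failed to handle.
            DOMConfigurator.configureAndWatch(config);
        } else {
            // Deliberately do NOT fall back to a CLASSPATH search for a
            // hidden configuration file.
            System.err.println("ERROR: could not initialize log4j; set"
                    + " -Dlog4j.configuration or -Dlog4j.primary.configuration");
        }
    }
}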
From: <tho...@us...> - 2010-11-01 15:29:51
|
Revision: 3855 http://bigdata.svn.sourceforge.net/bigdata/?rev=3855&view=rev Author: thompsonbry Date: 2010-11-01 15:29:45 +0000 (Mon, 01 Nov 2010) Log Message: ----------- Modified RunningQuery to improve the chunk combiner logic for small chunks output by an operator task. The previous implementation was performing more allocations and array copies than were desirable. The new implementation uses a linked list to collect small chunks and does one exact fit allocation and copy when the small chunks are evicted. Adjusted the BlockingBuffer configurations to improve query performance.

BufferAnnotations:
int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 10;//trunk=1000
int DEFAULT_CHUNK_CAPACITY = 1000;//trunk=100
int DEFAULT_CHUNK_TIMEOUT = 10;//trunk=1000

AbstractResource:
String DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = "10"; // was 1000
String DEFAULT_CHUNK_CAPACITY = "1000"; // was 100
String DEFAULT_CHUNK_TIMEOUT = "10"; // was 1000

[java] ### Finished testing BIGDATA_SPARQL_ENDPOINT ###
[java] BIGDATA_SPARQL_ENDPOINT #trials=10 #parallel=1
[java] query Time Result#
[java] query1 53 4
[java] query3 45 6
[java] query4 73 34
[java] query5 107 719
[java] query7 27 61
[java] query8 396 6463
[java] query10 21 0
[java] query11 20 0
[java] query12 26 0
[java] query13 24 0
[java] query14 3897 393730
[java] query6 4023 430114
[java] query2 978 130
[java] query9 5157 8627
[java] Total 14847

This is the best score for the quads query branch (the trunk comes in at 12748 total). The trunk still does better on Q6, Q14, and Q9 while the quads query branch does better on Q2. I lack an explanation for Q6 and Q14 since it does not appear to be related to the BlockingBuffer configuration. Q9 might be the result of not chaining the queues together and accounts for 1s of the total difference in time for this benchmark. The BSBM 100M WORM m=32 score in the trunk is:

[java] QMpH: 4234.40 query mixes per hour

With the chunk combiner based on the LinkedList in RunningQuery and the adjustments to the BufferAnnotations and AbstractResource options to optimize LUBM U50 query we have:

[java] QMpH: 4121.80 query mixes per hour

Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java 2010-11-01 15:16:44 UTC (rev 3854) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java 2010-11-01 15:29:45 UTC (rev 3855) @@ -52,7 +52,7 @@ /** * Default for {@link #CHUNK_OF_CHUNKS_CAPACITY} */ - int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 100;//trunk=1000 + int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 5;//trunk=1000 /** * Sets the capacity of the {@link IBuffer}[]s used to accumulate a chunk of @@ -81,7 +81,7 @@ * * @todo this is probably much larger than we want. Try 10ms. */ - int DEFAULT_CHUNK_TIMEOUT = 20;//trunk=1000 + int DEFAULT_CHUNK_TIMEOUT = 10;//trunk=1000 /** * The {@link TimeUnit}s in which the {@link #CHUNK_TIMEOUT} is measured.
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-01 15:16:44 UTC (rev 3854) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-01 15:29:45 UTC (rev 3855) @@ -1695,10 +1695,11 @@ private volatile boolean open = true; - /** - * An internal buffer which is used if chunkCapacity != ZERO. - */ - private IBindingSet[] chunk = null; +// /** +// * An internal buffer which is used if chunkCapacity != ZERO. +// */ +// private IBindingSet[] chunk = null; + private List<IBindingSet[]> smallChunks = null; /** * The #of elements in the internal {@link #chunk} buffer. @@ -1739,38 +1740,66 @@ if(!open) throw new BufferClosedException(); - if (chunkCapacity != 0 && e.length < (chunkCapacity >> 1)) { - /* - * The caller's array is significantly smaller than the target - * chunk size. Append the caller's array to the internal buffer - * and return immediately. The internal buffer will be copied - * through either in a subsequent add() or in flush(). - */ - synchronized (this) { +// if (chunkCapacity != 0 && e.length < (chunkCapacity >> 1)) { +// /* +// * The caller's array is significantly smaller than the target +// * chunk size. Append the caller's array to the internal buffer +// * and return immediately. The internal buffer will be copied +// * through either in a subsequent add() or in flush(). +// */ +// synchronized (this) { +// +// if (chunk == null) +// chunk = new IBindingSet[chunkCapacity]; +// +// if (chunkSize + e.length > chunkCapacity) { +// +// // flush the buffer first. +// outputBufferedChunk(); +// +// } +// +// // copy the chunk into the buffer. +// System.arraycopy(e/* src */, 0/* srcPos */, +// chunk/* dest */, chunkSize/* destPos */, +// e.length/* length */); +// +// chunkSize += e.length; +// +// return; +// +// } +// +// } - if (chunk == null) - chunk = new IBindingSet[chunkCapacity]; + if (chunkCapacity != 0 && e.length < (chunkCapacity >> 1)) { + /* + * The caller's array is significantly smaller than the target + * chunk size. Append the caller's array to the internal list + * and return immediately. The buffered chunks will be copied + * through either in a subsequent add() or in flush(). + */ + synchronized (this) { - if (chunkSize + e.length > chunk.length) { + if (smallChunks == null) + smallChunks = new LinkedList<IBindingSet[]>(); - // flush the buffer first. - outputBufferedChunk(); + if (chunkSize + e.length > chunkCapacity) { - } + // flush the buffer first. + outputBufferedChunk(); - // copy the chunk into the buffer. - System.arraycopy(e/* src */, 0/* srcPos */, - chunk/* dest */, chunkSize/* destPos */, - e.length/* length */); + } + + smallChunks.add(e); - chunkSize += e.length; + chunkSize += e.length; - return; + return; - } + } + } - } - // output the caller's chunk immediately. outputChunk(e); @@ -1798,15 +1827,36 @@ */ synchronized // Note: has side-effect on internal buffer. private void outputBufferedChunk() { - if (chunk == null || chunkSize == 0) +// if (chunk == null || chunkSize == 0) +// return; +// if (chunkSize != chunk.length) { +// // truncate the array. +// chunk = Arrays.copyOf(chunk, chunkSize); +// } +// outputChunk(chunk); +// chunkSize = 0; +// chunk = null; + if (smallChunks == null || chunkSize == 0) return; - if (chunkSize != chunk.length) { - // truncate the array. 
- chunk = Arrays.copyOf(chunk, chunkSize); - } + if (smallChunks.size() == 1) { + // directly output a single small chunk. + outputChunk(smallChunks.get(0)); + chunkSize = 0; + smallChunks = null; + return; + } + // exact fit buffer. + final IBindingSet[] chunk = new IBindingSet[chunkSize]; + // copy the small chunks into the buffer. + int destPos = 0; + for (IBindingSet[] e : smallChunks) { + System.arraycopy(e/* src */, 0/* srcPos */, chunk/* dest */, + destPos, e.length/* length */); + destPos += e.length; + } outputChunk(chunk); chunkSize = 0; - chunk = null; + smallChunks = null; } synchronized // Note: possible side-effect on internal buffer. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java 2010-11-01 15:16:44 UTC (rev 3854) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java 2010-11-01 15:29:45 UTC (rev 3855) @@ -237,7 +237,7 @@ * Default for {@link #CHUNK_OF_CHUNKS_CAPACITY} * @deprecated by {@link BOp} annotations. */ - String DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = "1000"; + String DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = "10"; // was 1000 /** * <p> @@ -260,7 +260,7 @@ * * @deprecated by {@link BOp} annotations. */ - String DEFAULT_CHUNK_CAPACITY = "100"; + String DEFAULT_CHUNK_CAPACITY = "1000"; // was 100 /** * The timeout in milliseconds that the {@link BlockingBuffer} will wait @@ -278,7 +278,7 @@ * @todo this is probably much larger than we want. Try 10ms. * @deprecated by {@link BOp} annotations. */ - String DEFAULT_CHUNK_TIMEOUT = "1000"; + String DEFAULT_CHUNK_TIMEOUT = "10"; // was 1000 /** * If the estimated rangeCount for an This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
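For clarity, the revised combiner reduces to the following self-contained sketch, with Object[] standing in for IBindingSet[] and an output() placeholder standing in for the chunk handler. Only the names are invented here; the buffering rules (buffer anything under half the target capacity, combine with one exact fit allocation on eviction) follow the patch:

import java.util.LinkedList;
import java.util.List;

class SmallChunkCombiner {

    private final int chunkCapacity;    // target chunk size; ZERO disables buffering.
    private List<Object[]> smallChunks; // small chunks awaiting combination.
    private int chunkSize;              // #of buffered elements across all small chunks.

    SmallChunkCombiner(final int chunkCapacity) {
        this.chunkCapacity = chunkCapacity;
    }

    /** Accept a chunk, buffering it when it is less than half the target capacity. */
    synchronized void add(final Object[] e) {
        if (chunkCapacity != 0 && e.length < (chunkCapacity >> 1)) {
            if (smallChunks == null)
                smallChunks = new LinkedList<Object[]>();
            if (chunkSize + e.length > chunkCapacity)
                outputBufferedChunk(); // at capacity: combine and emit first.
            smallChunks.add(e);        // just link in the caller's array - no copy yet.
            chunkSize += e.length;
            return;
        }
        output(e); // large chunk: emit immediately.
    }

    /** Combine the buffered small chunks using one exact fit allocation and copy. */
    private synchronized void outputBufferedChunk() {
        if (smallChunks == null || chunkSize == 0)
            return;
        if (smallChunks.size() == 1) {
            output(smallChunks.get(0)); // a single small chunk: emit it directly.
        } else {
            final Object[] chunk = new Object[chunkSize];
            int destPos = 0;
            for (Object[] e : smallChunks) {
                System.arraycopy(e/* src */, 0/* srcPos */, chunk/* dest */,
                        destPos, e.length/* length */);
                destPos += e.length;
            }
            output(chunk);
        }
        chunkSize = 0;
        smallChunks = null;
    }

    synchronized void flush() {
        outputBufferedChunk(); // evict whatever remains buffered.
    }

    /** Placeholder for the downstream handler (handleChunk() in the real code). */
    void output(final Object[] chunk) {
        System.out.println("emitting chunk of " + chunk.length);
    }
}

Note the design choice relative to revision 3853: small chunks are merely linked into the list on add(), so the copy happens once, into an exact fit array, instead of eagerly copying every small chunk into a fixed-capacity buffer that then has to be truncated.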
From: <tho...@us...> - 2011-02-25 21:18:07
|
Revision: 4250 http://bigdata.svn.sourceforge.net/bigdata/?rev=4250&view=rev Author: thompsonbry Date: 2011-02-25 21:18:01 +0000 (Fri, 25 Feb 2011) Log Message: ----------- Modified Haltable#halted() to use the message "Halted" in the thrown RuntimeException as this is a normal halt. Modified PipelineJoin to log normal termination @ DEBUG. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-02-24 23:15:17 UTC (rev 4249) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-02-25 21:18:01 UTC (rev 4250) @@ -592,6 +592,9 @@ /** * An optional limit on the #of solutions to be produced. The limit is * ignored if it is {@link Long#MAX_VALUE}. + * <p> + * Note: Invoking {@link #halt(Object)} is necessary to enforce the + * limit. * * @see Annotations#LIMIT */ @@ -1094,8 +1097,10 @@ throw new RuntimeException("Halting join: " + t, t); } // normal termination - ignore exception. - log.warn("Caught and ignored exception: "+t); return null; - + if (log.isDebugEnabled()) + log.debug("Caught and ignored exception: " + t); + return null; + } } @@ -1632,7 +1637,7 @@ log.info("Breaking query @ limit: limit=" + limit + ", exactOutputCount=" + exactOutputCount.get()); -// halt((Void) null); + halt((Void) null); return null; } @@ -1677,10 +1682,14 @@ while (itr.hasNext()) { + halted(); + final Object[] chunk = itr.nextChunk(); - stats.accessPathChunksIn.increment(); + stats.accessPathChunksIn.increment(); +// System.err.println("#chunks="+stats.accessPathChunksIn+", chunkSize="+chunk.length); + // process the chunk in the caller's thread. new ChunkTask(bindingSets, naccepted, unsyncBuffer, chunk).call(); @@ -1727,7 +1736,7 @@ log.info("Breaking query @ limit: limit=" + limit + ", exactOutputCount=" + exactOutputCount.get()); -// halt((Void) null); + halt((Void) null); break; } @@ -1947,7 +1956,7 @@ log.info("Breaking query @ limit: limit=" + limit + ", exactOutputCount=" + exactOutputCount.get()); -// halt((Void) null); + halt((Void) null); break; } @@ -2145,7 +2154,7 @@ log.info("Breaking query @ limit: limit=" + limit + ", exactOutputCount=" + exactOutputCount.get()); -// halt((Void) null); + halt((Void) null); break; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2011-02-24 23:15:17 UTC (rev 4249) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2011-02-25 21:18:01 UTC (rev 4250) @@ -205,7 +205,7 @@ * running (since it invoked halted() it must be running). Since * it is running, */ - throw new RuntimeException(); + throw new RuntimeException("Halted"); } throw new RuntimeException(firstCause); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
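The limit handling above depends on halt() being visible to every task for the join, not just the loop that noticed the limit. A minimal sketch of the pattern, with invented names (the real classes are Haltable and PipelineJoin):

import java.util.concurrent.atomic.AtomicBoolean;

class LimitedJoinTask {

    /** Shared halt flag: the analogue of the query's Haltable. */
    private final AtomicBoolean halt = new AtomicBoolean(false);

    /** Optional limit; ignored if Long.MAX_VALUE, as in PipelineJoin. */
    private final long limit;

    private long exactOutputCount = 0;

    LimitedJoinTask(final long limit) {
        this.limit = limit;
    }

    /** Analogue of Haltable#halted(): throws if a halt was requested. */
    private void halted() {
        if (halt.get())
            throw new RuntimeException("Halted"); // normal termination, not an error.
    }

    void consume(final Object[][] chunks) {
        try {
            for (Object[] chunk : chunks) {
                halted(); // check once per access path chunk.
                for (Object solution : chunk) {
                    emit(solution);
                    if (limit != Long.MAX_VALUE && ++exactOutputCount >= limit) {
                        // Breaking this loop is not enough: sibling tasks for
                        // the same join must observe the halt as well.
                        halt.set(true);
                        return;
                    }
                }
            }
        } catch (RuntimeException ex) {
            // A halt is normal termination: log @ DEBUG and ignore.
        }
    }

    void emit(final Object solution) { /* deliver one solution to the sink */ }
}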
From: <tho...@us...> - 2011-03-08 15:07:05
|
Revision: 4280 http://bigdata.svn.sourceforge.net/bigdata/?rev=4280&view=rev Author: thompsonbry Date: 2011-03-08 15:06:58 +0000 (Tue, 08 Mar 2011) Log Message: ----------- Integrated FutureTaskMon into several places. It provides a means of identifying callers who might have interrupted a FutureTask when the appropriate log level is set. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -50,6 +50,7 @@ import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.fed.FederatedRunningQuery; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.BufferClosedException; @@ -593,7 +594,7 @@ * responsible for communicating the changes in the query's running state * back to the {@link RunState} object on the query controller. 
*/ - private class ChunkFutureTask extends FutureTask<Void> { + private class ChunkFutureTask extends FutureTaskMon<Void> { private final ChunkTask t; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -58,6 +58,7 @@ import com.bigdata.btree.BTree; import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.counters.CAT; import com.bigdata.counters.CounterSet; import com.bigdata.counters.Instrument; @@ -614,7 +615,7 @@ */ public void init() { - final FutureTask<Void> ft = new FutureTask<Void>(new QueryEngineTask( + final FutureTask<Void> ft = new FutureTaskMon<Void>(new QueryEngineTask( priorityQueue), (Void) null); if (engineFuture.compareAndSet(null/* expect */, ft)) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -44,6 +44,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -345,7 +346,7 @@ final OperatorTask opTask = new OperatorTask(bopId, src); // Wrap task with error handling and handshaking logic. 
- final FutureTask<Void> ft = new FutureTask<Void>( + final FutureTask<Void> ft = new FutureTaskMon<Void>( new OperatorTaskWrapper(opTask), null/* result */); if (operatorFutures.putIfAbsent(bopId, ft) != null) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -60,6 +60,7 @@ import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.BytesUtil; import com.bigdata.btree.keys.IKeyBuilder; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.counters.CAT; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer; @@ -1411,7 +1412,7 @@ for (AccessPathTask task : tasks) { - final FutureTask<Void> ft = new FutureTask<Void>(task); + final FutureTask<Void> ft = new FutureTaskMon<Void>(task); futureTasks.add(ft); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -42,6 +42,7 @@ static private final transient Logger log = Logger .getLogger(FutureTaskMon.class); + private volatile boolean didStart = false; public FutureTaskMon(Callable<T> callable) { super(callable); @@ -50,17 +51,41 @@ public FutureTaskMon(Runnable runnable, T result) { super(runnable, result); } + + /** + * {@inheritDoc} + * <p> + * Hooked to notice when the task has been started. + */ + @Override + public void run() { + didStart = true; + super.run(); + } - public boolean cancel(boolean mayInterruptIfRunning) { - if (mayInterruptIfRunning && log.isDebugEnabled()) { + /** + * {@inheritDoc} + * <p> + * Overridden to conditionally log @ DEBUG if the caller caused the task to + * be interrupted. This can be used to search for sources of interrupts. 
+ */ + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + + final boolean didStart = this.didStart; + + final boolean ret = super.cancel(mayInterruptIfRunning); + + if (didStart && mayInterruptIfRunning && ret && log.isDebugEnabled()) { try { throw new RuntimeException("cancel call trace"); } catch (RuntimeException re) { log.debug("May interrupt running task", re); } } - - return super.cancel(mayInterruptIfRunning); + + return ret; + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManager.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -27,10 +27,8 @@ package com.bigdata.concurrent; -import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Halted; import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Running; import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Shutdown; -import static com.bigdata.concurrent.NonBlockingLockManager.RunState.ShutdownNow; import static com.bigdata.concurrent.NonBlockingLockManager.RunState.Starting; import java.lang.ref.WeakReference; @@ -769,7 +767,7 @@ * @param <T> * The generic type of the outcome for the {@link Future}. */ - protected class LockFutureTask<T> extends FutureTask<T> { + protected class LockFutureTask<T> extends FutureTaskMon<T> { private final R[] resource; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -1422,7 +1422,7 @@ * @param <T> * The generic type of the outcome for the {@link Future}. */ - static public class LockFutureTask<R extends Comparable<R>, T> extends FutureTask<T> { + static public class LockFutureTask<R extends Comparable<R>, T> extends FutureTaskMon<T> { /** * The instance of the outer class. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2011-03-08 13:34:32 UTC (rev 4279) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2011-03-08 15:06:58 UTC (rev 4280) @@ -63,6 +63,7 @@ import com.bigdata.cache.ConcurrentWeakValueCache; import com.bigdata.cache.ConcurrentWeakValueCacheWithTimeout; import com.bigdata.cache.HardReferenceQueue; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.config.Configuration; import com.bigdata.config.IValidator; import com.bigdata.config.IntegerRangeValidator; @@ -3764,7 +3765,7 @@ // true the token is valid and this service is the quorum leader final boolean isLeader = quorum.getMember().isLeader(prepareToken); - final FutureTask<Boolean> ft = new FutureTask<Boolean>(new Runnable() { + final FutureTask<Boolean> ft = new FutureTaskMon<Boolean>(new Runnable() { public void run() { @@ -3842,7 +3843,7 @@ public Future<Void> commit2Phase(final long commitTime) { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { @@ -3896,7 +3897,7 @@ public Future<Void> abort2Phase(final long token) { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { getQuorum().assertQuorum(token); @@ -4011,7 +4012,7 @@ /** NOP. */ public Future<Void> bounceZookeeperConnection() { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { } }, null); @@ -4023,7 +4024,7 @@ * Does pipeline remove/add. */ public Future<Void> moveToEndOfPipeline() { - final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { + final FutureTask<Void> ft = new FutureTaskMon<Void>(new Runnable() { public void run() { final QuorumActor<?, ?> actor = quorum.getActor(); actor.pipelineRemove(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
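The cancel tracing in FutureTaskMon can be restated as a self-contained class. The sketch below uses an illustrative name; the logic mirrors the committed version, which only captures the caller's stack when the cancel could actually have interrupted a task that started running:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import org.apache.log4j.Logger;

public class TracingFutureTask<T> extends FutureTask<T> {

    private static final Logger log = Logger.getLogger(TracingFutureTask.class);

    private volatile boolean didStart = false;

    public TracingFutureTask(final Callable<T> callable) {
        super(callable);
    }

    public TracingFutureTask(final Runnable runnable, final T result) {
        super(runnable, result);
    }

    @Override
    public void run() {
        didStart = true; // note that the task actually began to execute.
        super.run();
    }

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        final boolean didStart = this.didStart; // snapshot before cancelling.
        final boolean ret = super.cancel(mayInterruptIfRunning);
        // Log only when the cancel could have interrupted a running task.
        if (didStart && mayInterruptIfRunning && ret && log.isDebugEnabled()) {
            // The throwaway exception exists purely to capture the caller's stack.
            log.debug("May interrupt running task",
                    new RuntimeException("cancel call trace"));
        }
        return ret;
    }
}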
From: <tho...@us...> - 2011-03-08 15:12:50
|
Revision: 4281 http://bigdata.svn.sourceforge.net/bigdata/?rev=4281&view=rev Author: thompsonbry Date: 2011-03-08 15:12:43 +0000 (Tue, 08 Mar 2011) Log Message: ----------- Modified RWStrategy to explicitly pass in the offset and length of the data to be read from the RWStore. This is in preparation for examining ways of recycling the byte[] buffers in coordination with the B+Tree to reduce heap pressure. Modified RWStore to use a double-checked locking pattern in the re-opener such that it does not always synchronize before a read or write operation on the backing channel. This removes some sources of false contention during query. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2011-03-08 15:06:58 UTC (rev 4280) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2011-03-08 15:12:43 UTC (rev 4281) @@ -612,7 +612,7 @@ */ final byte buf[] = new byte[sze + 4]; // 4 bytes for checksum - m_store.getData(rwaddr, buf); + m_store.getData(rwaddr, buf, 0, sze+4); return ByteBuffer.wrap(buf, 0, sze); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-03-08 15:06:58 UTC (rev 4280) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-03-08 15:12:43 UTC (rev 4281) @@ -3137,35 +3137,55 @@ } - synchronized public FileChannel reopenChannel() throws IOException { + public FileChannel reopenChannel() throws IOException { - if (raf != null && raf.getChannel().isOpen()) { + /* + * Note: This is basically a double-checked locking pattern. It is + * used to avoid synchronizing when the backing channel is already + * open. + */ + { + final RandomAccessFile tmp = raf; + if (tmp != null) { + final FileChannel channel = tmp.getChannel(); + if (channel.isOpen()) { + /* + * The channel is still open. If you are allowing + * concurrent reads on the channel, then this could + * indicate that two readers each found the channel + * closed and that one was able to re-open the channel + * before the other such that the channel was open again + * by the time the 2nd reader got here. + */ + return channel; + } + } + } + + synchronized(this) { - /* - * The channel is still open. If you are allowing concurrent - * reads on the channel, then this could indicate that two - * readers each found the channel closed and that one was able - * to re-open the channel before the other such that the channel - * was open again by the time the 2nd reader got here. - */ + if (raf != null) { + final FileChannel channel = raf.getChannel(); + if (channel.isOpen()) { + return channel; + } + } - return raf.getChannel(); + // open the file. + this.raf = new RandomAccessFile(file, mode); - } + // Update counters. + final StoreCounters<?> c = (StoreCounters<?>) storeCounters + .get().acquire(); + try { + c.nreopen++; + } finally { + c.release(); + } - // open the file. - this.raf = new RandomAccessFile(file, mode); + return raf.getChannel(); - // Update counters. 
- final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get() - .acquire(); - try { - c.nreopen++; - } finally { - c.release(); - } - - return raf.getChannel(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
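One point the diff does not show is that the unsynchronized first check is only safe if the file handle field is published safely. Below is a compact sketch of the double-checked re-open pattern with that made explicit: raf is declared volatile here, and the names are illustrative rather than taken from RWStore:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

class ChannelReopener {

    private final File file;

    // volatile so the unsynchronized fast path reads a safely published handle.
    private volatile RandomAccessFile raf;

    ChannelReopener(final File file) {
        this.file = file;
    }

    FileChannel reopenChannel() throws IOException {
        // Fast path: no synchronization when the channel is already open.
        {
            final RandomAccessFile tmp = raf;
            if (tmp != null) {
                final FileChannel channel = tmp.getChannel();
                if (channel.isOpen())
                    return channel;
            }
        }
        // Slow path: serialize re-opens so only one thread recreates the handle.
        synchronized (this) {
            if (raf != null) {
                final FileChannel channel = raf.getChannel();
                if (channel.isOpen())
                    return channel; // another thread re-opened it while we waited.
            }
            raf = new RandomAccessFile(file, "rw");
            return raf.getChannel();
        }
    }
}

The fast path never blocks; only threads that actually observe a closed channel contend for the monitor, which is exactly the false contention the commit removes.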
From: <tho...@us...> - 2011-03-08 15:15:09
|
Revision: 4282 http://bigdata.svn.sourceforge.net/bigdata/?rev=4282&view=rev Author: thompsonbry Date: 2011-03-08 15:15:03 +0000 (Tue, 08 Mar 2011) Log Message: ----------- Javadoc edit to RWStore's reopen. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2011-03-08 15:12:43 UTC (rev 4281) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2011-03-08 15:15:03 UTC (rev 4282) @@ -1426,8 +1426,24 @@ */ private FileChannel reopenChannel() throws IOException { - synchronized (opener) { + /* + * Note: This is basically a double-checked locking pattern. It is + * used to avoid synchronizing when the backing channel is already + * open. + */ + { + final RandomAccessFile tmp = raf; + if (tmp != null) { + final FileChannel channel = tmp.getChannel(); + if (channel.isOpen()) { + // The channel is still open. + return channel; + } + } + } + synchronized (opener) { + assertOpen(); if (raf != null && raf.getChannel().isOpen()) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-03-08 15:12:43 UTC (rev 4281) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-03-08 15:15:03 UTC (rev 4282) @@ -3149,14 +3149,7 @@ if (tmp != null) { final FileChannel channel = tmp.getChannel(); if (channel.isOpen()) { - /* - * The channel is still open. If you are allowing - * concurrent reads on the channel, then this could - * indicate that two readers each found the channel - * closed and that one was able to re-open the channel - * before the other such that the channel was open again - * by the time the 2nd reader got here. - */ + // The channel is still open. return channel; } } @@ -3167,6 +3160,14 @@ if (raf != null) { final FileChannel channel = raf.getChannel(); if (channel.isOpen()) { + /* + * The channel is still open. If you are allowing + * concurrent reads on the channel, then this could + * indicate that two readers each found the channel + * closed and that one was able to re-open the channel + * before the other such that the channel was open again + * by the time the 2nd reader got here. + */ return channel; } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-03-14 18:17:38
|
Revision: 4298 http://bigdata.svn.sourceforge.net/bigdata/?rev=4298&view=rev Author: thompsonbry Date: 2011-03-14 18:17:32 +0000 (Mon, 14 Mar 2011) Log Message: ----------- Added an option (com.bigdata.journal.Options.IGNORE_BAD_ROOT_BLOCK) in the QUADS_QUERY_BRANCH which may be used to permit a Journal with ONE (1) bad root block to be opened using the other root block. The ALTERNATE_ROOT_BLOCK option is only permitted when both root blocks are valid, but there is a reason to revert to the previous root block anyway. The IGNORE_BAD_ROOT_BLOCK option handles the case where one of the two root blocks is invalid. Note that the Journal now logs its root blocks, so it is possible (with code) to choose a historical root block to be restored. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Options.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2011-03-14 17:49:49 UTC (rev 4297) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2011-03-14 18:17:32 UTC (rev 4298) @@ -831,54 +831,58 @@ } - /** - * This constructor handles cases where the file exists and is non-empty. - * - * @param file - * The name of the file to be opened. - * @param useDirectBuffers - * true if a buffer should be allocated using - * {@link ByteBuffer#allocateDirect(int)} rather than - * {@link ByteBuffer#allocate(int)}. This has no effect for the - * {@link BufferMode#Disk} and {@link BufferMode#Mapped} modes. - * @param readOnly - * When true, the file is opened in a read-only mode and it is an - * error if the file does not exist. - * @param forceWrites - * When true, the file is opened in "rwd" mode and individual IOs - * are forced to disk. This option SHOULD be false since we only - * need to write through to disk on commit, not on each IO. - * @param writeCacheEnabled - * When <code>true</code>, the {@link DiskOnlyStrategy} will - * allocate a direct {@link ByteBuffer} from the - * {@link DirectBufferPool} to service as a write cache. - * @param writeCacheBufferCount - * The #of buffers to allocate for the {@link WriteCacheService}. - * @param validateChecksum - * When <code>true</code>, the checksum stored in the root blocks - * of an existing file will be validated when the file is opened. - * See {@link Options#VALIDATE_CHECKSUM}. - * @param alternateRootBlock - * When <code>true</code> the prior root block will be used. This - * option may be used when a commit record is valid but the data - * associated with the commit point is invalid. There are two - * root blocks. Normally the one which has been most recently - * written will be loaded on restart. When this option is - * specified, the older of the two root blocks will be loaded - * instead. <strong>If you use this option and then do a commit - * then the more recent of the root blocks will be lost and any - * data associated with that commit point will be lost as - * well!</strong> - * - * @throws RuntimeException - * if there is a problem preparing the file for use by the - * journal.
- */ + /** + * This constructor handles cases where the file exists and is non-empty. + * + * @param file + * The name of the file to be opened. + * @param useDirectBuffers + * true if a buffer should be allocated using + * {@link ByteBuffer#allocateDirect(int)} rather than + * {@link ByteBuffer#allocate(int)}. This has no effect for the + * {@link BufferMode#Disk} and {@link BufferMode#Mapped} modes. + * @param readOnly + * When true, the file is opened in a read-only mode and it is an + * error if the file does not exist. + * @param forceWrites + * When true, the file is opened in "rwd" mode and individual IOs + * are forced to disk. This option SHOULD be false since we only + * need to write through to disk on commit, not on each IO. + * @param writeCacheEnabled + * When <code>true</code>, the {@link DiskOnlyStrategy} will + * allocate a direct {@link ByteBuffer} from the + * {@link DirectBufferPool} to service as a write cache. + * @param writeCacheBufferCount + * The #of buffers to allocate for the {@link WriteCacheService}. + * @param validateChecksum + * When <code>true</code>, the checksum stored in the root blocks + * of an existing file will be validated when the file is opened. + * See {@link Options#VALIDATE_CHECKSUM}. + * @param alternateRootBlock + * When <code>true</code> the prior root block will be used. This + * option may be used when a commit record is valid but the data + * associated with the commit point is invalid. There are two + * root blocks. Normally the one which has been most recently + * written will be loaded on restart. When this option is + * specified, the older of the two root blocks will be loaded + * instead. <strong>If you use this option and then do a commit + * then the more recent of the root blocks will be lost and any + * data associated with that commit point will be lost as + * well!</strong> + * @param ignoreBadRootBlock + * When <code>true</code>, the application will be allowed to + * proceed with one damaged root block. The undamaged root block + * will be automatically chosen. + * + * @throws RuntimeException + * if there is a problem preparing the file for use by the + * journal. + */ FileMetadata(final File file, final boolean useDirectBuffers, final boolean readOnly, final ForceEnum forceWrites, final boolean writeCacheEnabled, final int writeCacheBufferCount, final boolean validateChecksum, final boolean alternateRootBlock, - final Properties properties) + final boolean ignoreBadRootBlock, final Properties properties) throws RuntimeException { if (file == null) @@ -976,7 +980,7 @@ * constants (slotSize, segmentId). 
*/ final RootBlockUtility tmp = new RootBlockUtility(opener, file, - validateChecksum, alternateRootBlock); + validateChecksum, alternateRootBlock, ignoreBadRootBlock); this.rootBlock0 = tmp.rootBlock0; this.rootBlock1 = tmp.rootBlock1; this.rootBlock = tmp.rootBlock; @@ -1422,9 +1426,12 @@ properties, Options.VALIDATE_CHECKSUM, Options.DEFAULT_VALIDATE_CHECKSUM)); - final boolean alternateRootBlock = Boolean.parseBoolean(getProperty( - properties, Options.ALTERNATE_ROOT_BLOCK, "false")); + final boolean alternateRootBlock = Boolean.parseBoolean(getProperty( + properties, Options.ALTERNATE_ROOT_BLOCK, "false")); + final boolean ignoreBadRootBlock = Boolean.parseBoolean(getProperty( + properties, Options.IGNORE_BAD_ROOT_BLOCK, "false")); + if (alternateRootBlock && !readOnly) { log.warn("*** Using the alternate root block: " @@ -1440,7 +1447,8 @@ return new FileMetadata(file, useDirectBuffers, readOnly, forceWrites, writeCacheEnabled, writeCacheBufferCount, - validateChecksum, alternateRootBlock, properties); + validateChecksum, alternateRootBlock, ignoreBadRootBlock, + properties); } else { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Options.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Options.java 2011-03-14 17:49:49 UTC (rev 4297) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Options.java 2011-03-14 18:17:32 UTC (rev 4298) @@ -322,12 +322,12 @@ * the more current root block is used to (re-)open the store. However, if * this option is specified, the <em>previous</em> root block will be used * to open the store. This will allow you to access the previous commit - * point. <strong>If you subsequently perform a commit then the most root - * block will be overwritten and any data associated with the last commit - * point will be unreachable.</strong> This option may be considered in the - * cases where the application is otherwise unable to proceed. It is - * strongly recommended that you also specify {@link #READ_ONLY} so that you - * do not <em>accidentally</em> trigger a commit and thereby make the data + * point. <strong>If you subsequently perform a commit then the other root + * block will be overwritten and any data associated with its commit point + * will be unreachable.</strong> This option may be considered in the cases + * where the application is otherwise unable to proceed. It is strongly + * recommended that you also specify {@link #READ_ONLY} so that you do not + * <em>accidentally</em> trigger a commit and thereby make the data * associated with the other root block unreachable. You may of course * deliberately allow a commit as an attempt to restore the database to * service accepting that you have rolled back the database by one commit @@ -336,6 +336,21 @@ String ALTERNATE_ROOT_BLOCK = AbstractJournal.class.getName()+".alternateRootBlock"; /** + * <strong>WARNING - The use of this option is dangerous.</strong> This + * option MAY be used to permit the database to be opened if one of the root + * blocks is bad. This will allow you to access the remaining root block and + * the associated commit point. <strong>If you subsequently perform a commit + * then the damaged root block will be overwritten.</strong> This option may + * be considered in the cases where the application is otherwise unable to + * proceed. 
It is strongly recommended that you also specify + * {@link #READ_ONLY} so that you do not <em>accidentally</em> trigger a + * commit. You may of course deliberately allow a commit as an attempt to + * restore the database to service accepting that you may have rolled back + * the database by one commit point in doing so. + */ + String IGNORE_BAD_ROOT_BLOCK = AbstractJournal.class.getName()+".ignoreBadRootBlock"; + + /** * An optional boolean property (default is {@value #DEFAULT_CREATE}). When * <code>true</code> and the named file is not found, a new journal will be * created. If the file exists but is empty, then a new journal will be Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java 2011-03-14 17:49:49 UTC (rev 4297) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java 2011-03-14 18:17:32 UTC (rev 4298) @@ -67,10 +67,24 @@ */ public final IRootBlockView rootBlock; + /** + * + * @param opener + * @param file + * @param validateChecksum + * @param alternateRootBlock + * @param ignoreBadRootBlock + * + * @throws IOException + * + * @see com.bigdata.journal.Options#ALTERNATE_ROOT_BLOCK + * @see com.bigdata.journal.Options#IGNORE_BAD_ROOT_BLOCK + */ public RootBlockUtility(final IReopenChannel<FileChannel> opener, final File file, final boolean validateChecksum, - final boolean alternateRootBlock) throws IOException { - + final boolean alternateRootBlock, final boolean ignoreBadRootBlock) + throws IOException { + final ChecksumUtility checker = validateChecksum ? ChecksumUtility.threadChk .get() : null; @@ -87,31 +101,43 @@ try { rootBlock0 = new RootBlockView(true, tmp0, checker); } catch (RootBlockException ex) { - log.warn("Bad root block zero: " + ex); + log.error("Bad root block zero: " + ex); } try { rootBlock1 = new RootBlockView(false, tmp1, checker); } catch (RootBlockException ex) { - log.warn("Bad root block one: " + ex); + log.error("Bad root block one: " + ex); } if (rootBlock0 == null && rootBlock1 == null) { throw new RuntimeException( "Both root blocks are bad - journal is not usable: " + file); } + // save references. this.rootBlock0 = rootBlock0; this.rootBlock1 = rootBlock1; - + + if (!ignoreBadRootBlock + && (rootBlock0 == null || rootBlock1 == null)) { + /* + * Do not permit the application to continue with a damaged + * root block. + */ + throw new RuntimeException( + "Bad root block(s): rootBlock0 is " + + (rootBlock0 == null ? "bad" : "ok") + + ", rootBlock1=" + + (rootBlock1 == null ? "bad" : "ok")); + } if(alternateRootBlock) { /* * A request was made to use the alternative root block. */ if (rootBlock0 == null || rootBlock1 == null) { /* - * Note: The [alternateRootBlock] flag only makes sense - * when you have two to choose from and you want to - * choose the other one. In this case, your only choice - * is to use the undamaged root block. + * Note: The [alternateRootBlock] flag only makes sense when you + * have two root blocks to choose from and you want to choose + * the other one. */ throw new RuntimeException( "Can not use alternative root block since one root block is damaged."); @@ -219,12 +245,17 @@ }; + // validate the root blocks using their checksums. final boolean validateChecksum = true; - + + // option is ignored since we are not not opening the Journal. 
final boolean alternateRootBlock = false; + + // Open even if one root block is bad. + final boolean ignoreBadRootBlock = true; final RootBlockUtility u = new RootBlockUtility(opener, file, - validateChecksum, alternateRootBlock); + validateChecksum, alternateRootBlock, ignoreBadRootBlock); System.out.println("rootBlock0: " + u.rootBlock0); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-03-14 17:49:49 UTC (rev 4297) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-03-14 18:17:32 UTC (rev 4298) @@ -2068,7 +2068,8 @@ try { final RootBlockUtility tmp = new RootBlockUtility(m_reopener, m_fd, - true/* validateChecksum */, false/* alternateRootBlock */); + true/* validateChecksum */, false/* alternateRootBlock */, + false/* ignoreBadRootBlock */); final IRootBlockView rootBlock = tmp.rootBlock; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
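For recovery scenarios, something like the following sketch (not part of this commit) combines the new option with READ_ONLY to inspect a journal whose other root block is damaged. The journal path is illustrative, and the Options.FILE constant is assumed to exist alongside READ_ONLY and IGNORE_BAD_ROOT_BLOCK in com.bigdata.journal.Options:

import java.util.Properties;

import com.bigdata.journal.Journal;
import com.bigdata.journal.Options;

public class InspectDamagedJournal {

    public static void main(final String[] args) {

        final Properties p = new Properties();

        // Path to the damaged journal (illustrative).
        p.setProperty(Options.FILE, "/var/data/bigdata.jnl");

        // Permit the open even though one of the root blocks is bad.
        p.setProperty(Options.IGNORE_BAD_ROOT_BLOCK, "true");

        // Strongly recommended: prevent an accidental commit from
        // overwriting the damaged root block.
        p.setProperty(Options.READ_ONLY, "true");

        final Journal journal = new Journal(p);

        try {

            // Report the commit point associated with the good root block.
            System.out.println(journal.getRootBlockView());

        } finally {

            journal.close();

        }

    }

}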
From: <tho...@us...> - 2011-03-16 19:45:12
Revision: 4303 http://bigdata.svn.sourceforge.net/bigdata/?rev=4303&view=rev Author: thompsonbry Date: 2011-03-16 19:45:05 +0000 (Wed, 16 Mar 2011) Log Message: ----------- Modified DumpJournal to include an option to summarize the page (node and leaf) usage statistics and refactored some code into a DumpIndex class. Modified IndexMetadata#toString() to report the details on the TupleSerializer so we can observe which codes are used for leaves in an index. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IndexMetadata.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java 2011-03-16 19:45:05 UTC (rev 4303) @@ -0,0 +1,178 @@ +package com.bigdata.btree; + +/** + * Utility class to dump an index in a variety of ways. + * + * @author thompsonbry + */ +public class DumpIndex { + + /** + * Utility method using an {@link ITupleIterator} to dump the keys and + * values in an {@link AbstractBTree}. + * + * @param ndx + * The index. + * @param showTuples + * When <code>true</code> the data for the keys and values will + * be displayed. Otherwise the scan will simply exercise the + * iterator. + */ + public static void dumpIndex(final AbstractBTree ndx, + final boolean showTuples) { + + // Note: reused for each tuple to avoid heap churn. + final StringBuilder sb = new StringBuilder(); + + // @todo offer the version metadata also if the index supports + // isolation. + final ITupleIterator<?> itr = ndx.rangeIterator(null, null); + + final long begin = System.currentTimeMillis(); + + int i = 0; + + while (itr.hasNext()) { + + final ITuple<?> tuple = itr.next(); + + if (showTuples) { + + dumpTuple(i, sb, tuple); + + } + + i++; + + } + + final long elapsed = System.currentTimeMillis() - begin; + + System.out.println("Visited " + i + " tuples in " + elapsed + "ms"); + + } + + private static void dumpTuple(final int recno, final StringBuilder tupleSB, + final ITuple<?> tuple) { + + final ITupleSerializer<?, ?> tupleSer = tuple.getTupleSerializer(); + + tupleSB.setLength(0); // reset. + + tupleSB.append("rec=" + recno); + + try { + + tupleSB.append("\nkey=" + tupleSer.deserializeKey(tuple)); + + } catch (Throwable t) { + + tupleSB.append("\nkey=" + BytesUtil.toString(tuple.getKey())); + + } + + try { + + tupleSB.append("\nval=" + tupleSer.deserialize(tuple)); + + } catch (Throwable t) { + + tupleSB.append("\nval=" + BytesUtil.toString(tuple.getValue())); + + } + + System.out.println(tupleSB); + + } + + /** + * Dumps pages (nodes and leaves) using a low-level approach. + * + * @param ndx + * The index. + * @param dumpNodeState + * When <code>true</code>, provide gruesome detail on each + * visited page. 
+ */ + public static void dumpPages(final AbstractBTree ndx, + final boolean dumpNodeState) { + + final PageStats stats = new PageStats(); + + dumpPages(ndx, ndx.getRoot(), stats, dumpNodeState); + + System.out.println("name=" + ndx.getIndexMetadata().getName() + " : " + + stats); + + } + + /* + * FIXME report the min/max page size as well and report as a nice table in + * DumpJournal. + */ + static class PageStats { + + /** The #of nodes visited. */ + public long nnodes; + /** The #of leaves visited. */ + public long nleaves; + /** The #of bytes in the raw records for the nodes visited. */ + public long nodeBytes; + /** The #of bytes in the raw records for the leaves visited. */ + public long leafBytes; + + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append(getClass().getName()); + sb.append("{nnodes="+nnodes); + sb.append(",nleaves="+nleaves); + sb.append(",nodeBytes="+nodeBytes); + sb.append(",leafBytes="+leafBytes); + sb.append(",bytesPerNode="+(nnodes==0?0:nodeBytes/nnodes)); + sb.append(",bytesPerLeaf="+(nleaves==0?0:leafBytes/nleaves)); + sb.append("}"); + return sb.toString(); + } + + } + + private static void dumpPages(final AbstractBTree ndx, + final AbstractNode<?> node, final PageStats stats, + final boolean dumpNodeState) { + + if (dumpNodeState) + node.dump(System.out); + + final long addrSelf = node.getIdentity(); + + final long nbytes = ndx.getStore().getByteCount(addrSelf); + + final boolean isLeaf = node.isLeaf(); + + if (isLeaf) { + + stats.nleaves++; + stats.leafBytes += nbytes; + + } else { + + stats.nnodes++; + stats.nodeBytes += nbytes; + + final int nkeys = node.getKeyCount(); + + for (int i = 0; i <= nkeys; i++) { + + // normal read following the node hierarchy, using cache, etc. + final AbstractNode<?> child = ((Node) node).getChild(i); + + // recursive dump + dumpPages(ndx, child, stats, dumpNodeState); + + } + + } + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java 2011-03-16 18:48:33 UTC (rev 4302) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java 2011-03-16 19:45:05 UTC (rev 4303) @@ -37,7 +37,6 @@ import com.bigdata.btree.IndexSegment.ImmutableNodeFactory.ImmutableLeaf; import com.bigdata.io.DirectBufferPool; -import com.bigdata.journal.DumpJournal; import com.bigdata.rawstore.IRawStore; /** @@ -192,7 +191,7 @@ writeBanner("dump keys and values using iterator"); - DumpJournal.dumpIndex(store.loadIndexSegment(),showTuples); + DumpIndex.dumpIndex(store.loadIndexSegment(),showTuples); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IndexMetadata.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IndexMetadata.java 2011-03-16 18:48:33 UTC (rev 4302) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IndexMetadata.java 2011-03-16 19:45:05 UTC (rev 4303) @@ -2347,7 +2347,7 @@ sb.append(", btreeRecordCompressorFactory=" + (btreeRecordCompressorFactory == null ? "N/A" : btreeRecordCompressorFactory)); - sb.append(", tupleSerializer=" + tupleSer.getClass().getName()); + sb.append(", tupleSerializer=" + tupleSer);//.getClass().getName()); sb.append(", conflictResolver=" + (conflictResolver == null ? 
"N/A" : conflictResolver .getClass().getName())); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2011-03-16 18:48:33 UTC (rev 4302) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2011-03-16 19:45:05 UTC (rev 4303) @@ -33,13 +33,10 @@ import org.apache.log4j.Logger; -import com.bigdata.btree.AbstractBTree; import com.bigdata.btree.BTree; -import com.bigdata.btree.BytesUtil; +import com.bigdata.btree.DumpIndex; import com.bigdata.btree.IIndex; -import com.bigdata.btree.ITuple; import com.bigdata.btree.ITupleIterator; -import com.bigdata.btree.ITupleSerializer; import com.bigdata.rawstore.Bytes; import com.bigdata.util.InnerCause; @@ -92,6 +89,8 @@ * committed state).</dd> * <dt>-indices</dt> * <dd>Dump the indices (does not show the tuples by default).</dd> + * <dt>-pages</dt> + * <dd>Dump the pages of the indices and reports some information on the page size.</dd> * <dt>-tuples</dt> * <dd>Dump the records in the indices.</dd> * </dl> @@ -111,6 +110,8 @@ boolean dumpHistory = false; boolean dumpIndices = false; + + boolean dumpPages = false; boolean showTuples = false; @@ -131,18 +132,27 @@ } - if(arg.equals("-indices")) { + else if(arg.equals("-indices")) { dumpIndices = true; } - if(arg.equals("-tuples")) { + else if(arg.equals("-pages")) { + dumpPages = true; + + } + + else if(arg.equals("-tuples")) { + showTuples = true; } + else + throw new RuntimeException("Unknown argument: " + arg); + } for(; i<args.length; i++) { @@ -151,7 +161,7 @@ try { - dumpJournal(file,dumpHistory,dumpIndices,showTuples); + dumpJournal(file,dumpHistory,dumpPages,dumpIndices,showTuples); } catch( RuntimeException ex) { @@ -161,20 +171,20 @@ } - System.err.println("=================================="); + System.out.println("=================================="); } } - public static void dumpJournal(File file,boolean dumpHistory,boolean dumpIndices,boolean showTuples) { + public static void dumpJournal(File file,boolean dumpHistory,boolean dumpPages,boolean dumpIndices,boolean showTuples) { /* * Stat the file and report on its size, etc. */ { - System.err.println("File: "+file); + System.out.println("File: "+file); if(!file.exists()) { @@ -192,9 +202,9 @@ } - System.err.println("Length: "+file.length()); + System.out.println("Length: "+file.length()); - System.err.println("Last Modified: "+new Date(file.lastModified())); + System.out.println("Last Modified: "+new Date(file.lastModified())); } @@ -210,7 +220,7 @@ } - System.err.println("Opening (read-only): "+file); + System.out.println("Opening (read-only): "+file); final Journal journal = new Journal(properties); @@ -219,15 +229,15 @@ final FileMetadata fmd = journal.getFileMetadata(); // dump the MAGIC and VERSION. - System.err.println("magic="+Integer.toHexString(fmd.magic)); - System.err.println("version="+Integer.toHexString(fmd.version)); + System.out.println("magic="+Integer.toHexString(fmd.magic)); + System.out.println("version="+Integer.toHexString(fmd.version)); // dump the root blocks. - System.err.println(fmd.rootBlock0.toString()); - System.err.println(fmd.rootBlock1.toString()); + System.out.println(fmd.rootBlock0.toString()); + System.out.println(fmd.rootBlock1.toString()); // report on which root block is the current root block. 
- System.err.println("The current root block is #" + System.out.println("The current root block is #" + (journal.getRootBlockView().isRootBlock0() ? 0 : 1)); /* @@ -241,14 +251,14 @@ final long bytesAvailable = (fmd.userExtent - fmd.nextOffset); - System.err.println("extent="+fmd.extent+"("+fmd.extent/Bytes.megabyte+"M)"+ + System.out.println("extent="+fmd.extent+"("+fmd.extent/Bytes.megabyte+"M)"+ ", userExtent="+fmd.userExtent+"("+fmd.userExtent/Bytes.megabyte+"M)"+ ", bytesAvailable="+bytesAvailable+"("+bytesAvailable/Bytes.megabyte+"M)"+ ", nextOffset="+fmd.nextOffset); if (dumpHistory) { - System.err.println("Historical commit points follow in temporal sequence (first to last):"); + System.out.println("Historical commit points follow in temporal sequence (first to last):"); CommitRecordIndex commitRecordIndex = journal.getCommitRecordIndex(); // CommitRecordIndex commitRecordIndex = journal._commitRecordIndex; @@ -257,19 +267,19 @@ while(itr.hasNext()) { - System.err.println("----"); + System.out.println("----"); final CommitRecordIndex.Entry entry = itr.next().getObject(); - System.err.print("Commit Record: " + entry.commitTime + System.out.print("Commit Record: " + entry.commitTime + ", addr=" + journal.toString(entry.addr)+", "); final ICommitRecord commitRecord = journal .getCommitRecord(entry.commitTime); - System.err.println(commitRecord.toString()); + System.out.println(commitRecord.toString()); - dumpNamedIndicesMetadata(journal,commitRecord,dumpIndices,showTuples); + dumpNamedIndicesMetadata(journal,commitRecord,dumpPages,dumpIndices,showTuples); } @@ -281,9 +291,9 @@ final ICommitRecord commitRecord = journal.getCommitRecord(); - System.err.println(commitRecord.toString()); + System.out.println(commitRecord.toString()); - dumpNamedIndicesMetadata(journal,commitRecord,dumpIndices,showTuples); + dumpNamedIndicesMetadata(journal,commitRecord,dumpPages,dumpIndices,showTuples); } @@ -302,7 +312,7 @@ * @param commitRecord */ private static void dumpNamedIndicesMetadata(AbstractJournal journal, - ICommitRecord commitRecord, boolean dumpIndices, boolean showTuples) { + ICommitRecord commitRecord, boolean dumpPages, boolean dumpIndices, boolean showTuples) { // view as of that commit record. final IIndex name2Addr = journal.getName2Addr(commitRecord.getTimestamp()); @@ -315,7 +325,7 @@ final Name2Addr.Entry entry = Name2Addr.EntrySerializer.INSTANCE .deserialize(itr.next().getValueStream()); - System.err.println("name=" + entry.name + ", addr=" + System.out.println("name=" + entry.name + ", addr=" + journal.toString(entry.checkpointAddr)); // load B+Tree from its checkpoint record. @@ -347,85 +357,19 @@ } // show checkpoint record. - System.err.println("\t" + ndx.getCheckpoint()); + System.out.println("\t" + ndx.getCheckpoint()); // show metadata record. - System.err.println("\t" + ndx.getIndexMetadata()); + System.out.println("\t" + ndx.getIndexMetadata()); + if (dumpPages) + DumpIndex.dumpPages(ndx, false/* dumpNodeState */); + if (dumpIndices) - dumpIndex(ndx, showTuples); + DumpIndex.dumpIndex(ndx, showTuples); } } - /** - * Utility method using an {@link ITupleIterator} to dump the keys and - * values in an {@link AbstractBTree}. - * - * @param btree - * @param showTuples - * When <code>true</code> the data for the keys and values will - * be displayed. Otherwise the scan will simply exercise the - * iterator. - */ - public static void dumpIndex(AbstractBTree btree, boolean showTuples) { - - // @todo offer the version metadata also if the index supports isolation. 
- final ITupleIterator itr = btree.rangeIterator(null, null); - - final long begin = System.currentTimeMillis(); - - int i = 0; - - while(itr.hasNext()) { - - final ITuple tuple = itr.next(); - - if(showTuples) { - - System.err.println("rec="+i+dumpTuple( tuple )); - - } - - i++; - - } - - final long elapsed = System.currentTimeMillis() - begin; - - System.err.println("Visited "+i+" tuples in "+elapsed+"ms"); - - } - - private static String dumpTuple(ITuple tuple) { - - final ITupleSerializer tupleSer = tuple.getTupleSerializer(); - - final StringBuilder sb = new StringBuilder(); - - try { - - sb.append("\nkey="+tupleSer.deserializeKey(tuple)); - - } catch(Throwable t) { - - sb.append("\nkey="+BytesUtil.toString(tuple.getKey())); - - } - - try { - - sb.append("\nval="+tupleSer.deserialize(tuple)); - - } catch(Throwable t) { - - sb.append("\nval="+BytesUtil.toString(tuple.getValue())); - - } - - return sb.toString(); - - } - } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
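For reference, a short sketch of driving the refactored entry point programmatically with the new page statistics option enabled; the file name is illustrative:

import java.io.File;

import com.bigdata.journal.DumpJournal;

public class DumpPagesExample {

    public static void main(final String[] args) {

        // Equivalent to the command line: DumpJournal -pages <file>
        DumpJournal.dumpJournal(new File("/var/data/bigdata.jnl"),
                false/* dumpHistory */, true/* dumpPages */,
                false/* dumpIndices */, false/* showTuples */);

    }

}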
From: <tho...@us...> - 2011-03-17 14:40:00
Revision: 4307 http://bigdata.svn.sourceforge.net/bigdata/?rev=4307&view=rev Author: thompsonbry Date: 2011-03-17 14:39:53 +0000 (Thu, 17 Mar 2011) Log Message: ----------- Corrected some inconsistencies in DumpJournal and DumpIndex with respect to gathering and reporting the PageStats for an index. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java 2011-03-17 12:34:36 UTC (rev 4306) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndex.java 2011-03-17 14:39:53 UTC (rev 4307) @@ -93,24 +93,25 @@ * @param dumpNodeState * When <code>true</code>, provide gruesome detail on each * visited page. + * + * @return Some interesting statistics about the pages in that index which + * the caller can print out. */ - public static void dumpPages(final AbstractBTree ndx, + public static PageStats dumpPages(final AbstractBTree ndx, final boolean dumpNodeState) { final PageStats stats = new PageStats(); dumpPages(ndx, ndx.getRoot(), stats, dumpNodeState); - System.out.println("name=" + ndx.getIndexMetadata().getName() + " : " - + stats); + return stats; } - - /* - * FIXME report the min/max page size as well and report as a nice table in - * DumpJournal. + + /** + * Class reports various summary statistics for nodes and leaves. */ - static class PageStats { + public static class PageStats { /** The #of nodes visited. */ public long nnodes; @@ -120,16 +121,39 @@ public long nodeBytes; /** The #of bytes in the raw records for the leaves visited. */ public long leafBytes; + /** The min/max bytes per node. */ + public long minNodeBytes, maxNodeBytes; + /** The min/max bytes per leaf. */ + public long minLeafBytes, maxLeafBytes; + /** Return {@link #nodeBytes} plus {@link #leafBytes}. */ + public long getTotalBytes() { + return nodeBytes + leafBytes; + } + + /** The average bytes per node. */ + public long getBytesPerNode() { + return (nnodes == 0 ? 0 : nodeBytes / nnodes); + } + + /** The average bytes per leaf. */ + public long getBytesPerLeaf() { + return (nleaves == 0 ? 
0 : leafBytes / nleaves); + } + public String toString() { final StringBuilder sb = new StringBuilder(); sb.append(getClass().getName()); - sb.append("{nnodes="+nnodes); - sb.append(",nleaves="+nleaves); - sb.append(",nodeBytes="+nodeBytes); - sb.append(",leafBytes="+leafBytes); - sb.append(",bytesPerNode="+(nnodes==0?0:nodeBytes/nnodes)); - sb.append(",bytesPerLeaf="+(nleaves==0?0:leafBytes/nleaves)); + sb.append("{nnodes=" + nnodes); + sb.append(",nleaves=" + nleaves); + sb.append(",nodeBytes=" + nodeBytes); + sb.append(",minNodeBytes=" + minNodeBytes); + sb.append(",maxNodeBytes=" + maxNodeBytes); + sb.append(",leafBytes=" + leafBytes); + sb.append(",minLeafBytes=" + minLeafBytes); + sb.append(",maxLeafBytes=" + maxLeafBytes); + sb.append(",bytesPerNode=" + getBytesPerNode()); + sb.append(",bytesPerLeaf=" + getBytesPerLeaf()); sb.append("}"); return sb.toString(); } @@ -153,11 +177,19 @@ stats.nleaves++; stats.leafBytes += nbytes; + if (stats.minLeafBytes > nbytes || stats.minLeafBytes == 0) + stats.minLeafBytes = nbytes; + if (stats.maxLeafBytes < nbytes) + stats.maxLeafBytes = nbytes; } else { stats.nnodes++; stats.nodeBytes += nbytes; + if (stats.minNodeBytes > nbytes || stats.minNodeBytes == 0) + stats.minNodeBytes = nbytes; + if (stats.maxNodeBytes < nbytes) + stats.maxNodeBytes = nbytes; final int nkeys = node.getKeyCount(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2011-03-17 12:34:36 UTC (rev 4306) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2011-03-17 14:39:53 UTC (rev 4307) @@ -29,7 +29,9 @@ import java.io.File; import java.util.Date; +import java.util.Map; import java.util.Properties; +import java.util.TreeMap; import org.apache.log4j.Logger; @@ -37,6 +39,7 @@ import com.bigdata.btree.DumpIndex; import com.bigdata.btree.IIndex; import com.bigdata.btree.ITupleIterator; +import com.bigdata.btree.DumpIndex.PageStats; import com.bigdata.rawstore.Bytes; import com.bigdata.util.InnerCause; @@ -260,10 +263,10 @@ System.out.println("Historical commit points follow in temporal sequence (first to last):"); - CommitRecordIndex commitRecordIndex = journal.getCommitRecordIndex(); + final CommitRecordIndex commitRecordIndex = journal.getCommitRecordIndex(); // CommitRecordIndex commitRecordIndex = journal._commitRecordIndex; - ITupleIterator<CommitRecordIndex.Entry> itr = commitRecordIndex.rangeIterator(); + final ITupleIterator<CommitRecordIndex.Entry> itr = commitRecordIndex.rangeIterator(); while(itr.hasNext()) { @@ -311,14 +314,17 @@ * @param journal * @param commitRecord */ - private static void dumpNamedIndicesMetadata(AbstractJournal journal, - ICommitRecord commitRecord, boolean dumpPages, boolean dumpIndices, boolean showTuples) { + private static void dumpNamedIndicesMetadata(AbstractJournal journal, + ICommitRecord commitRecord, boolean dumpPages, boolean dumpIndices, boolean showTuples) { // view as of that commit record. final IIndex name2Addr = journal.getName2Addr(commitRecord.getTimestamp()); final ITupleIterator itr = name2Addr.rangeIterator(null,null); - + + final Map<String, PageStats> pageStats = dumpPages ? new TreeMap<String, PageStats>() + : null; + while (itr.hasNext()) { // a registered index. @@ -362,14 +368,100 @@ // show metadata record. 
System.out.println("\t" + ndx.getIndexMetadata()); - if (dumpPages) - DumpIndex.dumpPages(ndx, false/* dumpNodeState */); + if (pageStats != null) { + + final PageStats stats = DumpIndex + .dumpPages(ndx, false/* dumpNodeState */); + System.out.println("\t" + stats); + + pageStats.put(entry.name, stats); + + } + if (dumpIndices) DumpIndex.dumpIndex(ndx, showTuples); } + + if (pageStats != null) { + + /* + * TODO If we kept the BTree counters for the #of bytes written per + * node and per leaf up to date when nodes and leaves were recycled + * then we could generate this table very quickly. As it stands, we + * have to actually scan the pages in the index. + */ + System.out.print("name"); + System.out.print('\t'); + System.out.print("m"); + System.out.print('\t'); + System.out.print("height"); + System.out.print('\t'); + System.out.print("nnodes"); + System.out.print('\t'); + System.out.print("nleaves"); + System.out.print('\t'); + System.out.print("nodeBytes"); + System.out.print('\t'); + System.out.print("leafBytes"); + System.out.print('\t'); + System.out.print("totalBytes"); + System.out.print('\t'); + System.out.print("avgNodeBytes"); + System.out.print('\t'); + System.out.print("avgLeafBytes"); + System.out.print('\t'); + System.out.print("minNodeBytes"); + System.out.print('\t'); + System.out.print("maxNodeBytes"); + System.out.print('\t'); + System.out.print("minLeafBytes"); + System.out.print('\t'); + System.out.print("maxLeafBytes"); + System.out.print('\n'); + + for(Map.Entry<String,PageStats> e : pageStats.entrySet()) { + + final String name = e.getKey(); + + final PageStats stats = e.getValue(); + + final BTree ndx = (BTree) journal.getIndex(name, commitRecord); + + System.out.print(name); + System.out.print('\t'); + System.out.print(ndx.getBranchingFactor()); + System.out.print('\t'); + System.out.print(ndx.getHeight()); + System.out.print('\t'); + System.out.print(ndx.getNodeCount()); + System.out.print('\t'); + System.out.print(ndx.getLeafCount()); + System.out.print('\t'); + System.out.print(stats.nodeBytes); + System.out.print('\t'); + System.out.print(stats.leafBytes); + System.out.print('\t'); + System.out.print(stats.getTotalBytes()); + System.out.print('\t'); + System.out.print(stats.getBytesPerNode()); + System.out.print('\t'); + System.out.print(stats.getBytesPerLeaf()); + System.out.print('\t'); + System.out.print(stats.minNodeBytes); + System.out.print('\t'); + System.out.print(stats.maxNodeBytes); + System.out.print('\t'); + System.out.print(stats.minLeafBytes); + System.out.print('\t'); + System.out.print(stats.maxLeafBytes); + System.out.print('\n'); + + } + + } - } + } // dumpNamedIndicesMetadata } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
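Since dumpPages(...) now returns the PageStats rather than printing them, a caller can format or aggregate the numbers itself. A minimal sketch, assuming the caller has already loaded an AbstractBTree:

import com.bigdata.btree.AbstractBTree;
import com.bigdata.btree.DumpIndex;
import com.bigdata.btree.DumpIndex.PageStats;

public class PageStatsExample {

    static void reportPageStats(final AbstractBTree ndx) {

        final PageStats stats = DumpIndex.dumpPages(ndx,
                false/* dumpNodeState */);

        System.out.println("nnodes=" + stats.nnodes //
                + ", nleaves=" + stats.nleaves //
                + ", totalBytes=" + stats.getTotalBytes() //
                + ", bytesPerNode=" + stats.getBytesPerNode() //
                + ", bytesPerLeaf=" + stats.getBytesPerLeaf() //
                + ", maxLeafBytes=" + stats.maxLeafBytes);

    }

}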
From: <mrp...@us...> - 2011-05-11 03:20:15
Revision: 4481 http://bigdata.svn.sourceforge.net/bigdata/?rev=4481&view=rev Author: mrpersonick Date: 2011-05-11 03:20:08 +0000 (Wed, 11 May 2011) Log Message: ----------- inline sids and reverse lookup Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/ChunkedArrayIterator.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/ArrayAccessPath.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/Bits.java Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/ArrayAccessPath.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/ArrayAccessPath.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/ArrayAccessPath.java 2011-05-11 03:20:08 UTC (rev 4481) @@ -0,0 +1,196 @@ +/* +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.relation.accesspath; + +import java.util.Collections; + +import com.bigdata.bop.IPredicate; +import com.bigdata.btree.IIndex; +import com.bigdata.btree.ITupleIterator; +import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.ChunkedWrappedIterator; +import com.bigdata.striterator.IChunkedOrderedIterator; +import com.bigdata.striterator.IKeyOrder; + +/** + * An access path over an array of elements. + */ +public class ArrayAccessPath<E> implements IAccessPath<E> { + + private final IPredicate<E> predicate; + + private final IKeyOrder<E> keyOrder; + + /** + * Array of elements + */ + private final E[] e; + + /** + * Ctor variant does not specify the {@link #getPredicate()} or the + * {@link #getKeyOrder()} and those methods will throw an + * {@link UnsupportedOperationException} if invoked. + */ + public ArrayAccessPath(final E[] e) { + + this(e, null/* predicate */, null/* keyOrder */); + + } + + /** + * Note: the {@link #getPredicate()} and {@link #getKeyOrder()} and methods + * will throw an {@link UnsupportedOperationException} if the corresponding + * argument is null. + */ + public ArrayAccessPath(final E[] e, + final IPredicate<E> predicate, final IKeyOrder<E> keyOrder) { + + this.predicate = predicate; + + this.keyOrder = keyOrder; + + this.e = e; + + } + + /** + * @throws UnsupportedOperationException + * unless the caller specified an {@link IPredicate} to the + * ctor. + */ + public IPredicate<E> getPredicate() { + + if (predicate == null) + throw new UnsupportedOperationException(); + + return predicate; + + } + + /** + * @throws UnsupportedOperationException + * unless the caller specified an {@link IKeyOrder} to the ctor. 
+ */ + public IKeyOrder<E> getKeyOrder() { + + if (keyOrder == null) + throw new UnsupportedOperationException(); + + return keyOrder; + + } + + /** + * @throws UnsupportedOperationException + * since no index is associated with this array + */ + public IIndex getIndex() { + + throw new UnsupportedOperationException(); + + } + + /** + * Returns <code>true</code> when the array of elements is empty. + */ + public boolean isEmpty() { + + return e.length == 0; + + } + + /** + * Returns the size of the array of elements. + */ + public long rangeCount(boolean exact) { + + return e.length; + + } + + /** + * @throws UnsupportedOperationException + * since no index is associated with this array + */ + public ITupleIterator<E> rangeIterator() { + + throw new UnsupportedOperationException(); + + } + + /** + * Visits the entire array of elements. + */ + public IChunkedOrderedIterator<E> iterator() { + + if (e.length == 0) { + return new ChunkedWrappedIterator<E>( + Collections.EMPTY_LIST.iterator()); + } + + return new ChunkedArrayIterator<E>(e); + + } + + /** + * Visits the array of elements up to the specified limit. + */ + public IChunkedOrderedIterator<E> iterator(final int limit, + final int capacity) { + + return iterator(0L/* offset */, limit, capacity); + + } + + /** + * Visits the array of elements from the specified offset up to the + * specified limit. + */ + @SuppressWarnings("unchecked") + public IChunkedOrderedIterator<E> iterator(final long offset, + final long limit, final int capacity) { + + if (e.length == 0) { + return new ChunkedWrappedIterator<E>( + Collections.EMPTY_LIST.iterator()); + } + + final E[] a = (E[]) java.lang.reflect.Array.newInstance( + e[0].getClass(), (int) limit); + + System.arraycopy(e, (int) offset, a, 0, (int) limit); + + return new ChunkedArrayIterator<E>(a); + + } + + /** + * Does nothing and always returns ZERO(0). + */ + public long removeAll() { + + return 0L; + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/ChunkedArrayIterator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/ChunkedArrayIterator.java 2011-05-11 03:19:09 UTC (rev 4480) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/ChunkedArrayIterator.java 2011-05-11 03:20:08 UTC (rev 4481) @@ -74,6 +74,18 @@ * * @param a * The array of elements. + */ + public ChunkedArrayIterator(final E[] a) { + + this(a.length, a, null); + + } + + /** + * An iterator that visits the elements in the given array. + * + * @param a + * The array of elements. * @param n * The #of entries in <i>a</i> that are valid. * @param keyOrder Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/Bits.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/Bits.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/Bits.java 2011-05-11 03:20:08 UTC (rev 4481) @@ -0,0 +1,136 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.util; + +import it.unimi.dsi.bits.BitVector; + +import java.nio.ByteBuffer; + + +/** + * Simple helper class to work with bits inside a byte. Useful for classes + * that have a lot of boolean properties or pointers to enums that can be + * more compactly represented as a series of bit flags. See SPO. + * + * @author mikepersonick + */ +public class Bits { + + /** + * Set a bit inside a byte. + * + * @param bits + * the original byte + * @param i + * the bit index (0 through 7) + * @param bit + * the bit value + * @return + * the new byte + */ + public static byte set(final byte bits, final int i, final boolean bit) { + + // check to see if bits[i] == bit already, if so, nothing to do + // also does range check on i + if (get(bits, i) == bit) + return bits; + + byte b = bits; + if (bit) { + b = (byte) (b | (0x1 << i)); + } else { + b = (byte) (b & ~(0x1 << i)); + } + return b; + + } + + /** + * Get a bit from inside a byte. + * + * @param bits + * the byte + * @param i + * the bit index (0 through 7) + * @return + * the bit value + */ + public static boolean get(final byte bits, final int i) { + + if (i < 0 || i > 7) { + throw new IndexOutOfBoundsException(); + } + + return (bits & (0x1 << i)) != 0; + + } + + /** + * Get a new byte, masking off all but the bits specified by m. + * + * @param bits + * the original byte + * @param m + * the bits to keep, all others will be masked + * @return + * the new byte + */ + public static byte mask(final byte bits, final int... m) { + + byte b = 0; + + for (int i = 0; i < m.length; i++) { + + if (m[i] < 0 || m[i] > 7) { + throw new IndexOutOfBoundsException(); + } + + b |= (0x1 << m[i]); + + } + + b &= bits; + + return b; + + } + + /** + * Useful for debugging. + * + * @param bits + * the byte + * @return + * the unsigned binary string representation + */ + public static String toString(final byte bits) { + + final byte[] d = new byte[] { bits }; + final ByteBuffer b = ByteBuffer.wrap(d); + final BitVector v = new ByteBufferBitVector(b); + return v.toString(); + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
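A usage sketch for the new Bits helper; the flag positions and their meanings below are illustrative only:

import com.bigdata.util.Bits;

public class BitsExample {

    public static void main(final String[] args) {

        byte flags = 0;

        // Set two hypothetical flags at bit positions 0 and 3.
        flags = Bits.set(flags, 0, true);
        flags = Bits.set(flags, 3, true);

        // Test an individual bit.
        System.out.println("bit 3 = " + Bits.get(flags, 3)); // true

        // Keep only bits 0 and 1, masking off the rest.
        final byte masked = Bits.mask(flags, 0, 1);

        // Unsigned binary representations, useful for debugging.
        System.out.println(Bits.toString(flags));
        System.out.println(Bits.toString(masked));

    }

}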
From: <mar...@us...> - 2011-05-12 12:43:01
Revision: 4488 http://bigdata.svn.sourceforge.net/bigdata/?rev=4488&view=rev Author: martyncutcher Date: 2011-05-12 12:42:55 +0000 (Thu, 12 May 2011) Log Message: ----------- Add context registration Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2011-05-12 09:18:13 UTC (rev 4487) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2011-05-12 12:42:55 UTC (rev 4488) @@ -2227,6 +2227,10 @@ source.getResourceLocator()// delegate locator ); + final IBufferStrategy bufferStrategy = source.getBufferStrategy(); + if (bufferStrategy instanceof RWStrategy) { + ((RWStrategy) bufferStrategy).getRWStore().registerContext(this); + } } /* Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-05-12 09:18:13 UTC (rev 4487) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-05-12 12:42:55 UTC (rev 4488) @@ -862,6 +862,10 @@ } public void releaseSession(RWWriteCacheService cache) { + if (m_context != null) { + throw new IllegalStateException("Calling releaseSession on shadowed allocator"); + } + if (this.m_sessionActive) { if (log.isTraceEnabled()) log.trace("Allocator: #" + m_index + " releasing session protection"); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-05-12 09:18:13 UTC (rev 4487) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-05-12 12:42:55 UTC (rev 4488) @@ -1681,6 +1681,8 @@ * unwritten allocations. */ void releaseSessions() { + assert(m_activeTxCount == 0 && m_contexts.isEmpty()); + if (m_minReleaseAge == 0) { for (FixedAllocator fa : m_allocs) { fa.releaseSession(m_writeCache); @@ -3512,7 +3514,23 @@ return totalFreed; } + /** + * When a new context is started it must be registered to ensure it is + * protected. + * + * @param context + */ + public void registerContext(IAllocationContext context) { + m_allocationLock.lock(); + try { + establishContextAllocation(context); + } finally { + m_allocationLock.unlock(); + } + } + + /** * The ContextAllocation object manages a freeList of associated allocators * and an overall list of allocators. 
When the context is detached, all * allocators must be released and any that has available capacity will be @@ -3532,6 +3550,10 @@ m_contextRemovals++; alloc.release(); } + + if (m_contexts.isEmpty() && this.m_activeTxCount == 0) { + releaseSessions(); + } } finally { m_allocationLock.unlock(); } @@ -4444,7 +4466,7 @@ if(log.isInfoEnabled()) log.info("#activeTx="+m_activeTxCount); - if (m_activeTxCount == 0) { + if (m_activeTxCount == 0 && m_contexts.isEmpty()) { releaseSessions(); } } finally { @@ -4505,5 +4527,6 @@ m_allocationLock.unlock(); } } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
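The registration added to AbstractTask generalizes to any component which starts an allocation context against an RWStore-backed journal. A minimal sketch, assuming the journal and context references are obtained elsewhere and the usual package locations for RWStrategy and IAllocationContext:

import com.bigdata.journal.AbstractJournal;
import com.bigdata.journal.IBufferStrategy;
import com.bigdata.journal.RWStrategy;
import com.bigdata.rwstore.IAllocationContext;

public class RegisterContextExample {

    static void registerWithStore(final AbstractJournal journal,
            final IAllocationContext context) {

        final IBufferStrategy bufferStrategy = journal.getBufferStrategy();

        if (bufferStrategy instanceof RWStrategy) {

            // Register the context so its allocations are shadowed until
            // the context is detached.
            ((RWStrategy) bufferStrategy).getRWStore()
                    .registerContext(context);

        }

    }

}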
From: <mar...@us...> - 2011-06-21 16:08:12
Revision: 4754 http://bigdata.svn.sourceforge.net/bigdata/?rev=4754&view=rev Author: martyncutcher Date: 2011-06-21 16:08:06 +0000 (Tue, 21 Jun 2011) Log Message: ----------- ensure cache stats are maintained correctly, and that the current referenced cache is never also on the free or dirty lists Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-21 15:23:36 UTC (rev 4753) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-21 16:08:06 UTC (rev 4754) @@ -130,10 +130,6 @@ * {@link #flush(boolean, long, TimeUnit)}, {@link #reset()}, and * {@link #close()}. */ -// * <p> -// * Note: To avoid lock ordering problems, acquire the read lock before you -// * increment the latch and acquire the write lock before you await the -// * latch. final private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** @@ -651,7 +647,8 @@ // header block. if (m_written) { // should be clean, NO WAY should this be written to! - log.warn("Writing to CLEAN cache: " + hashCode()); + log.error("Writing to CLEAN cache: " + hashCode()); + throw new IllegalStateException("Writing to CLEAN cache: " + hashCode()); } if (data == null) @@ -1051,21 +1048,27 @@ // write the data on the disk file. final boolean ret = writeOnChannel(view, getFirstOffset(), Collections.unmodifiableMap(recordMap), remaining); + + if (!ret) { + throw new IllegalStateException("Unable to flush WriteCache"); + } - if(!ret) - throw new TimeoutException(); - counters.nflush++; - if (ret && reset) { + if (reset) { /* * Atomic reset while holding the lock to prevent new * records from being written onto the buffer concurrently. + * + * FIXME: If the WriteCache is used directly then this makes + * sense, but if called from WriteCacheService then this must + * always clear the "master" recordMap of the WriteCacheService + * */ - + reset(); - + } return ret; @@ -1191,6 +1194,41 @@ */ public void reset() throws InterruptedException { + final Iterator<Long> entries = recordMap.keySet().iterator(); + + if (serviceRecordMap != null && entries.hasNext()) { + if (log.isInfoEnabled()) + log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode()); + + while (entries.hasNext()) { + final Long addr = entries.next(); + + /* + * We need to guard against the possibility that the entry in + * the service record map has been updated concurrently such + * that it now points to a different WriteCache instance. This + * is possible (for the RWStore) if a recently freed record has + * been subsequently reallocated on a different WriteCache. + * Using the conditional remove on ConcurrentMap guards against + * this. + */ + boolean removed = serviceRecordMap.remove(addr, this); + + registerWriteStatus(addr, 0, removed ? 
'R' : 'L'); + + } + + } else { + if (log.isInfoEnabled()) + log.info("clean WriteCache: hashCode=" + hashCode()); // debug + // to + // see + // recycling + if (m_written) { + log.warn("Written WriteCache but with no records"); + } + } + final Lock writeLock = lock.writeLock(); writeLock.lockInterruptibly(); @@ -1294,7 +1332,6 @@ * * @param tmp */ - // ... and having the {@link #latch} at zero. private void _resetState(final ByteBuffer tmp) { if (tmp == null) @@ -1865,24 +1902,26 @@ } } } // synchronized(tmp) + + /* + * Fix up the debug flag when last address is cleared. + */ + if (m_written && recordMap.isEmpty()) { + m_written = false; + } } finally { release(); } } - - /* - * Fix up the debug flag when last address is cleared. - */ - if (m_written && recordMap.isEmpty()) { - m_written = false; - } } protected void registerWriteStatus(long offset, int length, char action) { - // NOP to be overridden for debug if required + // NOP to be overidden for debug if required } boolean m_written = false; + + ConcurrentMap<Long, WriteCache> serviceRecordMap; private long lastOffset; @@ -1890,7 +1929,7 @@ * Called to clear the WriteCacheService map of references to this * WriteCache. * - * @param recordMap + * @param serviceRecordMap * the map of the WriteCacheService that associates an address * with a WriteCache * @param fileExtent @@ -1900,40 +1939,8 @@ public void resetWith(final ConcurrentMap<Long, WriteCache> serviceRecordMap, final long fileExtent) throws InterruptedException { - final Iterator<Long> entries = recordMap.keySet().iterator(); - if (entries.hasNext()) { - if (log.isInfoEnabled()) - log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode()); - - while (entries.hasNext()) { - final Long addr = entries.next(); - - /* - * We need to guard against the possibility that the entry in - * the service record map has been updated concurrently such - * that it now points to a different WriteCache instance. This - * is possible (for the RWStore) if a recently freed record has - * been subsequently reallocated on a different WriteCache. - * Using the conditional remove on ConcurrentMap guards against - * this. - */ - boolean removed = serviceRecordMap.remove(addr, this); - - registerWriteStatus(addr, 0, removed ? 'R' : 'L'); - - } - - } else { - if (log.isInfoEnabled()) - log.info("clean WriteCache: hashCode=" + hashCode()); // debug - // to - // see - // recycling - if (m_written) { - log.warn("Written WriteCache but with no records"); - } - } - + this.serviceRecordMap = serviceRecordMap; + reset(); // must ensure reset state even if cache already empty setFileExtent(fileExtent); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2011-06-21 15:23:36 UTC (rev 4753) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2011-06-21 16:08:06 UTC (rev 4754) @@ -439,9 +439,13 @@ // save the current file extent. this.fileExtent.set(fileExtent); - // N-1 WriteCache instances. - for (int i = 0; i < nbuffers - 1; i++) { + // Add [current] WriteCache. + current.set(buffers[0] = newWriteCache(null/* buf */, + useChecksum, false/* bufferHasData */, opener)); + // add remaining buffers. 
+ for (int i = 1; i < nbuffers; i++) { + final WriteCache tmp = newWriteCache(null/* buf */, useChecksum, false/* bufferHasData */, opener); @@ -451,9 +455,6 @@ } - // One more WriteCache for [current]. - current.set(buffers[nbuffers - 1] = newWriteCache(null/* buf */, - useChecksum, false/* bufferHasData */, opener)); // Set the same counters object on each of the write cache instances. final WriteCacheServiceCounters counters = new WriteCacheServiceCounters( @@ -613,6 +614,7 @@ // Now written, remove from dirtylist. dirtyList.take(); + counters.get().ndirty--; dirtyListLock.lockInterruptibly(); try { @@ -910,22 +912,22 @@ t.reset(); } - // re-populate the clean list with N-1 of our buffers - for (int i = 0; i < buffers.length - 1; i++) { - cleanList.put(buffers[i]); - } - // clear the service record map. recordMap.clear(); // set the current buffer. - current.set(buffers[buffers.length - 1]); + current.set(buffers[0]); + // re-populate the clean list with remaining buffers + for (int i = 1; i < buffers.length; i++) { + cleanList.put(buffers[i]); + } + // reset the counters. { final WriteCacheServiceCounters c = counters.get(); c.ndirty = 0; - c.nclean = buffers.length; + c.nclean = buffers.length-1; c.nreset++; } @@ -1253,7 +1255,7 @@ if (!writeLock.tryLock(remaining, TimeUnit.NANOSECONDS)) throw new TimeoutException(); try { - final WriteCache tmp = current.get(); + final WriteCache tmp = current.getAndSet(null); if (tmp.remaining() == 0) { /* * Handle an empty buffer by waiting until the dirtyList is @@ -1293,6 +1295,7 @@ * code is much less complex here. */ dirtyList.add(tmp); + counters.get().ndirty++; dirtyListNotEmpty.signalAll(); while (!dirtyList.isEmpty() && !halt) { // remaining := (total - elapsed). @@ -1326,6 +1329,7 @@ } // Guaranteed available hence non-blocking. final WriteCache nxt = cleanList.take(); + counters.get().nclean--; nxt.resetWith(recordMap, fileExtent.get()); current.set(nxt); return true; @@ -1440,6 +1444,9 @@ throw new IllegalArgumentException( AbstractBufferStrategy.ERR_BUFFER_NULL); + // maintain nwrites + counters.get().nwrites++; + // #of bytes in the record. final int remaining = data.remaining(); @@ -1481,7 +1488,7 @@ // A duplicate may also be indicative of an allocation // error, which we need to be pretty strict about! if (old == cache) { - throw new AssertionError("Record already in cache: offset=" + offset+" "+addrDebugInfo(offset)); + throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset)); } return true; @@ -1535,7 +1542,7 @@ */ if (recordMap.put(offset, cache) != null) { // The record should not already be in the cache. - throw new AssertionError("Record already in cache: offset=" + offset+" "+addrDebugInfo(offset)); + throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset)); } return true; @@ -1600,7 +1607,7 @@ // Take a buffer from the cleanList (guaranteed avail). final WriteCache newBuffer = cleanList.take(); - + counters.get().nclean--; // Clear the state on the new buffer and remove from // cacheService map newBuffer.resetWith(recordMap, fileExtent.get()); @@ -1613,7 +1620,7 @@ // This must be the only occurrence of this record. if (recordMap.put(offset, cache) != null) { - throw new AssertionError("Record already in cache: offset=" + offset+" "+addrDebugInfo(offset)); + throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset)); } return true; @@ -1629,7 +1636,7 @@ /* * Should never happen. 
*/ - throw new AssertionError("Unable to write into current WriteCache"); + throw new AssertionError("Unable to write into current WriteCache " + offset + " " + addrDebugInfo(offset)); } finally { @@ -1861,8 +1868,9 @@ if (!lock.isWriteLockedByCurrentThread()) throw new IllegalMonitorStateException(); - final WriteCache cache = current.get(); + final WriteCache cache = current.getAndSet(null); assert cache != null; + /* * Note: The lock here is required to give flush() atomic semantics with * regard to the set of dirty write buffers when flush() gained the @@ -1897,6 +1905,7 @@ // Take a buffer from the cleanList (guaranteed avail). final WriteCache newBuffer = cleanList.take(); + counters.get().nclean--; // Clear state on new buffer and remove from cacheService map newBuffer.resetWith(recordMap, fileExtent.get()); @@ -1989,18 +1998,24 @@ */ public boolean clearWrite(final long offset) { try { + counters.get().nclearRequests++; final WriteCache cache = recordMap.remove(offset); if (cache == null) return false; - final WriteCache cur = acquireForWriter(); // in case current + + // Is there any point in acquiring for writer (with the readLock)? + // It prevents concurrent access with the write method that takes + // the writeLock, but is this a problem? + //final WriteCache cur = acquireForWriter(); // in case current + counters.get().nclears++; + //try { debugAddrs(offset, 0, 'F'); - try { cache.clearAddrMap(offset); return true; - } finally { - release(); - } + //} finally { + // release(); + //} } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -2100,7 +2115,19 @@ * The #of {@link WriteCache} blocks sent by the leader to the first * downstream follower. */ - public volatile long nsend; + public volatile long nsend; + /** + * The #of writes made to the writeCacheService. + */ + public volatile long nwrites; + /** + * The #of addresses cleared by the writeCacheService. + */ + public volatile long nclearRequests; + /** + * The #of addresses cleared by the writeCacheService. + */ + public volatile long nclears; public WriteCacheServiceCounters(final int nbuffers) { @@ -2145,6 +2172,24 @@ } }); + root.addCounter("nwrites", new Instrument<Long>() { + public void sample() { + setValue(nwrites); + } + }); + + root.addCounter("nclearRequests", new Instrument<Long>() { + public void sample() { + setValue(nclearRequests); + } + }); + + root.addCounter("nclears", new Instrument<Long>() { + public void sample() { + setValue(nclears); + } + }); + root.addCounter("mbPerSec", new Instrument<Double>() { public void sample() { final double mbPerSec = (((double) bytesWritten) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-21 15:23:36 UTC (rev 4753) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-21 16:08:06 UTC (rev 4754) @@ -557,7 +557,16 @@ final int block = offset/nbits; - m_sessionActive = m_store.isSessionProtected(); + /** + * When a session is released any m_sessionActive FixedAllocators + * should be atomically released. 
+ * However, if any state allowed a call to free once the store + * is not session protected, this must NOT overwrite m_sessionActive + * if it is already set since a commit would reset the transient bits + * without first clearing addresses them from the writeCacheService + */ + + m_sessionActive = m_sessionActive || m_store.isSessionProtected(); try { if (((AllocBlock) m_allocBlocks.get(block)) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-06-21 15:23:36 UTC (rev 4753) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-06-21 16:08:06 UTC (rev 4754) @@ -4545,5 +4545,9 @@ log.warn("WriteCacheDebug: " + paddr + " - " + m_writeCache.addrDebugInfo(paddr)); } + public CounterSet getWriteCacheCounters() { + return m_writeCache.getCounters(); + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
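A sketch of sampling the new counters through the accessor added at the bottom of RWStore; how the store reference is obtained is left open, and CounterSet is assumed to live in com.bigdata.counters:

import com.bigdata.counters.CounterSet;
import com.bigdata.rwstore.RWStore;

public class WriteCacheCountersExample {

    static void logWriteCacheCounters(final RWStore store) {

        // nwrites, nclearRequests and nclears appear in this counter set.
        final CounterSet counters = store.getWriteCacheCounters();

        System.out.println(counters.toString());

    }

}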
From: <mar...@us...> - 2011-06-21 17:19:12
Revision: 4756 http://bigdata.svn.sourceforge.net/bigdata/?rev=4756&view=rev Author: martyncutcher Date: 2011-06-21 17:19:05 +0000 (Tue, 21 Jun 2011) Log Message: ----------- backout change to writeCache retaining serviceRecordMap and instead drop interface to force calling of flushWithRest Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-21 16:21:38 UTC (rev 4755) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-21 17:19:05 UTC (rev 4756) @@ -927,27 +927,6 @@ } /** - * Variant which resets the cache if it was successfully flushed. - */ - public void flushAndReset(final boolean force) throws IOException, InterruptedException { - - try { - - if (!flushAndReset(force, true/* reset */, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - - throw new RuntimeException(); - - } - - } catch (TimeoutException e) { - - throw new RuntimeException(e); - - } - - } - - /** * Flush the writes to the backing channel but DOES NOT sync the channel and * DOES NOT {@link #reset()} the {@link WriteCache}. {@link #reset()} is a * separate operation because a common use is to retain recently flushed @@ -964,30 +943,6 @@ public boolean flush(final boolean force, final long timeout, final TimeUnit unit) throws IOException, TimeoutException, InterruptedException { - return flushAndReset(force, false/* reset */, timeout, unit); - - } - - /** - * Core impl. - * - * @param forceIsIgnored - * ignored (deprecated). - * @param reset - * When <code>true</code>, does atomic reset IFF the flush was - * successful in the allowed time while holding the lock to - * prevent new records from being written onto the buffer - * concurrently. - * @param timeout - * @param unit - * @return - * @throws IOException - * @throws TimeoutException - * @throws InterruptedException - */ - private boolean flushAndReset(final boolean forceIsIgnored, final boolean reset, final long timeout, - final TimeUnit unit) throws IOException, TimeoutException, InterruptedException { - // start time final long begin = System.nanoTime(); @@ -1050,27 +1005,11 @@ remaining); if (!ret) { - throw new IllegalStateException("Unable to flush WriteCache"); + throw new TimeoutException("Unable to flush WriteCache"); } counters.nflush++; - if (reset) { - - /* - * Atomic reset while holding the lock to prevent new - * records from being written onto the buffer concurrently. 
- * - * FIXME: If the WriteCache is used directly then this makes - * sense, but if called from WriteCacheService then this must - * always clear the "master" recordMap of the WriteCacheService - * - */ - - reset(); - - } - return ret; } @@ -1194,41 +1133,6 @@ */ public void reset() throws InterruptedException { - final Iterator<Long> entries = recordMap.keySet().iterator(); - - if (serviceRecordMap != null && entries.hasNext()) { - if (log.isInfoEnabled()) - log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode()); - - while (entries.hasNext()) { - final Long addr = entries.next(); - - /* - * We need to guard against the possibility that the entry in - * the service record map has been updated concurrently such - * that it now points to a different WriteCache instance. This - * is possible (for the RWStore) if a recently freed record has - * been subsequently reallocated on a different WriteCache. - * Using the conditional remove on ConcurrentMap guards against - * this. - */ - boolean removed = serviceRecordMap.remove(addr, this); - - registerWriteStatus(addr, 0, removed ? 'R' : 'L'); - - } - - } else { - if (log.isInfoEnabled()) - log.info("clean WriteCache: hashCode=" + hashCode()); // debug - // to - // see - // recycling - if (m_written) { - log.warn("Written WriteCache but with no records"); - } - } - final Lock writeLock = lock.writeLock(); writeLock.lockInterruptibly(); @@ -1921,8 +1825,6 @@ boolean m_written = false; - ConcurrentMap<Long, WriteCache> serviceRecordMap; - private long lastOffset; /** @@ -1939,8 +1841,40 @@ public void resetWith(final ConcurrentMap<Long, WriteCache> serviceRecordMap, final long fileExtent) throws InterruptedException { - this.serviceRecordMap = serviceRecordMap; + final Iterator<Long> entries = recordMap.keySet().iterator(); + if (serviceRecordMap != null && entries.hasNext()) { + if (log.isInfoEnabled()) + log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode()); + + while (entries.hasNext()) { + final Long addr = entries.next(); + + /* + * We need to guard against the possibility that the entry in + * the service record map has been updated concurrently such + * that it now points to a different WriteCache instance. This + * is possible (for the RWStore) if a recently freed record has + * been subsequently reallocated on a different WriteCache. + * Using the conditional remove on ConcurrentMap guards against + * this. + */ + boolean removed = serviceRecordMap.remove(addr, this); + + registerWriteStatus(addr, 0, removed ? 
'R' : 'L'); + + } + + } else { + if (log.isInfoEnabled()) + log.info("clean WriteCache: hashCode=" + hashCode()); // debug + // to + // see + // recycling + if (m_written) { + log.warn("Written WriteCache but with no records"); + } + } reset(); // must ensure reset state even if cache already empty setFileExtent(fileExtent); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-21 16:21:38 UTC (rev 4755) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-21 17:19:05 UTC (rev 4756) @@ -565,8 +565,9 @@ * if it is already set since a commit would reset the transient bits * without first clearing addresses them from the writeCacheService */ - - m_sessionActive = m_sessionActive || m_store.isSessionProtected(); + final boolean tmp = m_sessionActive; + m_sessionActive = m_store.isSessionProtected(); + if (tmp && !m_sessionActive) throw new AssertionError(); try { if (((AllocBlock) m_allocBlocks.get(block)) @@ -866,7 +867,7 @@ m_statsBucket = b; } - public void releaseSession(RWWriteCacheService cache) { + void releaseSession(RWWriteCacheService cache) { if (m_context != null) { throw new IllegalStateException("Calling releaseSession on shadowed allocator"); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
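With flushAndReset() removed, callers go through flush(force, timeout, unit), which after this change surfaces a TimeoutException when writeOnChannel cannot flush the cache. A caller sketch with an illustrative timeout:

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.bigdata.io.writecache.WriteCache;

public class FlushExample {

    static void flushOrFail(final WriteCache writeCache) throws IOException,
            InterruptedException {

        try {

            // Throws TimeoutException if the cache can not be flushed
            // within the allowed time.
            writeCache.flush(false/* force */, 10, TimeUnit.SECONDS);

        } catch (TimeoutException ex) {

            // The backing channel could not be flushed; surface the failure.
            throw new RuntimeException(ex);

        }

    }

}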