From: <tho...@us...> - 2010-08-06 15:59:49
Revision: 3424
http://bigdata.svn.sourceforge.net/bigdata/?rev=3424&view=rev
Author: thompsonbry
Date: 2010-08-06 15:59:39 +0000 (Fri, 06 Aug 2010)
Log Message:
-----------
Merged from trunk [r3407:r3423].
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTree.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractLocalTransactionManager.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/IMutableResource.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/RelationFusedView.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/ILocatableResource.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/AsynchronousOverflowTask.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/BTreeMetadata.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/IndexManager.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/OverflowManager.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractFederation.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/DataService.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/DistributedTransactionService.java
branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/btree/AbstractIndexSegmentTestCase.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/btree/TestIndexSegmentMultiBlockIterators.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestTransactionService.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/TestReleaseResources.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/StressTestConcurrent.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestDistributedTransactionServiceRestart.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTask.java
branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/TransactionServer.java
branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/lookup/AbstractCachingServiceClient.java
branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/master/TaskMaster.java
branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/util/DumpFederation.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/IExtension.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/IExtensionFactory.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/ILexiconConfiguration.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/LexiconConfiguration.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rules/AbstractRuleFastClosure_3_5_6_7_9.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/util/Splitter.config
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/ColorsEnumExtension.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/EpochExtension.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/SampleExtensionFactory.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/TestEncodeDecodeKeys.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestAsynchronousStatementBufferFactory.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestScaleOutTripleStoreWithEmbeddedFederation.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestScaleOutTripleStoreWithJiniFederation.java
branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java
branches/QUADS_QUERY_BRANCH/build.xml
branches/QUADS_QUERY_BRANCH/src/resources/config/README
branches/QUADS_QUERY_BRANCH/src/resources/config/bigdataCluster.config
branches/QUADS_QUERY_BRANCH/src/resources/config/bigdataCluster16.config
Added Paths:
-----------
branches/QUADS_QUERY_BRANCH/src/resources/config/bigdataStandalone.config
branches/QUADS_QUERY_BRANCH/src/resources/scripts/dumpFed.sh
branches/QUADS_QUERY_BRANCH/src/resources/scripts/nanoSparqlServer.sh
Property Changed:
----------------
branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/attr/
branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/disco/
branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/util/config/
branches/QUADS_QUERY_BRANCH/bigdata-perf/
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/util/
branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/
branches/QUADS_QUERY_BRANCH/dsi-utils/LEGAL/
branches/QUADS_QUERY_BRANCH/dsi-utils/lib/
branches/QUADS_QUERY_BRANCH/dsi-utils/src/
branches/QUADS_QUERY_BRANCH/lgpl-utils/src/java/it/unimi/dsi/fastutil/bytes/custom/
branches/QUADS_QUERY_BRANCH/lgpl-utils/src/test/it/unimi/dsi/fastutil/bytes/custom/
branches/QUADS_QUERY_BRANCH/osgi/
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -420,7 +420,7 @@
}
}
-
+
/**
* Note: A commit is required in order for a read-committed view to have
* access to the registered indices. When running against an
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -2840,7 +2840,8 @@
* might also want to limit the maximum size of the reads.
*/
- final DirectBufferPool pool = DirectBufferPool.INSTANCE_10M;
+// final DirectBufferPool pool = DirectBufferPool.INSTANCE_10M;
+ final DirectBufferPool pool = DirectBufferPool.INSTANCE;
if (true
&& ((flags & REVERSE) == 0)
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTree.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTree.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTree.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -644,7 +644,18 @@
this.lastCommitTime = lastCommitTime;
}
- private long lastCommitTime = 0L;// Until the first commit.
+
+ /**
+ * The lastCommitTime of the {@link Checkpoint} record from which the
+ * {@link BTree} was loaded.
+ * <p>
+ * Note: Made volatile on 8/2/2010 since it is not otherwise obvious what
+ * would guarantee visibility of this field, though I do seem to remember
+ * that visibility might be guaranteed by how the BTree class is discovered
+ * and returned to the caller. Still, it does no harm to make this a volatile
+ * read.
+ */
+ volatile private long lastCommitTime = 0L;// Until the first commit.
/**
* Return the {@link IDirtyListener}.
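The visibility concern described above comes down to the Java memory model: without volatile (or some other happens-before edge), a value written by the thread that loads the checkpoint may never be observed by another thread. A minimal, self-contained sketch of the hazard and the fix (illustrative only, not bigdata code):

    // Illustrative sketch: the field is written by one thread and read by
    // others; declaring it volatile guarantees readers see the latest write.
    class LastCommitTimeHolder {

        private volatile long lastCommitTime = 0L; // until the first commit

        void setLastCommitTime(final long t) {
            lastCommitTime = t; // volatile write: happens-before later volatile reads
        }

        long getLastCommitTime() {
            return lastCommitTime; // volatile read: never indefinitely stale
        }
    }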
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/DumpIndexSegment.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -36,6 +36,7 @@
import org.apache.log4j.Logger;
import com.bigdata.btree.IndexSegment.ImmutableNodeFactory.ImmutableLeaf;
+import com.bigdata.io.DirectBufferPool;
import com.bigdata.journal.DumpJournal;
import com.bigdata.rawstore.IRawStore;
@@ -154,6 +155,16 @@
}
+ // multi-block scan of the index segment.
+ boolean multiBlockScan = false; // @todo command line option.
+ if (multiBlockScan) {
+
+ writeBanner("dump leaves using multi-block forward scan");
+
+ dumpLeavesMultiBlockForwardScan(store);
+
+ }
+
// dump the leaves using a fast reverse scan.
boolean fastReverseScan = true;// @todo command line option
if (fastReverseScan) {
@@ -524,6 +535,36 @@
}
+ /**
+ * Dump leaves using the {@link IndexSegmentMultiBlockIterator}.
+ *
+ * @param store
+ */
+ static void dumpLeavesMultiBlockForwardScan(final IndexSegmentStore store) {
+
+ final long begin = System.currentTimeMillis();
+
+ final IndexSegment seg = store.loadIndexSegment();
+
+ final ITupleIterator<?> itr = new IndexSegmentMultiBlockIterator(seg, DirectBufferPool.INSTANCE,
+ null/* fromKey */, null/* toKey */, IRangeQuery.DEFAULT/* flags */);
+
+ int nscanned = 0;
+
+ while(itr.hasNext()) {
+
+ itr.next();
+
+ nscanned++;
+
+ }
+
+ final long elapsed = System.currentTimeMillis() - begin;
+
+ System.out.println("Visited "+nscanned+" tuples using multi-block forward scan in "+elapsed+" ms");
+
+ }
+
static void writeBanner(String s) {
System.out.println(bar);
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -218,12 +218,12 @@
*/
public final static DirectBufferPool INSTANCE;
- /**
- * A JVM-wide pool of direct {@link ByteBuffer}s with a default
- * {@link Options#BUFFER_CAPACITY} of <code>10 MB</code>. The main use case
- * for the 10M buffers are multi-block IOs for the {@link IndexSegment}s.
- */
- public final static DirectBufferPool INSTANCE_10M;
+// /**
+// * A JVM-wide pool of direct {@link ByteBuffer}s with a default
+// * {@link Options#BUFFER_CAPACITY} of <code>10 MB</code>. The main use case
+// * for the 10M buffers are multi-block IOs for the {@link IndexSegment}s.
+// */
+// public final static DirectBufferPool INSTANCE_10M;
/**
* An unbounded list of all {@link DirectBufferPool} instances.
@@ -251,11 +251,11 @@
bufferCapacity//
);
- INSTANCE_10M = new DirectBufferPool(//
- "10M",//
- Integer.MAX_VALUE, // poolCapacity
- 10 * Bytes.megabyte32 // bufferCapacity
- );
+// INSTANCE_10M = new DirectBufferPool(//
+// "10M",//
+// Integer.MAX_VALUE, // poolCapacity
+// 10 * Bytes.megabyte32 // bufferCapacity
+// );
/*
* This configuration will block if there is a concurrent demand for
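With INSTANCE_10M retired, the multi-block IO paths touched by this commit (AbstractBTree above and DumpIndexSegment) draw their buffers from the single default pool instead. A minimal sketch of the resulting call pattern, using only the constructor arguments shown in this revision (seg is assumed to be in scope, e.g. from store.loadIndexSegment()):

    // The 10M pool is gone; pass the JVM-wide default pool to the iterator.
    final DirectBufferPool pool = DirectBufferPool.INSTANCE;

    final ITupleIterator<?> itr = new IndexSegmentMultiBlockIterator(
            seg,                 // IndexSegment to scan
            pool,                // default pool rather than the removed INSTANCE_10M
            null /* fromKey */,
            null /* toKey */,
            IRangeQuery.DEFAULT /* flags */);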
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractLocalTransactionManager.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractLocalTransactionManager.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractLocalTransactionManager.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -7,6 +7,7 @@
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.Instrument;
+import com.bigdata.resources.StoreManager;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.IDataService;
@@ -171,16 +172,18 @@
* Delay between attempts reach the remote service (ms).
*/
final long delay = 10L;
-
- /**
- * #of attempts to reach the remote service.
- *
- * Note: delay*maxtries == 1000ms of trying before we give up.
- *
- * If this is not enough, then consider adding an optional parameter giving
- * the time the caller will wait and letting the StoreManager wait longer
- * during startup to discover the timestamp service.
- */
+
+ /**
+ * #of attempts to reach the remote service.
+ * <p>
+ * Note: delay*maxtries == 1000ms of trying before we give up, plus however
+ * long we are willing to wait for service discovery if the problem is
+ * locating the {@link ITransactionService}.
+ * <p>
+ * If this is not enough, then consider adding an optional parameter giving
+ * the time the caller will wait and letting the {@link StoreManager} wait
+ * longer during startup to discover the timestamp service.
+ */
final int maxtries = 100;
/**
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/WriteExecutorService.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -1752,11 +1752,11 @@
//
// }
- /**
- * Flag may be set to force overflow processing during the next group
- * commit. The flag is cleared once an overflow has occurred.
- */
- public final AtomicBoolean forceOverflow = new AtomicBoolean(false);
+// /**
+// * Flag may be set to force overflow processing during the next group
+// * commit. The flag is cleared once an overflow has occurred.
+// */
+// public final AtomicBoolean forceOverflow = new AtomicBoolean(false);
/**
* Return <code>true</code> if the pre-conditions for overflow processing
@@ -1765,7 +1765,8 @@
private boolean isShouldOverflow() {
return resourceManager.isOverflowEnabled()
- && (forceOverflow.get() || resourceManager.shouldOverflow());
+// && (forceOverflow.get() || resourceManager.shouldOverflow());
+ && resourceManager.shouldOverflow();
}
@@ -1815,10 +1816,10 @@
log.error("Overflow error: "+serviceName+" : "+t, t);
- } finally {
-
- // clear force flag.
- forceOverflow.set(false);
+// } finally {
+//
+// // clear force flag.
+// forceOverflow.set(false);
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -582,9 +582,21 @@
}
/**
+ * The default implementation only logs the event.
+ */
+ public AbstractResource<E> init() {
+
+ if (log.isInfoEnabled())
+ log.info(toString());
+
+ return this;
+
+ }
+
+ /**
*
* @todo Lock service supporting shared locks, leases and lease renewal,
- * excalation of shared locks to exclusive locks, deadlock detection,
+ * escalation of shared locks to exclusive locks, deadlock detection,
* and possibly a resource hierarchy. Leases should be Callable
* objects that are submitted by the client to its executor service so
* that they will renew automatically until cancelled (and will cancel
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/IMutableResource.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/IMutableResource.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/IMutableResource.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -38,7 +38,10 @@
public interface IMutableResource<T> extends ILocatableResource<T> {
/**
- * Create any logically contained resources (relations, indices).
+ * Create any logically contained resources (relations, indices). There is
+ * no presumption that {@link #init()} is suitable for invocation from
+ * {@link #create()}. Instead, you are responsible for invoking {@link #init()}
+ * from this method IFF it is appropriate to reuse its initialization logic.
*/
void create();
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/RelationFusedView.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/RelationFusedView.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/RelationFusedView.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -21,8 +21,8 @@
*/
public class RelationFusedView<E> implements IRelation<E> {
- private IRelation<E> relation1;
- private IRelation<E> relation2;
+ final private IRelation<E> relation1;
+ final private IRelation<E> relation2;
public IRelation<E> getRelation1() {
@@ -36,6 +36,13 @@
}
+ // NOP
+ public RelationFusedView<E> init() {
+
+ return this;
+
+ }
+
/**
*
* @param relation1
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -586,6 +586,8 @@
properties //
});
+ r.init();
+
if(INFO) {
log.info("new instance: "+r);
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/ILocatableResource.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/ILocatableResource.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/locator/ILocatableResource.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -45,6 +45,13 @@
*/
public interface ILocatableResource<T> {
+ /**
+ * This deferred initialization method is automatically invoked when the resource
+ * is materialized by the {@link IResourceLocator}. The implementation is
+ * encouraged to strengthen the return type.
+ */
+ public ILocatableResource<T> init();
+
/**
* The identifying namespace.
*/
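DefaultResourceLocator (see the hunk above) now calls init() immediately after a resource is materialized, and implementations may strengthen the return type, as AbstractResource and RelationFusedView do in this revision. A hedged sketch of a hypothetical implementation (the class name and body are illustrative and omit the constructor and other members the base class requires):

    public class MyRelation extends AbstractResource<MyRelation> { // hypothetical

        @Override
        public MyRelation init() { // strengthened (covariant) return type
            super.init();          // the default implementation only logs the event
            // one-time, post-materialization initialization goes here
            return this;
        }

        public void create() {
            // Per IMutableResource: create() does NOT automatically reuse init();
            // invoke init() from here only if its logic is appropriate for create().
        }
    }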
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -1074,16 +1074,20 @@
final UUID sinkUUID = locator.getDataServiceUUID();
+ final IDataService dataService;
if (sinkUUID.equals(fed.getServiceUUID())) {
- /*
- * @todo As an optimization, special case when the downstream
- * data service is _this_ data service.
- */
+ /*
+ * As an optimization, special case when the downstream
+ * data service is _this_ data service.
+ */
+ dataService = (IDataService)fed.getService();
+ } else {
+
+ dataService = fed.getDataService(sinkUUID);
+
}
-
- final IDataService dataService = fed.getDataService(sinkUUID);
sink = new JoinTaskSink(fed, locator, this);
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/AsynchronousOverflowTask.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/AsynchronousOverflowTask.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/AsynchronousOverflowTask.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -310,6 +310,7 @@
private final OverflowActionEnum action;
private final ViewMetadata vmd;
+ private final boolean forceCompactingMerge;
private final AbstractTask<T> task;
/**
@@ -319,11 +320,17 @@
* @param vmd
* The {@link ViewMetadata} for the index partition for which
* that action will be taken.
+ * @param forceCompactingMerge
+ * when <code>true</code>, a compacting merge will be performed even
+ * if the view was simply copied to the new journal.
* @param task
* The task which implements that action.
*/
- public AtomicCallable(final OverflowActionEnum action,
- final ViewMetadata vmd, final AbstractTask<T> task) {
+ public AtomicCallable(final OverflowActionEnum action,//
+ final ViewMetadata vmd,//
+ final boolean forceCompactingMerge, //
+ final AbstractTask<T> task//
+ ) {
if (action == null)
throw new IllegalArgumentException();
@@ -337,6 +344,8 @@
this.action = action;
this.vmd = vmd;
+
+ this.forceCompactingMerge = forceCompactingMerge;
this.task = task;
@@ -407,110 +416,112 @@
}
- /**
- * Schedule a build for each shard and a merge for each shard with a
- * non-zero merge priority. Whether a build or a merge is performed for a
- * shard will depend on which action is initiated first. When an build or
- * merge action is initiated, that choice is atomically registered on the
- * {@link ViewMetadata} and any subsequent attempt (within this method
- * invocation) to start a build or merge for the same shard will be dropped.
- * Processing ends once all tasks scheduled on a "build" service are
- * complete.
- * <p>
- * After actions are considered for each shard for which a compacting merge
- * is executed. These after actions can cause a shard split, join, or move.
- * Deferring such actions until we have a compact view (comprised of one
- * journal and one index segment) greatly improves our ability to decide
- * whether a shard should be split or joined and simplifies the logic and
- * effort required to split, join or move a shard.
- * <p>
- * The following is a brief summary of some after actions on compact shards.
- * <dl>
- * <dt>split</dt>
- * <dd>A shard is split when its size on the disk exceeds the (adjusted)
- * nominal size of a shard (overflow). By waiting until the shard view is
- * compact we have exact information about the size of the shard (it is
- * contained in a single {@link IndexSegment}) and we are able to easily
- * select the separator key to split the shard.</dd>
- * <dt>tailSplit</dt>
- * <dd>A tail split may be selected for a shard which has a mostly append
- * access pattern. For such access patterns, a normal split would leave the
- * left sibling 50% full and the right sibling would quickly fill up with
- * continued writes on the tail of the key range. To compensate for this
- * access pattern, a tail split chooses a separator key near the end of the
- * key range of a shard. This results in a left sibling which is mostly full
- * and a right sibling which is mostly empty. If the pattern of heavy tail
- * append continues, then the left sibling will remain mostly full and the
- * new writes will flow mostly into the right sibling.</dd>
- * <dt>scatterSplit</dt>
- * <dd>A scatter split breaks the first shard for a new scale-out index into
- * N shards and scatters those shards across the data services in a
- * federation in order to improve the data distribution and potential
- * concurrency of the index. By waiting until the shard view is compact we
- * are able to quickly select appropriate separator keys for the shard
- * splits.</dd>
- * <dt>move</dt>
- * <dd>A move transfer a shard from this data service to another data
- * service in order to reduce the load on this data service. By waiting
- * until the shard view is compact we are able to rapidly transfer the bulk
- * of the data in the form of a single {@link IndexSegment}.</dd>
- * <dt>join</dt>
- * <dd>A join combines a shard which is under 50% of its (adjusted) nominal
- * maximum size on the disk (underflow) with its right sibling. Joins are
- * driven by deletes of tuples from a key range. Since deletes are handled
- * as writes where a delete marker is set on the tuple, neither the shard
- * size on the disk nor the range count of the shard will decrease until a
- * compacting merge. A join is indicated if the size on disk for the shard
- * has shrunk considerably since the last time a compacting merge was
- * performed for the view (this covers both the case of deletes, which
- * reduce the range count, and updates which replace the values in the
- * tuples with more compact data). <br>
- * There are actually three cases for a join.
- * <ol>
- * <li>If the right sibling is local, then the shard will be joined with its
- * right sibling.</li>
- * <li>If the right sibling is remote, then the shard will be moved to the
- * data service on which the right sibling is found.</li>
- * <li>If the right sibling does not exist, then nothing is done (the last
- * shard in a scale-out index does not have a right sibling). The right most
- * sibling will remain undercapacity until and unless its left sibling also
- * underflows, at which point the left sibling will cause itself to be
- * joined with the right sibling (this is done to simplify the logic which
- * searches for a sibling with which to join an undercapacity shard).</li>
- * </ol>
- * </dl>
- *
- * @param forceCompactingMerges
- * When <code>true</code> a compacting merge will be forced for
- * each non-compact view.
- *
- * @throws InterruptedException
- *
- * @todo The size of the merge queue (or its sum of priorities) may be an
- * indication of the load of the node which could be used to decide
- * that index partitions should be shed/moved.
- *
- * @todo For HA, this needs to be a shared priority queue using zk or the
- * like since any node in the failover set could do the merge (or
- * build). [Alternatively, nodes do the build/merge for the shards for
- * which they have the highest affinity out of the failover set.]
- *
- * FIXME tailSplits currently operate on the mutable BTree rather than
- * a compact view). This task does not require a compact view (at
- * least, not yet) and generating one for it might be a waste of time.
- * Instead it examines where the inserts are occurring in the index
- * and splits of the tail if the index is heavy for write append. It
- * probably could defer that choice until a compact view was some
- * percentage of a split (maybe .6?) So, probably an after action for
- * the mergeQ.
- *
- * FIXME joins must track metadata about the previous size on disk of
- * the compact view in order to decide when underflow has resulted. In
- * order to handle the change in the value of the acceleration factor,
- * this data should be stored as the percentage of an adjusted split
- * of the last compact view. We can update that metadata each time we
- * do a compacting merge.
- */
+ /**
+ * Schedule a build for each shard and a merge for each shard with a
+ * non-zero merge priority. Whether a build or a merge is performed for a
+ * shard will depend on which action is initiated first. When a build or
+ * merge action is initiated, that choice is atomically registered on the
+ * {@link ViewMetadata} and any subsequent attempt (within this method
+ * invocation) to start a build or merge for the same shard will be dropped.
+ * Processing ends once all tasks scheduled on a "build" service are
+ * complete.
+ * <p>
+ * After actions are considered for each shard for which a compacting merge
+ * is executed. These after actions can cause a shard split, join, or move.
+ * Deferring such actions until we have a compact view (comprised of one
+ * journal and one index segment) greatly improves our ability to decide
+ * whether a shard should be split or joined and simplifies the logic and
+ * effort required to split, join or move a shard.
+ * <p>
+ * The following is a brief summary of some after actions on compact shards.
+ * <dl>
+ * <dt>split</dt>
+ * <dd>A shard is split when its size on the disk exceeds the (adjusted)
+ * nominal size of a shard (overflow). By waiting until the shard view is
+ * compact we have exact information about the size of the shard (it is
+ * contained in a single {@link IndexSegment}) and we are able to easily
+ * select the separator key to split the shard.</dd>
+ * <dt>tailSplit</dt>
+ * <dd>A tail split may be selected for a shard which has a mostly append
+ * access pattern. For such access patterns, a normal split would leave the
+ * left sibling 50% full and the right sibling would quickly fill up with
+ * continued writes on the tail of the key range. To compensate for this
+ * access pattern, a tail split chooses a separator key near the end of the
+ * key range of a shard. This results in a left sibling which is mostly full
+ * and a right sibling which is mostly empty. If the pattern of heavy tail
+ * append continues, then the left sibling will remain mostly full and the
+ * new writes will flow mostly into the right sibling.</dd>
+ * <dt>scatterSplit</dt>
+ * <dd>A scatter split breaks the first shard for a new scale-out index into
+ * N shards and scatters those shards across the data services in a
+ * federation in order to improve the data distribution and potential
+ * concurrency of the index. By waiting until the shard view is compact we
+ * are able to quickly select appropriate separator keys for the shard
+ * splits.</dd>
+ * <dt>move</dt>
+ * <dd>A move transfers a shard from this data service to another data
+ * service in order to reduce the load on this data service. By waiting
+ * until the shard view is compact we are able to rapidly transfer the bulk
+ * of the data in the form of a single {@link IndexSegment}.</dd>
+ * <dt>join</dt>
+ * <dd>A join combines a shard which is under 50% of its (adjusted) nominal
+ * maximum size on the disk (underflow) with its right sibling. Joins are
+ * driven by deletes of tuples from a key range. Since deletes are handled
+ * as writes where a delete marker is set on the tuple, neither the shard
+ * size on the disk nor the range count of the shard will decrease until a
+ * compacting merge. A join is indicated if the size on disk for the shard
+ * has shrunk considerably since the last time a compacting merge was
+ * performed for the view (this covers both the case of deletes, which
+ * reduce the range count, and updates which replace the values in the
+ * tuples with more compact data). <br>
+ * There are actually three cases for a join.
+ * <ol>
+ * <li>If the right sibling is local, then the shard will be joined with its
+ * right sibling.</li>
+ * <li>If the right sibling is remote, then the shard will be moved to the
+ * data service on which the right sibling is found.</li>
+ * <li>If the right sibling does not exist, then nothing is done (the last
+ * shard in a scale-out index does not have a right sibling). The right most
+ * sibling will remain undercapacity until and unless its left sibling also
+ * underflows, at which point the left sibling will cause itself to be
+ * joined with the right sibling (this is done to simplify the logic which
+ * searches for a sibling with which to join an undercapacity shard).</li>
+ * </ol>
+ * </dl>
+ *
+ * @param forceCompactingMerges
+ * When <code>true</code> a compacting merge will be forced for
+ * each non-compact view. Compacting merges will be taken in
+ * priority order and will continue until finished or until the
+ * journal is nearing its nominal maximum extent.
+ *
+ * @throws InterruptedException
+ *
+ * @todo The size of the merge queue (or its sum of priorities) may be an
+ * indication of the load of the node which could be used to decide
+ * that index partitions should be shed/moved.
+ *
+ * @todo For HA, this needs to be a shared priority queue using zk or the
+ * like since any node in the failover set could do the merge (or
+ * build). [Alternatively, nodes do the build/merge for the shards for
+ * which they have the highest affinity out of the failover set.]
+ *
+ * FIXME tailSplits currently operate on the mutable BTree rather than
+ * a compact view. This task does not require a compact view (at
+ * least, not yet) and generating one for it might be a waste of time.
+ * Instead it examines where the inserts are occurring in the index
+ * and splits off the tail if the index is heavy for write append. It
+ * probably could defer that choice until a compact view was some
+ * percentage of a split (maybe .6?) So, probably an after action for
+ * the mergeQ.
+ *
+ * FIXME joins must track metadata about the previous size on disk of
+ * the compact view in order to decide when underflow has resulted. In
+ * order to handle the change in the value of the acceleration factor,
+ * this data should be stored as the percentage of an adjusted split
+ * of the last compact view. We can update that metadata each time we
+ * do a compacting merge.
+ */
private List<Future<?>> scheduleAndAwaitTasks(
final boolean forceCompactingMerges) throws InterruptedException {
@@ -554,21 +565,30 @@
if (log.isInfoEnabled())
log.info("was copied : " + vmd);
- continue;
+ } else {
+ buildList.add(new Priority<ViewMetadata>(vmd.buildPriority, vmd));
+
}
- buildList.add(new Priority<ViewMetadata>(vmd.buildPriority, vmd));
+ if (vmd.mergePriority > 0d || forceCompactingMerges) {
- if (vmd.mergePriority > 0d) {
+ /*
+ * Schedule a merge if the priority is non-zero or if compacting
+ * merges are being forced.
+ */
- mergeList
- .add(new Priority<ViewMetadata>(vmd.mergePriority, vmd));
+ mergeList
+ .add(new Priority<ViewMetadata>(vmd.mergePriority, vmd));
}
} // itr.hasNext()
+ if(log.isInfoEnabled()) {
+ log.info("Scheduling tasks: buildList="+buildList.size()+", mergeList="+mergeList.size());
+ }
+
/*
* Schedule build and merge tasks and await their futures. The tasks are
* submitted from a PriorityQueue, so the order in which the tasks are
@@ -606,18 +626,23 @@
resourceManager.mergeServiceCorePoolSize);
// Schedule merge tasks.
- if (!forceCompactingMerges) {
-
for (Priority<ViewMetadata> p : mergeList) {
final ViewMetadata vmd = p.v;
- if (vmd.mergePriority > 0) {
+ if (vmd.mergePriority > 0 || forceCompactingMerges) {
+ if(forceCompactingMerges && vmd.getAction().equals(OverflowActionEnum.Copy)) {
+
+ vmd.clearCopyAction();
+
+ }
+
// Schedule a compacting merge.
final FutureTask<?> ft = new FutureTask(
new AtomicCallable(OverflowActionEnum.Merge,
- vmd, new CompactingMergeTask(vmd)));
+ vmd, forceCompactingMerges,
+ new CompactingMergeTask(vmd)));
mergeFutures.add(ft);
mergeService.execute(ft);
@@ -625,8 +650,6 @@
}
- }
-
// Schedule build tasks.
for (Priority<ViewMetadata> p : buildList) {
@@ -636,7 +659,8 @@
// Force a compacting merge.
final FutureTask<?> ft = new FutureTask(new AtomicCallable(
- OverflowActionEnum.Merge, vmd,
+ OverflowActionEnum.Merge, vmd,
+ forceCompactingMerges,
new CompactingMergeTask(vmd)));
mergeFutures.add(ft);
mergeService.execute(ft);
@@ -646,6 +670,7 @@
// Schedule a build.
final FutureTask<?> ft = new FutureTask(new AtomicCallable(
OverflowActionEnum.Build, vmd,
+ forceCompactingMerges,
new IncrementalBuildTask(vmd)));
buildFutures.add(ft);
buildService.execute(ft);
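The scheduling loop above drains build and merge candidates from priority queues and then blocks on the futures, so the highest-priority shards start first and processing ends only when every scheduled task has completed. A generic, self-contained illustration of that pattern (plain JDK classes, not the bigdata types):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PrioritySchedulingSketch {

        /** A task with a scheduling priority; higher priorities run first. */
        static class PrioritizedTask implements Comparable<PrioritizedTask> {
            final double priority;
            final Runnable task;
            PrioritizedTask(final double priority, final Runnable task) {
                this.priority = priority;
                this.task = task;
            }
            public int compareTo(final PrioritizedTask o) {
                return Double.compare(o.priority, this.priority); // descending order
            }
        }

        public static void main(final String[] args) throws Exception {

            final PriorityQueue<PrioritizedTask> queue = new PriorityQueue<PrioritizedTask>();
            queue.add(new PrioritizedTask(2.0, new Runnable() {
                public void run() { System.out.println("merge: high priority shard"); }
            }));
            queue.add(new PrioritizedTask(0.5, new Runnable() {
                public void run() { System.out.println("build: low priority shard"); }
            }));

            final ExecutorService service = Executors.newFixedThreadPool(2);
            final List<Future<?>> futures = new ArrayList<Future<?>>();
            PrioritizedTask t;
            while ((t = queue.poll()) != null) {
                futures.add(service.submit(t.task)); // highest priority submitted first
            }
            for (Future<?> f : futures) {
                f.get(); // processing ends once all scheduled tasks are complete
            }
            service.shutdown();
        }
    }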
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/BTreeMetadata.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/BTreeMetadata.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/BTreeMetadata.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -280,6 +280,25 @@
actionRef.set(action);
}
+
+ /**
+ * Used to force clear a {@link OverflowActionEnum#Copy} action
+ * when we will force a compacting merge. This allows us to do
+ * compacting merges on shard views which would otherwise simply
+ * be copied onto the new journal.
+ */
+ void clearCopyAction() {
+
+ lock.lock();
+ try {
+ if(actionRef.get().equals(OverflowActionEnum.Copy)) {
+ actionRef.set(null/*clear*/);
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ }
/**
*
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/IndexManager.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/IndexManager.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/IndexManager.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -1684,16 +1684,28 @@
final StringBuilder sb = new StringBuilder();
final AbstractJournal journal = getJournal(timestamp);
+
+ if (journal == null) {
+ /*
+ * This condition can occur if there are no shard views on the
+ * previous journal and the releaseAge is zero since the previous
+ * journal can be purged (deleted) before this method is invoked.
+ * This situation arises in a few of the unit tests which begin with
+ * an empty journal and copy everything onto the new journal such
+ * that the old journal can be immediately released.
+ */
+ return "No journal: timestamp=" + timestamp;
+ }
sb.append("timestamp="+timestamp+"\njournal="+journal.getResourceMetadata());
// historical view of Name2Addr as of that timestamp.
- final ITupleIterator itr = journal.getName2Addr(timestamp)
+ final ITupleIterator<?> itr = journal.getName2Addr(timestamp)
.rangeIterator();
while (itr.hasNext()) {
- final ITuple tuple = itr.next();
+ final ITuple<?> tuple = itr.next();
final Entry entry = EntrySerializer.INSTANCE
.deserialize(new DataInputBuffer(tuple.getValue()));
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/OverflowManager.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/OverflowManager.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/OverflowManager.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -287,6 +287,14 @@
*/
protected final AtomicBoolean asyncOverflowEnabled = new AtomicBoolean(true);
+ /**
+ * Flag may be set to force overflow processing during the next group
+ * commit. The flag is cleared by {@link #overflow()}.
+ *
+ * @see DataService#forceOverflow(boolean, boolean)
+ */
+ public final AtomicBoolean forceOverflow = new AtomicBoolean(false);
+
/**
* A flag that may be set to force the next asynchronous overflow to perform
* a compacting merge for all indices that are not simply copied over to the
@@ -295,6 +303,8 @@
* made compact and SHOULD NOT be used for deployed federations</strong>).
* The state of the flag is cleared each time asynchronous overflow
* processing begins.
+ *
+ * @see DataService#forceOverflow(boolean, boolean)
*/
public final AtomicBoolean compactingMerge = new AtomicBoolean(false);
@@ -1849,6 +1859,19 @@
*/
public boolean shouldOverflow() {
+ if(forceOverflow.get()) {
+
+ /*
+ * Note: forceOverflow trumps everything else.
+ */
+
+ if (log.isInfoEnabled())
+ log.info("Forcing overflow.");
+
+ return true;
+
+ }
+
if (isTransient()) {
/*
@@ -1886,7 +1909,7 @@
return false;
}
-
+
/*
* Look for overflow condition on the "live" journal.
*/
@@ -1959,8 +1982,18 @@
*/
public Future<Object> overflow() {
- assert overflowAllowed.get();
+// assert overflowAllowed.get();
+ /*
+ * Atomically test and clear the flag. The local boolean is inspected
+ * below. When true, asynchronous overflow processing will occur unless
+ * an error occurs during synchronous overflow processing. This ensures
+ * that we can force a compacting merge on the shards of a data service
+ * even if that data service has not buffered sufficient writes to warrant
+ * a build on any of the index segments.
+ */
+ final boolean forceOverflow = this.forceOverflow.getAndSet(false/* newValue */);
+
final Event e = new Event(getFederation(), new EventResource(),
EventType.SynchronousOverflow).addDetail(
"synchronousOverflowCounter",
@@ -1982,7 +2015,12 @@
if (asyncOverflowEnabled.get()) {
- if (overflowMetadata.postProcess) {
+ /*
+ * Do overflow processing if overflow is being forced OR if we
+ * need to do a build for at least one index partition.
+ */
+
+ if (forceOverflow || overflowMetadata.postProcess) {
/*
* Post-processing SHOULD be performed.
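With the flag relocated from WriteExecutorService to the OverflowManager, forcing overflow is now driven through the resource manager: shouldOverflow() honors forceOverflow, and overflow() atomically clears it and carries a forced compacting merge through post-processing. From a client's point of view the entry point remains DataService#forceOverflow(boolean, boolean), per the @see tags above. A hedged usage sketch (the dataService reference is assumed to be obtained elsewhere):

    // Trigger synchronous overflow on one data service. A compacting merge of
    // every shard view is expensive and intended for tests/benchmarks, so pass
    // false for compactingMerge in normal operation.
    dataService.forceOverflow(true /* immediate */, false /* compactingMerge */);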
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -1416,22 +1416,45 @@
* Verify that the concurrency manager has been set and wait a while
* it if is not available yet.
*/
- if (log.isInfoEnabled())
- log.info("Waiting for concurrency manager");
- for (int i = 0; i < 5; i++) {
- try {
- getConcurrencyManager(); break;
- } catch (IllegalStateException ex) {
- Thread.sleep(100/* ms */);
- }
+ {
+ int nwaits = 0;
+ while (true) {
+ try {
+ getConcurrencyManager();
+ break;
+ } catch (IllegalStateException ex) {
+ Thread.sleep(100/* ms */);
+ if (++nwaits % 50 == 0)
+ log.warn("Waiting for concurrency manager");
+ }
+ }
}
- getConcurrencyManager();
- if (Thread.interrupted())
- throw new InterruptedException();
- /*
- * Look for pre-existing data files.
- */
+ try {
+ final IBigdataFederation<?> fed = getFederation();
+ if (fed == null) {
+ /*
+ * Some of the unit tests do not start the txs until after
+ * the DataService. For those unit tests getFederation()
+ * will return null during startup() of the DataService. To
+ * have a common code path, we throw the exception here
+ * which is caught below.
+ */
+ throw new UnsupportedOperationException();
+ }
+ while (true) {
+ if (fed.getTransactionService() != null) {
+ break;
+ }
+ log.warn("Waiting for transaction service discovery");
+ }
+ } catch (UnsupportedOperationException ex) {
+ log.warn("Federation not available - running in test case?");
+ }
+
+ /*
+ * Look for pre-existing data files.
+ */
if (!isTransient) {
if (log.isInfoEnabled())
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractFederation.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractFederation.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractFederation.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -829,7 +829,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public T getService() {
@@ -840,7 +840,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public String getServiceName() {
@@ -851,7 +851,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public Class getServiceIface() {
@@ -862,7 +862,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public UUID getServiceUUID() {
@@ -873,7 +873,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public boolean isServiceReady() {
@@ -894,7 +894,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public void reattachDynamicCounters() {
@@ -905,7 +905,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public void didStart() {
@@ -916,7 +916,7 @@
}
/**
- * Delegated.
+ * Delegated. {@inheritDoc}
*/
public AbstractHTTPD newHttpd(final int httpdPort,
final CounterSet counterSet) throws IOException {
@@ -927,7 +927,10 @@
}
- public void serviceJoin(IService service, UUID serviceUUID) {
+ /**
+ * Delegated. {@inheritDoc}
+ */
+ public void serviceJoin(final IService service, final UUID serviceUUID) {
if (!isOpen()) return;
@@ -941,7 +944,10 @@
}
- public void serviceLeave(UUID serviceUUID) {
+ /**
+ * Delegated. {@inheritDoc}
+ */
+ public void serviceLeave(final UUID serviceUUID) {
if(!isOpen()) return;
@@ -1129,9 +1135,9 @@
// notify delegates that deferred startup has occurred.
AbstractFederation.this.didStart();
+
}
-
/**
* Setup sampling on the client's thread pool. This collects interesting
* statistics about the thread pool for reporting to the load balancer
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -47,6 +47,7 @@
import com.bigdata.btree.IRangeQuery;
import com.bigdata.btree.ITuple;
import com.bigdata.btree.ITupleIterator;
+import com.bigdata.btree.IndexSegment;
import com.bigdata.journal.ITransactionService;
import com.bigdata.journal.ITx;
import com.bigdata.mdi.IMetadataIndex;
@@ -492,40 +493,56 @@
}
- /**
- * Force overflow of each data service in the scale-out federation (only
- * scale-out federations support overflow processing). This method is
- * synchronous. It will not return until all {@link DataService}s have
- * initiated and completed overflow processing. Any unused resources (as
- * determined by the {@link StoreManager}) will have been purged.
- *
- * @param truncateJournal
- * When <code>true</code>, the live journal will be truncated
- * to its minimum extent (all writes will be preserved but there
- * will be no free space left in the journal). This may be used
- * to force the {@link DataService} to its minimum possible
- * footprint.
- *
- * @todo when overflow processing is enabled for the {@link MetadataService}
- * we will have to modify this to also trigger overflow for those
- * services.
- */
- public void forceOverflow(final boolean truncateJournal) {
+ /**
+ * Force overflow of each data service in the scale-out federation (only
+ * scale-out federations support overflow processing). This method is
+ * synchronous. It will not return until all {@link DataService}s have
+ * initiated and completed overflow processing. Any unused resources (as
+ * determined by the {@link StoreManager}) will have been purged.
+ * <p>
+ * This is a relatively fast operation when
+ * <code>compactingMerge := false</code>. By specifying both
+ * <code>compactingMerge := false</code> and
+ * <code>truncateJournal := false</code> you can cause the data services to
+ * close out their current journals against further writes. While this is
+ * not a global synchronous operation, it can provide a basis to obtain a
+ * "near synchronous" snapshot from the federation consisting of all writes
+ * up to the point where overflow was triggered on each data service.
+ *
+ * @param compactingMerge
+ * When <code>true</code>, each shard on each
+ * {@link IDataService} will undergo a compacting merge.
+ * Synchronous parallel compacting merge of all shards is an
+ * expensive operation. This parameter should normally be
+ * <code>false</code> unless you are requesting a compacting
+ * merge for specific purposes, such as benchmarking when all
+ * data is known to exist in one {@link IndexSegment} per shard.
+ * @param truncateJournal
+ * When <code>true</code>, the live journal will be truncated to
+ * its minimum extent (all writes will be preserved but there
+ * will be no free space left in the journal). This may be used
+ * to force the {@link DataService} to its minimum possible
+ * footprint.
+ *
+ * @todo when overflow processing is enabled for the {@link MetadataService}
+ * we will have to modify this to also trigger overflow for those
+ * services.
+ */
+ public void forceOverflow(final boolean compactingMerge, final boolean truncateJournal) {
// find UUID for each data service.
final UUID[] dataServiceUUIDs = getDataServiceUUIDs(0/* maxCount */);
final int ndataServices = dataServiceUUIDs.length;
- if(log.isInfoEnabled())
- log.info("#dataServices=" + ndataServices + ", now=" + new Date());
+ log.warn("Forcing overflow: #dataServices=" + ndataServices + ", now=" + new Date());
final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(ndataServices);
for (UUID serviceUUID : dataServiceUUIDs) {
tasks.add(new ForceOverflowTask(getDataService(serviceUUID),
- truncateJournal));
+ compactingMerge, truncateJournal));
}
@@ -570,8 +587,7 @@
}
- if(log.isInfoEnabled())
- log.info("Did overflow: #ok=" + nok + ", #dataServices="
+ log.warn("Did overflow: #ok=" + nok + ", #dataServices="
+ ndataServices + ", now=" + new Date());
if (nok != tasks.size()) {
@@ -643,16 +659,19 @@
.getLogger(ForceOverflowTask.class);
private final IDataService dataService;
+ private final boolean compactingMerge;
private final boolean truncateJournal;
- public ForceOverflowTask(final IDataService dataService,
- final boolean truncateJournal) {
+ public ForceOverflowTask(final IDataService dataService,
+ final boolean compactingMerge, final boolean truncateJournal) {
if (dataService == null)
throw new IllegalArgumentException();
this.dataService = dataService;
+ this.compactingMerge = compactingMerge;
+
this.truncateJournal = truncateJournal;
}
@@ -663,8 +682,7 @@
log.info("dataService: " + dataService.getServiceName());
// returns once synchronous overflow is complete.
- dataService
- .forceOverflow(true/* immediate */, true/* compactingMerge */);
+ dataService.forceOverflow(true/* immediate */, compactingMerge);
if (log.isInfoEnabled())
log.info("Synchronous overflow is done: "
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-08-06 15:46:07 UTC (rev 3423)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-08-06 15:59:39 UTC (rev 3424)
@@ -43,7 +43,6 @@
import org.apache.log4j.Logger;
-import com.bigdata.concurrent.LockManager;
import com.bigdata.config.LongValidator;
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.Instrument;
@@ -80,9 +79,9 @@
*/
protected static final Logger log = Logger.getLogger(AbstractTransactionService.class);
- protected static final boolean INFO = log.isInfoEnabled();
+// protected static final boolean INFO = log.isInfoEnabled();
- protected static final boolean DEBUG = log.isDebugEnabled();
+// protected static final boolean DEBUG = log.isDebugEnabled();
/**
* Options understood by this service.
@@ -91,29 +90,39 @@
* @version $Id$
*/
public interface Options {
-
- /**
- * How long you want to hold onto the database history (in milliseconds)
- * or {@link Long#MAX_VALUE} for an (effectively) immortal database. The
- * {@link ITransactionService} tracks the timestamp corresponding to the
- * earliest running transaction (if any). When such a transaction
- * exists, the actual release time is:
- *
- * <pre>
- * releaseTime = min(earliestRunningTx, now - minimumReleaseAge) - 1
- * </pre>
- *
- * This ensures that history in use by running transactions is not
- * released even when the minimumReleaseAge is ZERO (0).
- *
- * @see #DEFAULT_MIN_RELEASE_AGE
- * @see #MIN_RELEASE_AGE_1H
- * @see #MIN_RELEASE_AGE_1D
- * @see #MIN_RELEASE_AGE_1W
- * @see #MIN_RELEASE_AGE_NEVER
- *
- * @see AbstractTransactionService#updateReleaseTime(long)
- */
+
+ /**
+ * How long you want to hold onto the database history (in milliseconds)
+ * or {@link Long#MAX_VALUE} for an (effectively) immortal database. The
+ * {@link ITransactionService} tracks the timestamp corresponding to the
+ * earliest running transaction (if any). When such a transaction
+ * exists, the actual release time is:
+ *
+ * <pre>
+ * releaseTime = min(lastCommitTime - 1, min(earliestRunningTx, now - minimumReleaseAge))
+ * </pre>
+ *
+ * This ensures that history in use by running transactions is not
+ * released even when the minimumReleaseAge is ZERO (0).
+ * <p>
+ * When no transactions exist the actual release time is:
+ *
+ * <pre>
+ * releaseTime = min(commitTime - 1, now - minimumReleaseAge)
+ * </pre>
+ *
+ * This ensures that the release time advances when no transactions
+ * are in use, but that the mi...
[truncated message content]
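For reference, plugging illustrative numbers into the release-time rules quoted above (all values are made-up timestamps; minimumReleaseAge = 0):

    now = 1000, lastCommitTime = 900, earliestRunningTx = 800

    with a running tx : releaseTime = min(900 - 1, min(800, 1000 - 0)) = 800
    with no running tx: releaseTime = min(900 - 1, 1000 - 0)           = 899

That is, the earliest running transaction caps the release time while it is open, and once no transactions remain the release time advances to just below the last commit point.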