From: <tho...@us...> - 2011-03-10 21:22:40
|
Revision: 4289 http://bigdata.svn.sourceforge.net/bigdata/?rev=4289&view=rev Author: thompsonbry Date: 2011-03-10 21:22:30 +0000 (Thu, 10 Mar 2011) Log Message: ----------- A refactor to collect performance statistics for the standalone database (the journal) per [1,2,3]. In addition, I have taken on a REST API for the NanoSparqlServer [4]. I am passing this over to Martyn to cut the code over from NanoHTTPD to the Servlet API using Jetty [5] [1] https://sourceforge.net/apps/trac/bigdata/ticket/206 (Expose performance counters for Journal) [2] https://sourceforge.net/apps/trac/bigdata/ticket/269 (Refactor performance counter interface) [3] https://sourceforge.net/apps/trac/bigdata/ticket/207 (Report on top-N queries) [4] https://sourceforge.net/apps/trac/bigdata/ticket/263 (Add update protocol to NanoSparqlServer) [5] https://sourceforge.net/apps/trac/bigdata/ticket/200 (Raise NanoSparqlServer to front line class) Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/IStatisticsCollector.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPD.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPDServer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/PIDStatCollector.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/SarCpuUtilizationCollector.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/StatisticsCollectorForLinux.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/VMStatCollector.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/CounterSetQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/URLQueryModel.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/StatisticsCollectorForWindows.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/ResourceManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractFederation.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/DataService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/DefaultClientDelegate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/DefaultServiceFederationDelegate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/IBigdataFederation.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/IFederationDelegate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/LoadBalancerService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/ThreadPoolExecutorStatisticsTask.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/httpd/AbstractHTTPD.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/httpd/NanoHTTPD.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestQueryEngineFactory.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/counters/httpd/TestCounterSetHTTPDServer.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestDirectJournal.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestDiskJournal.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestMappedJournal.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestTransactionService.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestTransientJournal.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestEventReceiver.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/jini/start/config/AbstractHostConstraint.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/jini/start/config/HostAllowConstraint.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/jini/start/config/HostRejectConstraint.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/LoadBalancerServer.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/TransactionServer.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/load/ConcurrentDataLoader.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/AbstractTestCase.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalQuadStore.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStore.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlClient.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/AbstractBigdataSailTestCase.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestDescribe.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/ICounterSetAccess.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/httpd/HTTPGetHandler.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/httpd/HTTPHeaderUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/httpd/MIMEType.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/httpd/NVPair.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd/TestDecodeParams.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd/TestHTTPHeaderUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd/TestMIMEType.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/httpd/TestNanoHTTPD.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/TestNanoSparqlServer.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/TestNanoSparqlServer_StartStop.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -64,7 +64,6 @@ import com.bigdata.counters.Instrument; import com.bigdata.journal.IIndexManager; import com.bigdata.rdf.sail.QueryHints; -import com.bigdata.rdf.sail.bench.NanoSparqlServer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.resources.IndexManager; @@ -1232,8 +1231,6 @@ * * @return The {@link AbstractRunningQuery} -or- <code>null</code> if there * is no query associated with that query identifier. - * - * @todo Exposed to {@link NanoSparqlServer} */ public /*protected*/ AbstractRunningQuery getRunningQuery(final UUID queryId) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -38,7 +38,6 @@ import com.bigdata.journal.BufferMode; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.Journal; -import com.bigdata.rawstore.Bytes; import com.bigdata.service.IBigdataClient; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.ManagedResourceService; @@ -80,6 +79,29 @@ ); /** + * Singleton factory test (does not create the query controller) for + * standalone or scale-out. + * + * @param indexManager + * The database. + * + * @return The query controller iff one has been obtained from the factory + * and its weak reference has not been cleared. + */ + static public QueryEngine getExistingQueryController( + final IIndexManager indexManager) { + + if (indexManager instanceof IBigdataFederation<?>) { + + return federationQECache.get((IBigdataFederation<?>) indexManager); + + } + + return standaloneQECache.get((Journal)indexManager); + + } + + /** * Singleton factory for standalone or scale-out. * * @param indexManager Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/concurrent/NonBlockingLockManagerWithNewDesign.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -59,6 +59,7 @@ import com.bigdata.cache.ConcurrentWeakValueCacheWithTimeout; import com.bigdata.counters.CounterSet; +import com.bigdata.counters.ICounterSetAccess; import com.bigdata.counters.Instrument; import com.bigdata.journal.AbstractTask; import com.bigdata.util.concurrent.DaemonThreadFactory; @@ -81,7 +82,8 @@ * possible). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ + * @version $Id: NonBlockingLockManagerWithNewDesign.java 4280 2011-03-08 + * 15:06:58Z thompsonbry $ * * @param R * The type of the object that identifies a resource for the purposes @@ -130,7 +132,8 @@ * ensure anything except lower latency when compared to other operations * awaiting their own locks. */ -public abstract class NonBlockingLockManagerWithNewDesign</* T, */R extends Comparable<R>> { +public abstract class NonBlockingLockManagerWithNewDesign</* T, */R extends Comparable<R>> + implements ICounterSetAccess { final protected static Logger log = Logger .getLogger(NonBlockingLockManagerWithNewDesign.class); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -32,7 +32,6 @@ import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.Arrays; import java.util.Enumeration; import java.util.List; @@ -115,7 +114,7 @@ } - protected AbstractStatisticsCollector(int interval) { + protected AbstractStatisticsCollector(final int interval) { if (interval <= 0) throw new IllegalArgumentException(); @@ -168,10 +167,10 @@ // // } - /** - * {@link CounterSet} hierarchy. - */ - private CounterSet countersRoot; +// /** +// * {@link CounterSet} hierarchy. +// */ +// private CounterSet countersRoot; /** * Return the counter hierarchy. The returned hierarchy only includes those @@ -182,10 +181,11 @@ * Note: Subclasses MUST extend this method to initialize their own * counters. */ - synchronized public CounterSet getCounters() { + /*synchronized*/public CounterSet getCounters() { - if (countersRoot == null) { +// if (countersRoot == null) { + final CounterSet countersRoot = new CounterSet(); // os.arch @@ -213,7 +213,7 @@ + IHostCounters.Info_ProcessorInfo, new OneShotInstrument<String>(SystemUtil.cpuInfo())); - } +// } return countersRoot; @@ -233,8 +233,9 @@ * @param properties * The properties used to configure that service or client. */ - static public void addBasicServiceOrClientCounters(CounterSet serviceRoot, - String serviceName, Class serviceIface, Properties properties) { + static public void addBasicServiceOrClientCounters( + final CounterSet serviceRoot, final String serviceName, + final Class serviceIface, final Properties properties) { // Service info. { @@ -252,6 +253,18 @@ } + serviceRoot.attach(getMemoryCounterSet()); + + } + + /** + * Return the {@link IProcessCounters#Memory memory counter set}. This + * should be attached to the service root. + */ + static public CounterSet getMemoryCounterSet() { + + final CounterSet serviceRoot = new CounterSet(); + // Service per-process memory data { @@ -278,18 +291,13 @@ .addGarbageCollectorMXBeanCounters(serviceRoot .makePath(ICounterHierarchy.Memory_GarbageCollectors)); - // Moved since counters must be dynamically reattached to reflect pool hierarchy. -// /* -// * Add counters reporting on the various DirectBufferPools. -// */ -// { -// -// serviceRoot.makePath( -// IProcessCounters.Memory + ICounterSet.pathSeparator -// + "DirectBufferPool").attach( -// DirectBufferPool.getCounters()); -// -// } + /* + * Add counters reporting on the various DirectBufferPools. + */ + serviceRoot.makePath( + IProcessCounters.Memory + ICounterSet.pathSeparator + + "DirectBufferPool").attach( + DirectBufferPool.getCounters()); if (LRUNexus.INSTANCE != null) { @@ -306,7 +314,9 @@ } } - + + return serviceRoot; + } /** @@ -675,7 +685,7 @@ final int port = 8080; if (port != 0) { try { - httpd = new CounterSetHTTPD(port,client.countersRoot); + httpd = new CounterSetHTTPD(port, client); } catch (IOException e) { log.warn("Could not start httpd: port=" + port+" : "+e); } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/ICounterSetAccess.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/ICounterSetAccess.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/ICounterSetAccess.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -0,0 +1,43 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Mar 8, 2011 + */ + +package com.bigdata.counters; + +/** + * Interface for self-reporting performance counters. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface ICounterSetAccess { + + /** + * Return performance counters. + */ + public CounterSet getCounters(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/ICounterSetAccess.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/IStatisticsCollector.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/IStatisticsCollector.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/IStatisticsCollector.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -34,17 +34,17 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public interface IStatisticsCollector { +public interface IStatisticsCollector extends ICounterSetAccess { /** * The interval in seconds at which the counter values are sampled. */ public int getInterval(); - /** - * Return the counter hierarchy. - */ - public CounterSet getCounters(); +// /** +// * Return the counter hierarchy. +// */ +// public CounterSet getCounters(); /** * Start collecting performance data. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPD.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPD.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPD.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -7,14 +7,12 @@ import java.io.OutputStreamWriter; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; -import java.util.Properties; -import java.util.Vector; import org.apache.log4j.Logger; import com.bigdata.counters.CounterSet; +import com.bigdata.counters.ICounterSetAccess; import com.bigdata.counters.query.CounterSetSelector; import com.bigdata.counters.query.ICounterSelector; import com.bigdata.counters.query.URLQueryModel; @@ -32,19 +30,38 @@ */ public class CounterSetHTTPD extends AbstractHTTPD { - static private final Logger log = Logger.getLogger(CounterSetHTTPD.class); - + static private final Logger log = Logger.getLogger(CounterSetHTTPD.class); + /** - * The {@link CounterSet} exposed by this service. + * Access to the {@link CounterSet} exposed by this service. */ - protected final CounterSet counterSet; + private final ICounterSetAccess accessor; /** * The service reference iff one one specified to the ctor (may be null). */ - protected final IService service; + private final IService service; /** + * The minimum time before a client can force the re-materialization of the + * {@link CounterSet}. This is designed to limit the impact of the client on + * the service. + * + * TODO Configuration parameter for {@link #minUpdateLatency} + */ + private final long minUpdateLatency = 5000; + + /** + * The last materialized {@link CounterSet}. + */ + private volatile CounterSet counterSet = null; + + /** + * The timestamp of the last materialized {@link CounterSet}. + */ + private volatile long lastTimestamp = 0L; + + /** * Class used to pre-declare classpath resources that are available for * download via httpd. * @@ -106,29 +123,39 @@ * downloaded via httpd. */ private final Map<String/*uri*/,DeclaredResource> allowedClassPathResources; - - public CounterSetHTTPD(final int port, final CounterSet root) throws IOException { - this(port, root, null/*fed*/); + /** + * The service reference iff one one specified to the ctor (may be null). + */ + final protected IService getService() { + return service; + } + + public CounterSetHTTPD(final int port, final ICounterSetAccess accessor) throws IOException { + + this(port, accessor, null/*fed*/); } /** * * @param port - * @param root + * @param accessor * @param service * Optional reference to the service within which this httpd is * hosted. * @throws IOException */ - public CounterSetHTTPD(final int port, final CounterSet root, + public CounterSetHTTPD(final int port, final ICounterSetAccess accessor, final IService service) throws IOException { super(port); - this.counterSet = root; + if(accessor == null) + throw new IllegalArgumentException(); + this.accessor = accessor; + // Note: MAY be null. this.service = service; @@ -152,14 +179,13 @@ } - public Response doGet(final String uri, final String method, - final Properties header, - final LinkedHashMap<String, Vector<String>> parms) throws Exception { + @Override + public Response doGet(final Request req) throws Exception { final ByteArrayOutputStream baos = new ByteArrayOutputStream( 2 * Bytes.kilobyte32); - final String charset = "UTF-8"; + final String charset = UTF8; final InputStream is; @@ -167,7 +193,7 @@ * If the request uri is one of the pre-declared resources then we send * that resource. */ - final DeclaredResource decl = allowedClassPathResources.get(uri); + final DeclaredResource decl = allowedClassPathResources.get(req.uri); if (decl != null) { @@ -177,6 +203,29 @@ } /* + * Materialization the CounterSet iff necessary or stale. + * + * Note: This bit needs to be single threaded to avoid concurrent + * requests causing concurrent materialization of the counter set. + */ + final ICounterSelector counterSelector; + synchronized(this) { + + final long now = System.currentTimeMillis(); + + final long elapsed = now - lastTimestamp; + + if (counterSet == null || elapsed > minUpdateLatency/* ms */) { + + counterSet = accessor.getCounters(); + + } + + counterSelector = new CounterSetSelector(counterSet); + + } + + /* * Obtain a renderer. * * @todo This really should pass in the Accept header and our own list @@ -192,12 +241,9 @@ { // build model of the controller state. - final URLQueryModel model = new URLQueryModel(service, uri, parms, - header); + final URLQueryModel model = new URLQueryModel(getService(), + req.uri, req.params, req.headers); - final ICounterSelector counterSelector = new CounterSetSelector( - counterSet); - renderer = RendererFactory.get(model, counterSelector, mimeType); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPDServer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPDServer.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/httpd/CounterSetHTTPDServer.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -38,6 +38,7 @@ import org.apache.log4j.Logger; import com.bigdata.counters.CounterSet; +import com.bigdata.counters.ICounterSetAccess; import com.bigdata.counters.PeriodEnum; import com.bigdata.counters.query.QueryUtil; import com.bigdata.counters.render.XHTMLRenderer; @@ -107,6 +108,16 @@ final CounterSet counterSet = new CounterSet(); + final ICounterSetAccess access = new ICounterSetAccess() { + + public CounterSet getCounters() { + + return counterSet; + + } + + }; + final DummyEventReportingService service = new DummyEventReportingService(); // any -filter arguments. @@ -185,7 +196,7 @@ // new server. final CounterSetHTTPDServer server = new CounterSetHTTPDServer(port, - counterSet, service); + access, service); // run server. server.run(); @@ -199,8 +210,9 @@ * * @param port */ - public CounterSetHTTPDServer(final int port, final CounterSet counterSet, - final IService service) throws Exception { + public CounterSetHTTPDServer(final int port, + final ICounterSetAccess counterSet, final IService service) + throws Exception { /* * The runtime shutdown hook appears to be a robust way to handle ^C by Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/PIDStatCollector.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/PIDStatCollector.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/PIDStatCollector.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -282,13 +282,13 @@ * declared using the bare path for the counter. E.g., as * {@link IProcessCounters#Memory_virtualSize}. */ - synchronized public CounterSet getCounters() { + /*synchronized*/ public CounterSet getCounters() { - if(root == null) { +// if(root == null) { - root = new CounterSet(); + final CounterSet root = new CounterSet(); - inst = new LinkedList<AbstractInst>(); + inst = new LinkedList<AbstractInst<?>>(); /* * Note: Counters are all declared as Double to facilitate @@ -315,11 +315,11 @@ inst.add(new ID(IProcessCounters.PhysicalDisk_BytesReadPerSec, Bytes.kilobyte32)); inst.add(new ID(IProcessCounters.PhysicalDisk_BytesWrittenPerSec, Bytes.kilobyte32)); - } +// } - for(Iterator<AbstractInst> itr = inst.iterator(); itr.hasNext(); ) { + for(Iterator<AbstractInst<?>> itr = inst.iterator(); itr.hasNext(); ) { - AbstractInst i = itr.next(); + final AbstractInst<?> i = itr.next(); root.addCounter(i.getPath(), i); @@ -328,8 +328,8 @@ return root; } - private List<AbstractInst> inst = null; - private CounterSet root = null; + private List<AbstractInst<?>> inst = null; +// private CounterSet root = null; /** * Extended to force <code>pidstat</code> to use a consistent Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/SarCpuUtilizationCollector.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/SarCpuUtilizationCollector.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/SarCpuUtilizationCollector.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -195,11 +195,11 @@ /** * Declares the counters that we will collect using <code>sar</code>. */ - synchronized public CounterSet getCounters() { + /*synchronized*/ public CounterSet getCounters() { - if(root == null) { +// if(root == null) { - root = new CounterSet(); + final CounterSet root = new CounterSet(); inst = new LinkedList<I>(); @@ -219,19 +219,19 @@ for(Iterator<I> itr = inst.iterator(); itr.hasNext(); ) { - I i = itr.next(); + final I i = itr.next(); root.addCounter(i.getPath(), i); } - } +// } return root; } private List<I> inst = null; - private CounterSet root = null; +// private CounterSet root = null; /** * Extended to force <code>sar</code> to use a consistent timestamp Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/StatisticsCollectorForLinux.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/StatisticsCollectorForLinux.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/StatisticsCollectorForLinux.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -96,13 +96,14 @@ } - private boolean countersAdded = false; +// private boolean countersAdded = false; - synchronized public CounterSet getCounters() { + @Override + /*synchronized*/ public CounterSet getCounters() { final CounterSet root = super.getCounters(); - if( ! countersAdded ) { +// if( ! countersAdded ) { if (sar1 != null) { @@ -141,9 +142,9 @@ } - countersAdded = true; - - } +// countersAdded = true; +// +// } return root; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/VMStatCollector.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/VMStatCollector.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/linux/VMStatCollector.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -193,11 +193,11 @@ /** * Declares the counters that we will collect */ - synchronized public CounterSet getCounters() { + /*synchronized*/ public CounterSet getCounters() { - if(root == null) { +// if(root == null) { - root = new CounterSet(); + final CounterSet root = new CounterSet(); inst = new LinkedList<I>(); @@ -270,13 +270,13 @@ } - } +// } return root; } private List<I> inst = null; - private CounterSet root = null; +// private CounterSet root = null; public AbstractProcessReader getProcessReader() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/CounterSetQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/CounterSetQuery.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/CounterSetQuery.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -44,7 +44,8 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; -import java.util.Properties; +import java.util.Map; +import java.util.TreeMap; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -55,6 +56,7 @@ import javax.xml.parsers.ParserConfigurationException; +import org.CognitiveWeb.util.CaseInsensitiveStringComparator; import org.apache.log4j.Logger; import org.xml.sax.SAXException; @@ -129,12 +131,14 @@ // Extract the URL query parameters. final LinkedHashMap<String, Vector<String>> params = NanoHTTPD - .decodeParms(url.getQuery()); + .decodeParams(url.getQuery(), + new LinkedHashMap<String, Vector<String>>()); // add any relevant headers - final Properties headers = new Properties(); + final Map<String, String> headers = new TreeMap<String, String>( + new CaseInsensitiveStringComparator()); - headers.setProperty("host", url.getHost() + ":" + url.getPort()); + headers.put("host", url.getHost() + ":" + url.getPort()); return new URLQueryModel(null/* service */, url.toString(), params, headers); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/URLQueryModel.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/URLQueryModel.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/query/URLQueryModel.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -42,7 +42,6 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Properties; import java.util.Vector; import java.util.regex.Pattern; @@ -211,7 +210,7 @@ /** * The request headers. */ - final public Properties headers; + final public Map<String,String> headers; /** * The value of the {@link #PATH} query parameter. @@ -348,8 +347,8 @@ */ public URLQueryModel(final IService service, final String uri, final LinkedHashMap<String, Vector<String>> params, - final Properties headers) { - + final Map<String, String> headers) { + this.uri = uri; this.params = params; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/StatisticsCollectorForWindows.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/StatisticsCollectorForWindows.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/StatisticsCollectorForWindows.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -63,13 +63,14 @@ } - private boolean countersAdded = false; +// private boolean countersAdded = false; - synchronized public CounterSet getCounters() { + @Override + /*synchronized*/ public CounterSet getCounters() { - CounterSet root = super.getCounters(); + final CounterSet root = super.getCounters(); - if( ! countersAdded ) { +// if( ! countersAdded ) { /* * These are per-host counters. We attach them under the fully @@ -77,9 +78,9 @@ */ root.makePath(fullyQualifiedHostName).attach( typeperf.getCounters() ); - countersAdded = true; - - } +// countersAdded = true; +// +// } return root; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/counters/win/TypeperfCollector.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -468,9 +468,9 @@ */ public CounterSet getCounters() { - if (root == null) { +// if (root == null) { - root = new CounterSet(); + final CounterSet root = new CounterSet(); final String p = ""; // @todo remove this variable. @@ -566,13 +566,13 @@ } - } +// } return root; } - private CounterSet root; +// private CounterSet root; /** * List of performance counters that we will be collecting. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2011-03-10 15:16:52 UTC (rev 4288) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2011-03-10 21:22:30 UTC (rev 4289) @@ -25,6 +25,8 @@ import java.io.File; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; @@ -33,12 +35,17 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.log4j.Logger; + import com.bigdata.bfs.BigdataFileSystem; import com.bigdata.bfs.GlobalFileSystemHelper; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.fed.QueryEngineFactory; import com.bigdata.btree.AbstractBTree; import com.bigdata.btree.BTree; import com.bigdata.btree.ILocalBTreeView; @@ -48,7 +55,10 @@ import com.bigdata.cache.HardReferenceQueue; import com.bigdata.config.IntegerValidator; import com.bigdata.config.LongValidator; +import com.bigdata.counters.AbstractStatisticsCollector; import com.bigdata.counters.CounterSet; +import com.bigdata.counters.ICounterSet; +import com.bigdata.counters.httpd.CounterSetHTTPD; import com.bigdata.ha.HAGlue; import com.bigdata.ha.QuorumService; import com.bigdata.quorum.Quorum; @@ -57,15 +67,21 @@ import com.bigdata.relation.locator.ILocatableResource; import com.bigdata.relation.locator.IResourceLocator; import com.bigdata.resources.IndexManager; +import com.bigdata.resources.ResourceManager; import com.bigdata.resources.StaleLocatorReason; import com.bigdata.service.AbstractTransactionService; import com.bigdata.service.DataService; +import com.bigdata.service.IBigdataClient; import com.bigdata.service.IBigdataFederation; +import com.bigdata.service.LoadBalancerService; import com.bigdata.sparse.GlobalRowStoreHelper; import com.bigdata.sparse.SparseRowStore; import com.bigdata.util.concurrent.DaemonThreadFactory; import com.bigdata.util.concurrent.LatchedExecutor; import com.bigdata.util.concurrent.ShutdownHelper; +import com.bigdata.util.concurrent.TaskCounters; +import com.bigdata.util.concurrent.ThreadPoolExecutorStatisticsTask; +import com.bigdata.util.httpd.AbstractHTTPD; /** * Concrete implementation suitable for a local and unpartitioned database. @@ -80,18 +96,6 @@ public class Journal extends AbstractJournal implements IConcurrencyManager, /*ILocalTransactionManager,*/ IResourceManager { -// /* -// * These fields were historically marked as [final] and set by the -// * constructor. With the introduction of high availability these fields can -// * not be final because the CREATE of the journal must be deferred until a -// * quorum leader has been elected. -// * -// * The pattern for these fields is that they are assigned by create() and -// * are thereafter immutable. The fields are marked as [volatile] so the -// * state change when they are set will be visible without explicit -// * synchronization (many methods use volatile reads on these fields). -// */ - /** * Object used to manage local transactions. */ @@ -154,7 +158,57 @@ String READ_POOL_SIZE = Journal.class.getName() + ".readPoolSize"; String DEFAULT_READ_POOL_SIZE = "0"; + + /* + * Performance counters options. + */ + + /** + * Boolean option for the collection of statistics from the underlying + * operating system (default + * {@value #DEFAULT_COLLECT_PLATFORM_STATISTICS}). + * + * @see AbstractStatisticsCollector#newInstance(Properties) + */ + String COLLECT_PLATFORM_STATISTICS = Journal.class.getName() + + ".collectPlatformStatistics"; + String DEFAULT_COLLECT_PLATFORM_STATISTICS = "false"; + + /** + * Boolean option for the collection of statistics from the various + * queues using to run tasks (default + * {@link #DEFAULT_COLLECT_QUEUE_STATISTICS}). + * + * @see ThreadPoolExecutorStatisticsTask + */ + String COLLECT_QUEUE_STATISTICS = Journal.class.getName() + + ".collectQueueStatistics"; + + String DEFAULT_COLLECT_QUEUE_STATISTICS = "false"; + + /** + * Integer option specifies the port on which an httpd service will be + * started that exposes the {@link CounterSet} for the client (default + * {@value #DEFAULT_HTTPD_PORT}). When ZERO (0), a random port will be + * used. The httpd service may be disabled by specifying <code>-1</code> + * as the port. + * <p> + * Note: The httpd service for the {@link LoadBalancerService} is + * normally run on a known port in order to make it easy to locate that + * service, e.g., port 80, 8000 or 8080, etc. This MUST be overridden for + * the {@link LoadBalancerService} it its configuration since + * {@link #DEFAULT_HTTPD_PORT} will otherwise cause a random port to be + * assigned. + */ + String HTTPD_PORT = Journal.class.getName() + ".httpdPort"; + + /** + * The default http service port is <code>-1</code>, which means + * performance counter reporting is disabled by default. + */ + String DEFAULT_HTTPD_PORT = "-1"; + } /** @@ -176,9 +230,25 @@ tempStoreFactory = new TemporaryStoreFactory(properties); - executorService = Executors.newCachedThreadPool(new DaemonThreadFactory - (getClass().getName()+".executorService")); + executorService = (ThreadPoolExecutor) Executors + .newCachedThreadPool(new DaemonThreadFactory(getClass() + .getName() + + ".executorService")); + if (Boolean.valueOf(properties.getProperty( + Options.COLLECT_QUEUE_STATISTICS, + Options.DEFAULT_COLLECT_QUEUE_STATISTICS))) { + + scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new DaemonThreadFactory( + getClass().getName() + ".sampleService")); + + } else { + + scheduledExecutorService = null; + + } + { final int readPoolSize = Integer.valueOf(properties.getProperty( @@ -221,19 +291,10 @@ concurrencyManager = new ConcurrencyManager(properties, localTransactionManager, this); - } + getExecutorService().execute(new StartDeferredTasksTask()); + + } -// public void init() { -// -// super.init(); -// -// localTransactionManager = newLocalTransactionManager(); -// -// concurrencyManager = new ConcurrencyManager(properties, -// localTransactionManager, this); -// -// } - protected AbstractLocalTransactionManager newLocalTransactionManager() { final JournalTransactionService abstractTransactionService = new JournalTransactionService( @@ -318,16 +379,105 @@ } + /** + * Interface defines and documents the counters and counter namespaces + * reported by the {@link Journal} and the various services which it uses. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + public static interface IJournalCounters extends + ConcurrencyManager.IConcurrencyManagerCounters, +// ...TransactionManager.XXXCounters, + ResourceManager.IResourceManagerCounters + { + + /** + * The namespace for the counters pertaining to the {@link ConcurrencyManager}. + */ + String concurrencyManager = "Concurrency Manager"; + + /** + * The namespace for the counters pertaining to the {@link ILocalTransactionService}. + */ + String transactionManager = "Transaction Manager"; + + /** + * The namespace for counters pertaining to the + * {@link Journal#getExecutorService()}. + */ + String executorService = "Executor Service"; + + /** + * Performance counters for the query engine associated with this + * journal (if any). + */ + String queryEngine = "Query Engine"; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to attach additional performance counters. + */ + @Override public CounterSet getCounters() { - final CounterSet counters = super.getCounters(); + final CounterSet root = new CounterSet(); + + // Host wide performance counters (collected from the OS). + if (platformStatisticsCollector != null) { + + root.attach(platformStatisticsCollector.getCounters()); + + } + + // JVM wide performance counters. + { - counters.attach(concurrencyManager.getCounters()); + final CounterSet tmp = root.makePath("JVM"); + + tmp.attach(AbstractStatisticsCollector.getMemoryCounterSet()); + + } - counters.attach(localTransactionManager.getCounters()); + // Journal performance counters. + { + + final CounterSet tmp = root.makePath("Journal"); - return counters; + tmp.attach(super.getCounters()); + + tmp.makePath(IJournalCounters.concurrencyManager) + .attach(concurrencyManager.getCounters()); + + tmp.makePath(IJournalCounters.transactionManager) + .attach(localTransactionManager.getCounters()); + + if (threadPoolExecutorStatisticsTask != null) { + + tmp.makePath(IJournalCounters.executorService) + .attach(threadPoolExecutorStatisticsTask.getCounters()); + + } + + } + // Lookup an existing query engine, but do not cause one to be created. + final QueryEngine queryEngine = QueryEngineFactory + .getExistingQueryController(this); + + if (queryEngine != null) { + + final CounterSet tmp = root.makePath(IJournalCounters.queryEngine); + + tmp.attach(queryEngine.getCounters()); + + } + + return root; + } /* @@ -888,6 +1038,31 @@ */ localTransactionManager.shutdown(); + if (platformStatisticsCollector != null) { + + platformStatisticsCollector.stop(); + + platformStatisticsCollector = null; + + } + + if (scheduledExecutorService != null) { + + scheduledExecutorService.shutdown(); + + } + + // optional httpd service for the local counters. + if (httpd != null) { + + httpd.shutdown(); + + httpd = null; + + httpdURL = null; + + } + /* * Shutdown the executor service. This will wait for any tasks being run * on that service by the application to complete. @@ -941,6 +1116,28 @@ if (!isOpen()) return; + if (platformStatisticsCollector != null) { + + platformStatisticsCollector.stop(); + + platformStatisticsCollector = null; + + } + + if (scheduledExecutorService != null) + scheduledExecutorService.shutdownNow(); + + // optional httpd service for the local counters. + if (httpd != null) { + + httpd.shutdown(); + + httpd = null; + + httpdURL = null; + + } + // Note: can be null if error in ctor. if (executorService != null) executorService.shutdownNow(); @@ -1077,7 +1274,7 @@ * @throws UnsupportedOperationException * always. */ - public IBigdataFederation getFederation() { + public IBigdataFederation<?> getFederation() { throw new UnsupportedOperationException(); @@ -1103,16 +1300,6 @@ } -// /** -// * @throws UnsupportedOperationException -// * always. -// */ -// public UUID[] getDataServiceUUIDs() { -// -// throw new UnsupportedOperationException(); -// -// } - /** * Always returns <code>null</code> since index partition moves are not * supported. @@ -1266,7 +1453,7 @@ return resourceLocator; } - private final DefaultResourceLocator resourceLocator; + private final DefaultResourceLocator<?> resourceLocator; public IResourceLockService getResourceLockService() { @@ -1284,9 +1471,62 @@ return executorService; } - private final ExecutorService executorService; + private final ThreadPoolExecutor executorService; /** + * Used to sample and report on the queue associated with the + * {@link #executorService} and <code>null</code> if we will not be + * collecting data on task execution. + */ + private final ScheduledExecutorService scheduledExecutorService; + + /** + * Collects interesting statistics on the {@link #executorService}. + * + * @see Options#COLLECT_QUEUE_STATISTICS + */ + private ThreadPoolExecutorStatisticsTask threadPoolExecutorStatisticsTask = null; + + /** + * Counters that aggregate across all tasks submitted to the Journal's + * {@link ExecutorService}. Those counters are sampled by a + * {@link ThreadPoolExecutorStatisticsTask}. + * + * @see Options#COLLECT_QUEUE_STATISTICS + */ + private final TaskCounters taskCounters = new TaskCounters(); + + /** + * Collects interesting statistics on the host and process. + * + * @see Options#COLLECT_PLATFORM_STATISTICS + */ + private AbstractStatisticsCollector platformStatisticsCollector = null; + + /** + * httpd reporting the live counters -or- <code>null</code> if not enabled. + * + * @see Options#HTTPD_PORT + */ + private AbstractHTTPD httpd = null; + + /** + * The URL that may be used to access the httpd service exposed by this + * client -or- <code>null</code> if not enabled. + */ + private String httpdURL = null; + + /** + * The URL that may be used to access the httpd service exposed by this + * client -or- <code>null</code> if not enabled. + */ + final public String getHttpdURL() { + + return httpdURL; + + } + + /** * An executor service used to read on the local disk. * * @todo This is currently used by prefetch. We should generalize this @@ -1309,5 +1549,215 @@ } private final LatchedExecutor readService; - + + /** + * This task runs once starts an (optional) + * {@link AbstractStatisticsCollector} and an (optional) httpd service. + * <p> + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * + * FIXME Make sure that we disable this by default for the unit + * tests or we will have a bunch of sampling processes running! + */ + private class StartDeferredTasksTask implements Runnable { + + /** + * Note: The logger is named for this class, but since it is an inner + * class the name uses a "$" delimiter (vs a ".") between the outer and + * the inner class names. + */ + final private Logger log = Logger.getLogger(StartDeferredTasksTask.class); + + private StartDeferredTasksTask() { + } + + public void run() { + + try { + + startDeferredTasks(); + + } catch (Throwable t) { + + log.error(t, t); + + return; + + } + + } + + /** + * Starts performance counter collection. + */ + protected void startDeferredTasks() throws IOException { + + // start collection on various work queues. + startQueueStatisticsCollection(); + + // start collecting performance counters (if enabled). + startPlatformStatisticsCollection(); + + // start the local httpd service reporting on this service. + startHttpdService(); + + } + + /** + * Setup sampling on the client's thread pool. This collects interesting + * statistics about the thread pool for reporting to the load balancer + * service. + */ + protected void startQueueStatisticsCollection() { + + final boolean collectQueueStatistics = Boolean.valueOf(getProperty( + Options.COLLECT_QUEUE_STATISTICS, + Options.DEFAULT_COLLECT_QUEUE_STATISTICS)); + + if (log.isInfoEnabled()) + log.info(Options.COLLECT_QUEUE_STATISTICS + "=" + + collectQueueStatistics); + + if (!collectQueueStatistics) { + + return; + + } + + final long initialDelay = 0; // initial delay in ms. + final long delay = 1000; // delay in ms. + final TimeUnit unit = TimeUnit.MILLISECONDS; + + final String relpath = "Thread Pool"; + + threadPoolExecutorStatisticsTask = new ThreadPoolExecutorStatisticsTask( + relpath, executorService, taskCounters); + + scheduledExecutorService + .scheduleWithFixedDelay(threadPoolExecutorStatisticsTask, + initialDelay, delay, unit); + + } + + /** + * Start collecting performance counters from the OS (if enabled). + */ + protected void startPlatformStatisticsCollection() { + + final boolean collectPlatformStatistics = Boolean + .valueOf(getProperty(Options.COLLECT_PLATFORM_STATISTICS, + Options.DEFAULT_COLLECT_PLATFORM_STATISTICS)); + + if (log.isInfoEnabled()) + log.info(Options.COLLECT_PLATFORM_STATISTICS + "=" + + collectPlatformStatistics); + + if (!collectPlatformStatistics) { + + return; + + } + + final Properties p = getProperties(); + + if (p.getProperty(AbstractStatisticsCollector.Options.PROCESS_NAME) == null) { + + // Set default name for this process. + p.setProperty(AbstractStatisticsCollector.Options.PROCESS_NAME, + "service" + ICounterSet.pathSeparator + + Journal.class.getName()); + + } + + try { + + final AbstractStatisticsCollector tmp = AbstractStatisticsCollector + .newInstance(p); + + tmp.start(); + + // Note: synchronized(Journal.this) keeps find bugs happy. + synchronized(Journal.this) { + + Journal.this.platformStatisticsCollector = tmp; + + } + + if (log.isInfoEnabled()) + log.info("Collecting platform statistics."); + + } catch (Throwable t) { + + log.error(t, t); + + } + + } + + /** + * Start the local httpd service (if enabled). The service is started on + * the {@link IBigdataClient#getHttpdPort()}, on a randomly assigned + * port if the port is <code>0</code>, or NOT started if the port is + * <code>-1</code>. If the service is started, then the URL for the + * service is reported to the load balancer and also written into the + * file system. When started, the httpd service will be shutdown with + * the federation. + * + * @throws UnsupportedEncodingException + */ + protected void startHttpdService() throws UnsupportedEncodingException { + + final int httpdPort = Integer.valueOf(getProperty( + Options.HTTPD_PORT, Options.DEFAULT_HTTPD_PORT)); + + if (log.isInfoEnabled()) + log.info(Options.HTTPD_PORT + "=" + httpdPort + + (httpdPort == -1 ? " (disabled)" : "")); + + if (httpdPort == -1) { + + return; + + } + + final AbstractHTTPD httpd; + try { + + httpd = new CounterSetHTTPD(httpdPort, Journal.this); + + } catch (IOException e) { + + log.error("Could not start httpd: port=" + httpdPort, e); + + return; + + } + + if (httpd != null) { + + // Note: synchronized(Journal.this) keeps findbugs happy. + synchronized (Journal.this) { + + // save reference to the daemon. + Journal.this.httpd = httpd; + + // the URL that may be used to access the local httpd. + Journal.this.httpdURL = "http://" + + AbstractStatisticsCollector.fullyQualifiedHostName + + ":" + httpd.getPort() + "/?path=" + + URL... [truncated message content] |