From: <jbo...@li...> - 2006-03-28 14:25:46
|
Author: rem...@jb... Date: 2006-03-28 09:25:43 -0500 (Tue, 28 Mar 2006) New Revision: 3314 Modified: trunk/labs/jbossweb/src/share/classes/org/apache/tomcat/util/net/AprEndpoint.java Log: - Port the new AprEndpoint. - Replace with non synced stack. - New threading capabilities. - New default for firstReadTimeout. Modified: trunk/labs/jbossweb/src/share/classes/org/apache/tomcat/util/net/AprEndpoint.java =================================================================== --- trunk/labs/jbossweb/src/share/classes/org/apache/tomcat/util/net/AprEndpoint.java 2006-03-28 14:23:20 UTC (rev 3313) +++ trunk/labs/jbossweb/src/share/classes/org/apache/tomcat/util/net/AprEndpoint.java 2006-03-28 14:25:43 UTC (rev 3314) @@ -19,7 +19,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; -import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -91,31 +90,12 @@ /** - * The acceptor thread. + * Available workers. */ - protected Thread acceptorThread = null; + protected WorkerStack workers = null; /** - * The socket poller thread. - */ - protected Thread pollerThread = null; - - - /** - * The sendfile thread. - */ - protected Thread sendfileThread = null; - - - /** - * Available processors. - */ - // FIXME: Stack is synced, which makes it a non optimal choice - protected Stack workers = new Stack(); - - - /** * Running state of the endpoint. */ protected volatile boolean running = false; @@ -197,7 +177,7 @@ /** * Size of the socket poller. */ - protected int pollerSize = 16 * 1024; + protected int pollerSize = 8 * 1024; public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } public int getPollerSize() { return pollerSize; } @@ -271,7 +251,7 @@ /** * Timeout on first request read before going to the poller, in ms. */ - protected int firstReadTimeout = 100; + protected int firstReadTimeout = -1; public int getFirstReadTimeout() { return firstReadTimeout; } public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } @@ -282,7 +262,7 @@ */ protected int pollTime = 2000; public int getPollTime() { return pollTime; } - public void setPollTime(int pollTime) { this.pollTime = pollTime; } + public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } /** @@ -312,31 +292,49 @@ /** - * Number of keepalive sockets. + * Acceptor thread count. */ - protected int keepAliveCount = 0; - public int getKeepAliveCount() { return keepAliveCount; } + protected int acceptorThreadCount = 0; + public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } + public int getAcceptorThreadCount() { return acceptorThreadCount; } /** - * Number of sendfile sockets. + * Sendfile thread count. */ - protected int sendfileCount = 0; - public int getSendfileCount() { return sendfileCount; } + protected int sendfileThreadCount = 0; + public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; } + public int getSendfileThreadCount() { return sendfileThreadCount; } /** + * Poller thread count. + */ + protected int pollerThreadCount = 0; + public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } + public int getPollerThreadCount() { return pollerThreadCount; } + + + /** * The socket poller. */ - protected Poller poller = null; - public Poller getPoller() { return poller; } + protected Poller[] pollers = null; + protected int pollerRoundRobin = 0; + public Poller getPoller() { + pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; + return pollers[pollerRoundRobin]; + } /** * The static file sender. */ - protected Sendfile sendfile = null; - public Sendfile getSendfile() { return sendfile; } + protected Sendfile[] sendfiles = null; + protected int sendfileRoundRobin = 0; + public Sendfile getSendfile() { + sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length; + return sendfiles[sendfileRoundRobin]; + } /** @@ -460,6 +458,38 @@ /** + * Number of keepalive sockets. + */ + public int getKeepAliveCount() { + if (pollers == null) { + return 0; + } else { + int keepAliveCount = 0; + for (int i = 0; i < pollers.length; i++) { + keepAliveCount += pollers[i].getKeepAliveCount(); + } + return keepAliveCount; + } + } + + + /** + * Number of sendfile sockets. + */ + public int getSendfileCount() { + if (sendfiles == null) { + return 0; + } else { + int sendfileCount = 0; + for (int i = 0; i < sendfiles.length; i++) { + sendfileCount += sendfiles[i].getSendfileCount(); + } + return sendfileCount; + } + } + + + /** * Return the amount of threads that are managed by the pool. * * @return the amount of threads that are managed by the pool @@ -510,7 +540,7 @@ if (initialized) return; - + // Create the root APR memory pool rootPool = Pool.create(0); // Create the pool for the server socket @@ -556,6 +586,35 @@ useSendfile = false; } + // Initialize thread count defaults for acceptor, poller and sendfile + if (acceptorThreadCount == 0) { + // FIXME: Doesn't seem to work that well with multiple accept threads + acceptorThreadCount = 1; + } + if (pollerThreadCount == 0) { + if ((OS.IS_WIN32 || OS.IS_WIN64) && (pollerSize > 1024)) { + // The maximum per poller to get reasonable performance is 1024 + pollerThreadCount = pollerSize / 1024; + // Adjust poller size so that it won't reach the limit + pollerSize = pollerSize - (pollerSize % 1024); + } else { + // No explicit poller size limitation + pollerThreadCount = 1; + } + } + if (sendfileThreadCount == 0) { + if ((OS.IS_WIN32 || OS.IS_WIN64) && (sendfileSize > 1024)) { + // The maximum per poller to get reasonable performance is 1024 + sendfileThreadCount = sendfileSize / 1024; + // Adjust poller size so that it won't reach the limit + sendfileSize = sendfileSize - (sendfileSize % 1024); + } else { + // No explicit poller size limitation + // FIXME: Default to one per CPU ? + sendfileThreadCount = 1; + } + } + // Delay accepting of new connections until data is available // Only Linux kernels 2.4 + have that implemented // on other platforms this call is noop and will return APR_ENOTIMPL. @@ -625,28 +684,39 @@ running = true; paused = false; + // Create worker collection + workers = new WorkerStack(maxThreads); + // Start acceptor thread - acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor"); - acceptorThread.setPriority(threadPriority); - acceptorThread.setDaemon(daemon); - acceptorThread.start(); + for (int i = 0; i < acceptorThreadCount; i++) { + Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); + acceptorThread.setPriority(threadPriority); + acceptorThread.setDaemon(daemon); + acceptorThread.start(); + } // Start poller thread - poller = new Poller(); - poller.init(); - pollerThread = new Thread(poller, getName() + "-Poller"); - pollerThread.setPriority(threadPriority); - pollerThread.setDaemon(true); - pollerThread.start(); + pollers = new Poller[pollerThreadCount]; + for (int i = 0; i < pollerThreadCount; i++) { + pollers[i] = new Poller(); + pollers[i].init(); + Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); + } // Start sendfile thread if (useSendfile) { - sendfile = new Sendfile(); - sendfile.init(); - sendfileThread = new Thread(sendfile, getName() + "-Sendfile"); - sendfileThread.setPriority(threadPriority); - sendfileThread.setDaemon(true); - sendfileThread.start(); + sendfiles = new Sendfile[sendfileThreadCount]; + for (int i = 0; i < sendfileThreadCount; i++) { + sendfiles[i] = new Sendfile(); + sendfiles[i].init(); + Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i); + sendfileThread.setPriority(threadPriority); + sendfileThread.setDaemon(true); + sendfileThread.start(); + } } } } @@ -681,13 +751,16 @@ if (running) { running = false; unlockAccept(); - poller.destroy(); + for (int i = 0; i < pollers.length; i++) { + pollers[i].destroy(); + } + pollers = null; if (useSendfile) { - sendfile.destroy(); + for (int i = 0; i < sendfiles.length; i++) { + sendfiles[i].destroy(); + } + sendfiles = null; } - acceptorThread = null; - pollerThread = null; - sendfileThread = null; } } @@ -808,7 +881,7 @@ synchronized (workers) { if (workers.size() > 0) { curThreadsBusy++; - return ((Worker) workers.pop()); + return (workers.pop()); } if ((maxThreads > 0) && (curThreads < maxThreads)) { curThreadsBusy++; @@ -958,6 +1031,9 @@ protected long[] addS; protected int addCount = 0; + + protected int keepAliveCount = 0; + public int getKeepAliveCount() { return keepAliveCount; } /** * Create the poller. With some versions of APR, the maximum poller size will @@ -965,7 +1041,7 @@ */ protected void init() { pool = Pool.create(serverSockPool); - int size = pollerSize; + int size = pollerSize / pollerThreadCount; serverPollset = allocatePoller(size, pool, soTimeout); if (serverPollset == 0 && size > 1024) { size = 1024; @@ -1269,6 +1345,9 @@ protected long pool = 0; protected long[] desc; protected HashMap sendfileData; + + protected int sendfileCount; + public int getSendfileCount() { return sendfileCount; } protected ArrayList addS; @@ -1278,8 +1357,8 @@ */ protected void init() { pool = Pool.create(serverSockPool); - int size = sendfileSize; - sendfilePollset = allocatePoller(sendfileSize, pool, soTimeout); + int size = sendfileSize / sendfileThreadCount; + sendfilePollset = allocatePoller(size, pool, soTimeout); if (sendfilePollset == 0 && size > 1024) { size = 1024; sendfilePollset = allocatePoller(size, pool, soTimeout); @@ -1517,4 +1596,59 @@ } + // ------------------------------------------------- WorkerStack Inner Class + + + public class WorkerStack { + + protected Worker[] workers = null; + protected int end = 0; + + public WorkerStack(int size) { + workers = new Worker[size]; + } + + /** + * Put the object into the queue. + * + * @param object the object to be appended to the queue (first element). + */ + public void push(Worker worker) { + workers[end++] = worker; + } + + /** + * Get the first object out of the queue. Return null if the queue + * is empty. + */ + public Worker pop() { + if (end > 0) { + return workers[--end]; + } + return null; + } + + /** + * Get the first object out of the queue, Return null if the queue + * is empty. + */ + public Worker peek() { + return workers[end]; + } + + /** + * Is the queue empty? + */ + public boolean isEmpty() { + return (end == 0); + } + + /** + * How many elements are there in this queue? + */ + public int size() { + return (end); + } + } + } |