Update of /cvsroot/webware/Webware/WebKit
In directory sc8-pr-cvs1:/tmp/cvs-serv23653
Modified Files:
ThreadedAppServer.py
Log Message:
Added/fixed docstrings
Index: ThreadedAppServer.py
===================================================================
RCS file: /cvsroot/webware/Webware/WebKit/ThreadedAppServer.py,v
retrieving revision 1.64
retrieving revision 1.65
diff -C2 -d -r1.64 -r1.65
*** ThreadedAppServer.py 19 Mar 2003 11:37:28 -0000 1.64
--- ThreadedAppServer.py 25 Mar 2003 22:24:36 -0000 1.65
***************
*** 1,16 ****
#!/usr/bin/env python
"""
! AppServer
!
! The WebKit app server is a TCP/IP server that accepts requests, hands them
! off to the Application and sends the request back over the connection.
!
! The fact that the app server stays resident is what makes it so much quicker
! than traditional CGI programming. Everything gets cached.
!
!
! FUTURE
!
! * Implement the additional settings that are commented out below.
"""
--- 1,9 ----
#!/usr/bin/env python
"""
! ThreadedAppServer uses a threaded model for handling multiple
! requests. At one time there were other experimental execution
! models for AppServer, but none of these were successful and
! have been removed. The ThreadedAppServer/AppServer distinction
! is thus largely historical.
"""
***************
*** 54,57 ****
--- 47,60 ----
class ThreadedAppServer(AppServer):
"""
+ `ThreadedAppServer` accepts incoming socket requests, spawns a
+ new thread or reuses an existing one, then dispatches the request
+ to the appropriate handler (e.g., an Adapter handler, HTTP
+ handler, etc, one for each protocol).
+
+ The transaction is connected directly to the socket, so that the
+ response is sent directly (if streaming is used, like if you call
+ ``response.flush()``). Thus the ThreadedAppServer packages
+ the socket/response, rather than value being returned up the
+ call chain.
"""
***************
*** 59,62 ****
--- 62,72 ----
def __init__(self, path=None):
+ """
+ Setup the AppServer, create an initial thread pool
+ (threads created with `spawnThread`), record the PID
+ in a file, and add the Adapter handler (which always runs,
+ unlike monitor or http).
+ """
+
AppServer.__init__(self, path)
threadCount = self.setting('StartServerThreads')
***************
*** 91,94 ****
--- 101,118 ----
def addSocketHandler(self, serverAddress, handlerClass):
+ """
+ Adds a socket handler for `serverAddress` -- `serverAddress`
+ is a tuple (*host*, *port*), where *host* is the interface
+ to connect to (for instance, the IP address on a machine
+ with multiple IP numbers), and *port* is the port (e.g.
+ HTTP is on 80 by default, and Webware adapters use 8086 by
+ default)
+
+ The `handlerClass` is a subclass of `Handler`, and is used
+ to handle the actual request -- usually returning control
+ back to ThreadedAppServer in some fashion. See Handler_
+ for more.
+ """
+
self._socketHandlers[serverAddress] = handlerClass
self._handlerCache[serverAddress] = []
***************
*** 111,117 ****
--- 135,166 ----
def isPersistent(self):
+ ":ignore:"
return 1
def mainloop(self, timeout=1):
+ """
+ This is the main thread loop that accepts and dispatches
+ socket requests.
+
+ It goes through an loop as long as ``self.running``
+ is true (i.e., ``self.running = 0`` asks the server to
+ stop running).
+
+ The loop waits for connections, then based on the connecting
+ port it initiates the proper Handler (e.g.,
+ AdapterHandler, HTTPHandler). Handlers are reused when
+ possible.
+
+ The initiated handlers are put into a queue, and
+ worker threads poll that queue to look for requests that
+ need to be handled (worker threads use `threadloop`).
+
+ Every so often (every 5 loops) it updates thread usage
+ information (`updateThreadUsage`), and every
+ ``MaxServerThreads`` * 2 loops it it will manage
+ threads (killing or spawning new ones, in
+ `manageThreadCount`).
+ """
+
from errno import EINTR
***************
*** 133,137 ****
for sock in input:
- print sock.getsockname()
self._requestID += 1
client, addr = sock.accept()
--- 182,185 ----
***************
*** 155,171 ****
self.restartIfNecessary()
! def activeThreadCount(self):
! """
! Get a snapshot of the number of threads currently in use.
! """
! count = 0
! for i in self._threadPool:
! if i._processing:
! count = count + 1
! return count
def updateThreadUsage(self):
"""
! Update the threadUseCounter list.
"""
count = self.activeThreadCount()
--- 203,219 ----
self.restartIfNecessary()
! """
! **Thread Management**
!
! These methods handle the thread pool. The AppServer pre-allocates
! threads, and reuses threads for requests. So as more threads
! are needed with varying load, new threads are spawned, and if there
! are excess threads than threads are removed.
! """
def updateThreadUsage(self):
"""
! Update the threadUseCounter list. Called periodically
! from `mainloop`.
"""
count = self.activeThreadCount()
***************
*** 174,186 ****
self._threadUseCounter.append(count)
def manageThreadCount(self):
"""
! Adjust the number of threads in use. This algorithm
! needs work. The edges (ie at the minserverthreads)
! are tricky. When working with this, remember thread
! creation is CHEAP
"""
average = 0
max = 0
--- 222,249 ----
self._threadUseCounter.append(count)
+ def activeThreadCount(self):
+ """
+ Get a snapshot of the number of threads currently in use.
+ Called from `updateThreadUsage`.
+ """
+ count = 0
+ for i in self._threadPool:
+ if i._processing:
+ count = count + 1
+ return count
def manageThreadCount(self):
"""
! Adjust the number of threads in use. From information
! gleened from `updateThreadUsage`, we see about how
! many threads are being used, to see if we have too
! many threads or too few. Based on this we create or
! absorb threads.
"""
+ ## @@: This algorithm needs work. The edges (ie at
+ ## the minserverthreads) are tricky. When working
+ ## with this, remember thread creation is CHEAP
+
average = 0
max = 0
***************
*** 221,224 ****
--- 284,291 ----
def spawnThread(self):
+ """
+ Create a new worker thread. Worker threads poll
+ with the `threadloop` method.
+ """
debug=0
if debug: print "Spawning new thread"
***************
*** 229,233 ****
self._threadCount += 1
if debug: print "New Thread Spawned, threadCount=", self._threadCount
- #self.threadUseCounter=[] #reset
def absorbThread(self, count=1):
--- 296,299 ----
***************
*** 235,258 ****
Absorb a thread. We do this by putting a None on the
Queue. When a thread gets it, that tells it to exit.
! BUT, even though we put it on, the thread may not have
! retrieved it before we exit this function. So we need
! to decrement the thread count even if we didn't find a
! thread that isn't alive. We'll get it the next time
! through.
"""
- debug = 0
- if debug: print "Absorbing %s Threads" % count
for i in range(count):
self._requestQueue.put(None)
self._threadCount -= 1
for i in self._threadPool:
if not i.isAlive():
rv = i.join() #Don't need a timeout, it isn't alive
self._threadPool.remove(i)
if debug: print "Thread Absorbed, Real Thread Count=", len(self.threadPool)
- #self.threadUseCounter=[] #reset
def threadloop(self):
self.initThread()
--- 301,346 ----
Absorb a thread. We do this by putting a None on the
Queue. When a thread gets it, that tells it to exit.
!
! We also keep track of the threads, so after killing
! threads we go through all the threads and find the
! thread(s) that have exited, so that we can take them
! out of the thread pool.
"""
for i in range(count):
self._requestQueue.put(None)
+ # _threadCount is an estimate, just because we
+ # put None in the queue, the threads don't immediately
+ # disapear, but they will eventually.
self._threadCount -= 1
for i in self._threadPool:
+ # There may still be a None in the queue, and some
+ # of the threads we want gone may not yet be gone.
+ # But we'll pick them up later -- they'll wait,.
if not i.isAlive():
rv = i.join() #Don't need a timeout, it isn't alive
self._threadPool.remove(i)
if debug: print "Thread Absorbed, Real Thread Count=", len(self.threadPool)
+ """
+ **Worker Threads**
+ """
def threadloop(self):
+ """
+ This is the main loop for worker threads. Worker
+ threads poll the ``_requestQueue`` to find a request
+ handler waiting to run. If they find a None in the
+ queue, this thread has been selected to die, which is
+ the way the loop ends.
+
+ The handler object does all the work when its
+ `handleRequest` method is called.
+
+ `initThread` and `delThread` methods are called at
+ the beginning and end of the thread loop, but they
+ aren't being used for anything (future use as a
+ hook).
+ """
+
self.initThread()
***************
*** 296,311 ****
pass
! def awakeSelect(self):
! """ Send a connect to ourself to pop the select() call
! out of it's loop safely """
! for addr in self._sockets.keys():
! sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
! try:
! sock.connect(addr)
! sock.close()
! except:
! pass
def shutDown(self):
self.running=0
self.awakeSelect()
--- 384,398 ----
pass
! """
! **Shutting Down**
! """
def shutDown(self):
+ """
+ Called on shutdown. Also calls `AppServer.shutDown`,
+ but first closes all sockets and tells all the threads
+ to die.
+ """
+
self.running=0
self.awakeSelect()
***************
*** 323,329 ****
AppServer.shutDown(self)
! ## Network Server ##
def address(self):
if self._addr is None:
self._addr = (self.setting('Host'), self.setting('Port'))
--- 410,438 ----
AppServer.shutDown(self)
! def awakeSelect(self):
! """
! The ``select()`` in `mainloop` is blocking, so when
! we shut down we have to make a connect to unblock it.
! Here's where we do that, called `shutDown`.
! """
!
! for addr in self._sockets.keys():
! sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
! try:
! sock.connect(addr)
! sock.close()
! except:
! pass
!
! """
! **Misc**
! """
def address(self):
+ """
+ The address for the Adapter (Host/interface, and port),
+ taken from ``Configs/Application.config``, setting
+ ``Host`` and ``Port``.
+ """
if self._addr is None:
self._addr = (self.setting('Host'), self.setting('Port'))
***************
*** 332,336 ****
--- 441,465 ----
class Handler:
+ """
+ Handler is an abstract superclass -- specific protocol
+ implementations will subclass this. A Handler takes a socket
+ to interact with, and creates a raw request.
+
+ Handlers will be reused. When a socket is received
+ `activate` will be called -- but the handler should not do
+ anything, as it is still running in the main thread. The
+ handler is put into a queue, and a worker thread picks it
+ up and runs `handleRequest`, which subclasses should override.
+
+ Several methods are provided which are typically used by
+ subclasses.
+ """
+
def __init__(self, server, serverAddress):
+ """
+ Each handler is attached to a specific host and port,
+ and of course to the AppServer.
+ """
+
self._server = server
self._serverAddress = serverAddress
***************
*** 339,359 ****
"""
Activates the handler for processing the request.
! Number is the number of the request, mostly used to identify
! verbose output. Each request should be given a unique,
! incremental number.
"""
self._requestID = requestID
self._sock = sock
def close(self):
self._sock = None
self._server._handlerCache[self._serverAddress].append(self)
def handleRequest(self):
pass
def receiveDict(self):
"""
! Utility function to receive a marshalled dictionary.
"""
chunk = ''
--- 468,502 ----
"""
Activates the handler for processing the request.
! `sock` is the incoming socket that this handler
! will work with, and `requestID` is a serial number
! unique for each request.
!
! This isn't where work gets done -- the handler is
! queued after this, and work is done when
! `handleRequest` is called.
"""
+
self._requestID = requestID
self._sock = sock
def close(self):
+ """
+ Called when the handler is finished. Closes the socket
+ and returns the handler to the pool of inactive handlers.
+ """
self._sock = None
self._server._handlerCache[self._serverAddress].append(self)
def handleRequest(self):
+ """
+ Subclasses should override this -- this is where
+ work gets done.
+ """
pass
def receiveDict(self):
"""
! Utility function to receive a marshalled dictionary from
! the socket.
"""
chunk = ''
***************
*** 384,388 ****
class MonitorHandler(Handler):
! protcolName = 'monitor'
def handleRequest(self):
--- 527,546 ----
class MonitorHandler(Handler):
! """
! Monitor is a minimal service that accepts a simple protocol,
! and returns a value indicating the status of the server.
!
! The protocol passes a marshalled dict, much like the Adapter
! interface, which looks like ``{'format': 'XXX'}``, where XXX
! is a command (``STATUS`` or ``QUIT``). Responds with a simple
! string, either the number of requests we've received (for
! ``STATUS``) or ``OK`` for ``QUIT`` (which also stops the
! server)
! """
!
! # @@ 2003-03 ib: we should have a RESTART command, and
! # perhaps better status indicators (# of threads, etc).
!
! protocolName = 'monitor'
def handleRequest(self):
***************
*** 413,421 ****
--- 571,601 ----
class TASASStreamOut(ASStreamOut):
+ """
+ The `TASASStreamOut` class streams to a given socket, so that
+ when `flush` is called and the buffer is ready to be written,
+ it sends the data from the buffer out on the socket. This is
+ the response stream used for requests generated by
+ ThreadedAppServer.
+
+ TAS stands for ThreadedAppServer (AS for AppServer... a little
+ redundant).
+ """
+
def __init__(self, sock):
+ """
+ We get an extra `sock` argument, which is the socket
+ which we'll stream output to (if we're streaming).
+ """
+
ASStreamOut.__init__(self)
self._socket = sock
def flush(self):
+ """
+ Calls `ASStreamOut.ASStreamOut.flush`, and if that
+ returns true (indicating the buffer is full enough)
+ then we send data from the buffer out on the socket.
+ """
+
debug=0
result = ASStreamOut.flush(self)
***************
*** 437,443 ****
--- 617,644 ----
class AdapterHandler(Handler):
+ """
+ Handles the Adapter protocol (as used in mod_webkit, wkcgi,
+ WebKit.cgi, HTTPAdapter, etc). This protocol passes a marshalled
+ dictionary which contains the keys ``format`` and ``environ``.
+ ``format`` is currently always the string ``CGI``, and ``environ``
+ is a dictionary of string: string, with values like those passed
+ in the environment to a CGI request (QUERY_STRING, HTTP_HOST, etc).
+
+ The handler adds one more key, ``input``, which contains a file
+ object based off the socket, which contains the body of the
+ request (the POST data, for instance). It's left to Application
+ to handle that data.
+ """
+
protocolName = 'address'
def handleRequest(self):
+ """
+ Creates the request dictionary, and creates a
+ `TASASStreamOut` object for the response, then calls
+ `Application.dispatchRawRequest`, which does the
+ rest of the work (here we just clean up after).
+ """
+
verbose = self._server._verbose
self._startTime = time.time()
***************
*** 475,481 ****
--- 676,700 ----
def makeInput(self):
+ """
+ Create a file-like object from the socket
+ """
return self._sock.makefile("rb",8012)
+
def run(useMonitor = 0, http=0, workDir=None):
+ """
+ Starts the server (`ThreadedAppServer`).
+
+ `workDir` is the server-side path for the server, which may
+ not be the ``Webware/WebKit`` directory (though by default
+ it is). The monitor and HTTP handlers are started based
+ on `useMonitor` and `http`. For monitor see `MonitorHandler`_,
+ and for HTTP see `HTTPServer.HTTPAppServerHandler`.
+
+ After setting up the ThreadedAppServer we call
+ `ThreadedAppServer.mainloop` to start the server main loop.
+ It also catches exceptions as a last resort.
+ """
+
global server
global monitor
***************
*** 531,534 ****
--- 750,754 ----
except KeyboardInterrupt, e:
server.shutDown()
+
except Exception, e:
if not isinstance(e, SystemExit):
***************
*** 549,553 ****
--- 769,777 ----
+ # 2003-03 ib @@: is this right? arg1, arg2?
def shutDown(arg1,arg2):
+ """
+ Shut down the server.
+ """
global server
print "Shutdown Called", time.asctime(time.localtime(time.time()))
|