From: <am...@us...> - 2010-10-03 17:54:56
|
Revision: 7130 http://jython.svn.sourceforge.net/jython/?rev=7130&view=rev Author: amak Date: 2010-10-03 17:54:49 +0000 (Sun, 03 Oct 2010) Log Message: ----------- Copying over SocketServer and test_socketserver.py from CPythonLib in order to make some jython-specific bug fixes. Added Paths: ----------- trunk/jython/Lib/SocketServer.py trunk/jython/Lib/test/test_socketserver.py Added: trunk/jython/Lib/SocketServer.py =================================================================== --- trunk/jython/Lib/SocketServer.py (rev 0) +++ trunk/jython/Lib/SocketServer.py 2010-10-03 17:54:49 UTC (rev 7130) @@ -0,0 +1,588 @@ +"""Generic socket server classes. + +This module tries to capture the various aspects of defining a server: + +For socket-based servers: + +- address family: + - AF_INET{,6}: IP (Internet Protocol) sockets (default) + - AF_UNIX: Unix domain sockets + - others, e.g. AF_DECNET are conceivable (see <socket.h> +- socket type: + - SOCK_STREAM (reliable stream, e.g. TCP) + - SOCK_DGRAM (datagrams, e.g. UDP) + +For request-based servers (including socket-based): + +- client address verification before further looking at the request + (This is actually a hook for any processing that needs to look + at the request before anything else, e.g. logging) +- how to handle multiple requests: + - synchronous (one request is handled at a time) + - forking (each request is handled by a new process) + - threading (each request is handled by a new thread) + +The classes in this module favor the server type that is simplest to +write: a synchronous TCP/IP server. This is bad class design, but +save some typing. (There's also the issue that a deep class hierarchy +slows down method lookups.) 
+ +There are five classes in an inheritance diagram, four of which represent +synchronous servers of four types: + + +------------+ + | BaseServer | + +------------+ + | + v + +-----------+ +------------------+ + | TCPServer |------->| UnixStreamServer | + +-----------+ +------------------+ + | + v + +-----------+ +--------------------+ + | UDPServer |------->| UnixDatagramServer | + +-----------+ +--------------------+ + +Note that UnixDatagramServer derives from UDPServer, not from +UnixStreamServer -- the only difference between an IP and a Unix +stream server is the address family, which is simply repeated in both +unix server classes. + +Forking and threading versions of each type of server can be created +using the ForkingMixIn and ThreadingMixIn mix-in classes. For +instance, a threading UDP server class is created as follows: + + class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass + +The Mix-in class must come first, since it overrides a method defined +in UDPServer! Setting the various member variables also changes +the behavior of the underlying server mechanism. + +To implement a service, you must derive a class from +BaseRequestHandler and redefine its handle() method. You can then run +various versions of the service by combining one of the server classes +with your request handler class. + +The request handler class must be different for datagram or stream +services. This can be hidden by using the request handler +subclasses StreamRequestHandler or DatagramRequestHandler. + +Of course, you still have to use your head! + +For instance, it makes no sense to use a forking server if the service +contains state in memory that can be modified by requests (since the +modifications in the child process would never reach the initial state +kept in the parent process and passed to each child). 
In this case, +you can use a threading server, but you will probably have to use +locks to avoid two requests that come in nearly simultaneously to apply +conflicting changes to the server state. + +On the other hand, if you are building e.g. an HTTP server, where all +data is stored externally (e.g. in the file system), a synchronous +class will essentially render the service "deaf" while one request is +being handled -- which may be for a very long time if a client is slow +to read all the data it has requested. Here a threading or forking +server is appropriate. + +In some cases, it may be appropriate to process part of a request +synchronously, but to finish processing in a forked child depending on +the request data. This can be implemented by using a synchronous +server and doing an explicit fork in the request handler class +handle() method. + +Another approach to handling multiple simultaneous requests in an +environment that supports neither threads nor fork (or where these are +too expensive or inappropriate for the service) is to maintain an +explicit table of partially finished requests and to use select() to +decide which request to work on next (or whether to handle a new +incoming request). This is particularly important for stream services +where each client can potentially be connected for a long time (if +threads or subprocesses cannot be used). + +Future work: +- Standard classes for Sun RPC (which uses either UDP or TCP) +- Standard mix-in classes to implement various authentication + and encryption schemes +- Standard framework for select-based multiplexing + +XXX Open problems: +- What to do with out-of-band data? + +BaseServer: +- split generic "request" functionality out into BaseServer class. + Copyright (C) 2000 Luke Kenneth Casson Leighton <lk...@sa...> + + example: read entries from a SQL database (requires overriding + get_request() to return a table entry from the database). + entry is processed by a RequestHandlerClass.
+ +""" + +# Author of the BaseServer patch: Luke Kenneth Casson Leighton + +# XXX Warning! +# There is a test suite for this module, but it cannot be run by the +# standard regression test. +# To run it manually, run Lib/test/test_socketserver.py. + +__version__ = "0.4" + + +import socket +import sys +import os + +__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer", + "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler", + "StreamRequestHandler","DatagramRequestHandler", + "ThreadingMixIn", "ForkingMixIn"] +if hasattr(socket, "AF_UNIX"): + __all__.extend(["UnixStreamServer","UnixDatagramServer", + "ThreadingUnixStreamServer", + "ThreadingUnixDatagramServer"]) + +class BaseServer: + + """Base class for server classes. + + Methods for the caller: + + - __init__(server_address, RequestHandlerClass) + - serve_forever() + - handle_request() # if you do not use serve_forever() + - fileno() -> int # for select() + + Methods that may be overridden: + + - server_bind() + - server_activate() + - get_request() -> request, client_address + - verify_request(request, client_address) + - server_close() + - process_request(request, client_address) + - close_request(request) + - handle_error() + + Methods for derived classes: + + - finish_request(request, client_address) + + Class variables that may be overridden by derived classes or + instances: + + - address_family + - socket_type + - allow_reuse_address + + Instance variables: + + - RequestHandlerClass + - socket + + """ + + def __init__(self, server_address, RequestHandlerClass): + """Constructor. May be extended, do not override.""" + self.server_address = server_address + self.RequestHandlerClass = RequestHandlerClass + + def server_activate(self): + """Called by constructor to activate the server. + + May be overridden. 
+ + """ + pass + + def serve_forever(self): + """Handle one request at a time until doomsday.""" + while 1: + self.handle_request() + + # The distinction between handling, getting, processing and + # finishing a request is fairly arbitrary. Remember: + # + # - handle_request() is the top-level call. It calls + # get_request(), verify_request() and process_request() + # - get_request() is different for stream or datagram sockets + # - process_request() is the place that may fork a new process + # or create a new thread to finish the request + # - finish_request() instantiates the request handler class; + # this constructor will handle the request all by itself + + def handle_request(self): + """Handle one request, possibly blocking.""" + try: + request, client_address = self.get_request() + except socket.error: + return + if self.verify_request(request, client_address): + try: + self.process_request(request, client_address) + except: + self.handle_error(request, client_address) + self.close_request(request) + + def verify_request(self, request, client_address): + """Verify the request. May be overridden. + + Return True if we should proceed with this request. + + """ + return True + + def process_request(self, request, client_address): + """Call finish_request. + + Overridden by ForkingMixIn and ThreadingMixIn. + + """ + self.finish_request(request, client_address) + self.close_request(request) + + def server_close(self): + """Called to clean-up the server. + + May be overridden. + + """ + pass + + def finish_request(self, request, client_address): + """Finish one request by instantiating RequestHandlerClass.""" + self.RequestHandlerClass(request, client_address, self) + + def close_request(self, request): + """Called to clean up an individual request.""" + pass + + def handle_error(self, request, client_address): + """Handle an error gracefully. May be overridden. + + The default is to print a traceback and continue. 
+ + """ + print '-'*40 + print 'Exception happened during processing of request from', + print client_address + import traceback + traceback.print_exc() # XXX But this goes to stderr! + print '-'*40 + + +class TCPServer(BaseServer): + + """Base class for various socket-based server classes. + + Defaults to synchronous IP stream (i.e., TCP). + + Methods for the caller: + + - __init__(server_address, RequestHandlerClass) + - serve_forever() + - handle_request() # if you don't use serve_forever() + - fileno() -> int # for select() + + Methods that may be overridden: + + - server_bind() + - server_activate() + - get_request() -> request, client_address + - verify_request(request, client_address) + - process_request(request, client_address) + - close_request(request) + - handle_error() + + Methods for derived classes: + + - finish_request(request, client_address) + + Class variables that may be overridden by derived classes or + instances: + + - address_family + - socket_type + - request_queue_size (only for stream sockets) + - allow_reuse_address + + Instance variables: + + - server_address + - RequestHandlerClass + - socket + + """ + + address_family = socket.AF_INET + + socket_type = socket.SOCK_STREAM + + request_queue_size = 5 + + allow_reuse_address = False + + def __init__(self, server_address, RequestHandlerClass): + """Constructor. May be extended, do not override.""" + BaseServer.__init__(self, server_address, RequestHandlerClass) + self.socket = socket.socket(self.address_family, + self.socket_type) + self.server_bind() + self.server_activate() + + def server_bind(self): + """Called by constructor to bind the socket. + + May be overridden. + + """ + if self.allow_reuse_address: + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind(self.server_address) + self.server_address = self.socket.getsockname() + + def server_activate(self): + """Called by constructor to activate the server. + + May be overridden. 
+ + """ + self.socket.listen(self.request_queue_size) + + def server_close(self): + """Called to clean-up the server. + + May be overridden. + + """ + self.socket.close() + + def fileno(self): + """Return socket file number. + + Interface required by select(). + + """ + return self.socket.fileno() + + def get_request(self): + """Get the request and client address from the socket. + + May be overridden. + + """ + return self.socket.accept() + + def close_request(self, request): + """Called to clean up an individual request.""" + request.close() + + +class UDPServer(TCPServer): + + """UDP server class.""" + + allow_reuse_address = False + + socket_type = socket.SOCK_DGRAM + + max_packet_size = 8192 + + def get_request(self): + data, client_addr = self.socket.recvfrom(self.max_packet_size) + return (data, self.socket), client_addr + + def server_activate(self): + # No need to call listen() for UDP. + pass + + def close_request(self, request): + # No need to close anything. + pass + +class ForkingMixIn: + + """Mix-in class to handle each request in a new process.""" + + active_children = None + max_children = 40 + + def collect_children(self): + """Internal routine to wait for died children.""" + while self.active_children: + if len(self.active_children) < self.max_children: + options = os.WNOHANG + else: + # If the maximum number of children are already + # running, block while waiting for a child to exit + options = 0 + try: + pid, status = os.waitpid(0, options) + except os.error: + pid = None + if not pid: break + self.active_children.remove(pid) + + def process_request(self, request, client_address): + """Fork a new subprocess to process the request.""" + self.collect_children() + pid = os.fork() + if pid: + # Parent process + if self.active_children is None: + self.active_children = [] + self.active_children.append(pid) + self.close_request(request) + return + else: + # Child process. + # This must never return, hence os._exit()! 
+ try: + self.finish_request(request, client_address) + os._exit(0) + except: + try: + self.handle_error(request, client_address) + finally: + os._exit(1) + + +class ThreadingMixIn: + """Mix-in class to handle each request in a new thread.""" + + # Decides how threads will act upon termination of the + # main process + daemon_threads = False + + def process_request_thread(self, request, client_address): + """Same as in BaseServer but as a thread. + + In addition, exception handling is done here. + + """ + try: + self.finish_request(request, client_address) + self.close_request(request) + except: + self.handle_error(request, client_address) + self.close_request(request) + + def process_request(self, request, client_address): + """Start a new thread to process the request.""" + import threading + t = threading.Thread(target = self.process_request_thread, + args = (request, client_address)) + if self.daemon_threads: + t.setDaemon (1) + t.start() + + +class ForkingUDPServer(ForkingMixIn, UDPServer): pass +class ForkingTCPServer(ForkingMixIn, TCPServer): pass + +class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass +class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass + +if hasattr(socket, 'AF_UNIX'): + + class UnixStreamServer(TCPServer): + address_family = socket.AF_UNIX + + class UnixDatagramServer(UDPServer): + address_family = socket.AF_UNIX + + class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass + + class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass + +class BaseRequestHandler: + + """Base class for request handler classes. + + This class is instantiated for each request to be handled. The + constructor sets the instance variables request, client_address + and server, and then calls the handle() method. To implement a + specific service, all you need to do is to derive a class which + defines a handle() method. 
+ + The handle() method can find the request as self.request, the + client address as self.client_address, and the server (in case it + needs access to per-server information) as self.server. Since a + separate instance is created for each request, the handle() method + can define arbitrary other instance variables. + + """ + + def __init__(self, request, client_address, server): + self.request = request + self.client_address = client_address + self.server = server + try: + self.setup() + self.handle() + self.finish() + finally: + sys.exc_traceback = None # Help garbage collection + + def setup(self): + pass + + def handle(self): + pass + + def finish(self): + pass + + +# The following two classes make it possible to use the same service +# class for stream or datagram servers. +# Each class sets up these instance variables: +# - rfile: a file object from which the request is read +# - wfile: a file object to which the reply is written +# When the handle() method returns, wfile is flushed properly + + +class StreamRequestHandler(BaseRequestHandler): + + """Define self.rfile and self.wfile for stream sockets.""" + + # Default buffer sizes for rfile, wfile. + # We default rfile to buffered because otherwise it could be + # really slow for large data (a getc() call per byte); we make + # wfile unbuffered because (a) often after a write() we want to + # read and we need to flush the line; (b) big writes to unbuffered + # files are typically optimized by stdio even when big reads + # aren't.
+ rbufsize = -1 + wbufsize = 0 + + def setup(self): + self.connection = self.request + self.rfile = self.connection.makefile('rb', self.rbufsize) + self.wfile = self.connection.makefile('wb', self.wbufsize) + + def finish(self): + if not self.wfile.closed: + self.wfile.flush() + self.wfile.close() + self.rfile.close() + + +class DatagramRequestHandler(BaseRequestHandler): + + # XXX Regrettably, I cannot get this working on Linux; + # s.recvfrom() doesn't return a meaningful client address. + + """Define self.rfile and self.wfile for datagram sockets.""" + + def setup(self): + try: + from cStringIO import StringIO + except ImportError: + from StringIO import StringIO + self.packet, self.socket = self.request + self.rfile = StringIO(self.packet) + self.wfile = StringIO() + + def finish(self): + self.socket.sendto(self.wfile.getvalue(), self.client_address) Added: trunk/jython/Lib/test/test_socketserver.py =================================================================== --- trunk/jython/Lib/test/test_socketserver.py (rev 0) +++ trunk/jython/Lib/test/test_socketserver.py 2010-10-03 17:54:49 UTC (rev 7130) @@ -0,0 +1,218 @@ +# Test suite for SocketServer.py + +from test import test_support +from test.test_support import (verbose, verify, TESTFN, TestSkipped, + reap_children) +test_support.requires('network') + +from SocketServer import * +import socket +import errno +import select +import time +import threading +import os + +NREQ = 3 +DELAY = 0.5 + +class MyMixinHandler: + def handle(self): + time.sleep(DELAY) + line = self.rfile.readline() + time.sleep(DELAY) + self.wfile.write(line) + +class MyStreamHandler(MyMixinHandler, StreamRequestHandler): + pass + +class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler): + pass + +class MyMixinServer: + def serve_a_few(self): + for i in range(NREQ): + self.handle_request() + def handle_error(self, request, client_address): + self.close_request(request) + self.server_close() + raise + +teststring = "hello world\n" + 
+def receive(sock, n, timeout=20): + r, w, x = select.select([sock], [], [], timeout) + if sock in r: + return sock.recv(n) + else: + raise RuntimeError, "timed out on %r" % (sock,) + +def testdgram(proto, addr): + s = socket.socket(proto, socket.SOCK_DGRAM) + s.sendto(teststring, addr) + buf = data = receive(s, 100) + while data and '\n' not in buf: + data = receive(s, 100) + buf += data + verify(buf == teststring) + s.close() + +def teststream(proto, addr): + s = socket.socket(proto, socket.SOCK_STREAM) + s.connect(addr) + s.sendall(teststring) + buf = data = receive(s, 100) + while data and '\n' not in buf: + data = receive(s, 100) + buf += data + verify(buf == teststring) + s.close() + +class ServerThread(threading.Thread): + def __init__(self, addr, svrcls, hdlrcls): + threading.Thread.__init__(self) + self.__addr = addr + self.__svrcls = svrcls + self.__hdlrcls = hdlrcls + def run(self): + class svrcls(MyMixinServer, self.__svrcls): + pass + if verbose: print "thread: creating server" + svr = svrcls(self.__addr, self.__hdlrcls) + # pull the address out of the server in case it changed + # this can happen if another process is using the port + addr = svr.server_address + if addr: + self.__addr = addr + if self.__addr != svr.socket.getsockname(): + raise RuntimeError('server_address was %s, expected %s' % + (self.__addr, svr.socket.getsockname())) + if verbose: print "thread: serving three times" + svr.serve_a_few() + if verbose: print "thread: done" + +seed = 0 +def pickport(): + global seed + seed += 1 + return 10000 + (os.getpid() % 1000)*10 + seed + +host = "localhost" +testfiles = [] +def pickaddr(proto): + if proto == socket.AF_INET: + return (host, pickport()) + else: + fn = TESTFN + str(pickport()) + if os.name == 'os2': + # AF_UNIX socket names on OS/2 require a specific prefix + # which can't include a drive letter and must also use + # backslashes as directory separators + if fn[1] == ':': + fn = fn[2:] + if fn[0] in (os.sep, os.altsep): + fn = 
fn[1:] + fn = os.path.join('\socket', fn) + if os.sep == '/': + fn = fn.replace(os.sep, os.altsep) + else: + fn = fn.replace(os.altsep, os.sep) + testfiles.append(fn) + return fn + +def cleanup(): + for fn in testfiles: + try: + os.remove(fn) + except os.error: + pass + testfiles[:] = [] + +def testloop(proto, servers, hdlrcls, testfunc): + for svrcls in servers: + addr = pickaddr(proto) + if verbose: + print "ADDR =", addr + print "CLASS =", svrcls + t = ServerThread(addr, svrcls, hdlrcls) + if verbose: print "server created" + t.start() + if verbose: print "server running" + for i in range(NREQ): + time.sleep(DELAY) + if verbose: print "test client", i + testfunc(proto, addr) + if verbose: print "waiting for server" + t.join() + if verbose: print "done" + +class ForgivingTCPServer(TCPServer): + # prevent errors if another process is using the port we want + def server_bind(self): + host, default_port = self.server_address + # this code shamelessly stolen from test.test_support + # the ports were changed to protect the innocent + import sys + for port in [default_port, 3434, 8798, 23833]: + try: + self.server_address = host, port + TCPServer.server_bind(self) + break + except socket.error, (err, msg): + if err != errno.EADDRINUSE: + raise + print >>sys.__stderr__, \ + ' WARNING: failed to listen on port %d, trying another' % port + +tcpservers = [ForgivingTCPServer, ThreadingTCPServer] +if hasattr(os, 'fork') and os.name not in ('os2',): + tcpservers.append(ForkingTCPServer) +udpservers = [UDPServer, ThreadingUDPServer] +if hasattr(os, 'fork') and os.name not in ('os2',): + udpservers.append(ForkingUDPServer) + +if not hasattr(socket, 'AF_UNIX'): + streamservers = [] + dgramservers = [] +else: + class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass + streamservers = [UnixStreamServer, ThreadingUnixStreamServer] + if hasattr(os, 'fork') and os.name not in ('os2',): + streamservers.append(ForkingUnixStreamServer) + class 
ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass + dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer] + if hasattr(os, 'fork') and os.name not in ('os2',): + dgramservers.append(ForkingUnixDatagramServer) + +def sloppy_cleanup(): + # See http://python.org/sf/1540386 + # We need to reap children here otherwise a child from one server + # can be left running for the next server and cause a test failure. + time.sleep(DELAY) + reap_children() + +def testall(): + testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream) + sloppy_cleanup() + testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram) + if hasattr(socket, 'AF_UNIX'): + sloppy_cleanup() + testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream) + # Alas, on Linux (at least) recvfrom() doesn't return a meaningful + # client address so this cannot work: + ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram) + +def test_main(): + import imp + if imp.lock_held(): + # If the import lock is held, the threads will hang. + raise TestSkipped("can't run when import lock is held") + + try: + testall() + finally: + cleanup() + reap_children() + +if __name__ == "__main__": + test_main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |