|
From: <am...@us...> - 2010-10-03 17:54:56
|
Revision: 7130
http://jython.svn.sourceforge.net/jython/?rev=7130&view=rev
Author: amak
Date: 2010-10-03 17:54:49 +0000 (Sun, 03 Oct 2010)
Log Message:
-----------
Copying over SocketServer and test_socketserver.py from CPythonLib in order to make some jython-specific bug fixes.
Added Paths:
-----------
trunk/jython/Lib/SocketServer.py
trunk/jython/Lib/test/test_socketserver.py
Added: trunk/jython/Lib/SocketServer.py
===================================================================
--- trunk/jython/Lib/SocketServer.py (rev 0)
+++ trunk/jython/Lib/SocketServer.py 2010-10-03 17:54:49 UTC (rev 7130)
@@ -0,0 +1,588 @@
+"""Generic socket server classes.
+
+This module tries to capture the various aspects of defining a server:
+
+For socket-based servers:
+
+- address family:
+ - AF_INET{,6}: IP (Internet Protocol) sockets (default)
+ - AF_UNIX: Unix domain sockets
+ - others, e.g. AF_DECNET are conceivable (see <socket.h>
+- socket type:
+ - SOCK_STREAM (reliable stream, e.g. TCP)
+ - SOCK_DGRAM (datagrams, e.g. UDP)
+
+For request-based servers (including socket-based):
+
+- client address verification before further looking at the request
+ (This is actually a hook for any processing that needs to look
+ at the request before anything else, e.g. logging)
+- how to handle multiple requests:
+ - synchronous (one request is handled at a time)
+ - forking (each request is handled by a new process)
+ - threading (each request is handled by a new thread)
+
+The classes in this module favor the server type that is simplest to
+write: a synchronous TCP/IP server. This is bad class design, but
+save some typing. (There's also the issue that a deep class hierarchy
+slows down method lookups.)
+
+There are five classes in an inheritance diagram, four of which represent
+synchronous servers of four types:
+
+ +------------+
+ | BaseServer |
+ +------------+
+ |
+ v
+ +-----------+ +------------------+
+ | TCPServer |------->| UnixStreamServer |
+ +-----------+ +------------------+
+ |
+ v
+ +-----------+ +--------------------+
+ | UDPServer |------->| UnixDatagramServer |
+ +-----------+ +--------------------+
+
+Note that UnixDatagramServer derives from UDPServer, not from
+UnixStreamServer -- the only difference between an IP and a Unix
+stream server is the address family, which is simply repeated in both
+unix server classes.
+
+Forking and threading versions of each type of server can be created
+using the ForkingMixIn and ThreadingMixIn mix-in classes. For
+instance, a threading UDP server class is created as follows:
+
+ class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
+
+The Mix-in class must come first, since it overrides a method defined
+in UDPServer! Setting the various member variables also changes
+the behavior of the underlying server mechanism.
+
+To implement a service, you must derive a class from
+BaseRequestHandler and redefine its handle() method. You can then run
+various versions of the service by combining one of the server classes
+with your request handler class.
+
+The request handler class must be different for datagram or stream
+services. This can be hidden by using the request handler
+subclasses StreamRequestHandler or DatagramRequestHandler.
+
+Of course, you still have to use your head!
+
+For instance, it makes no sense to use a forking server if the service
+contains state in memory that can be modified by requests (since the
+modifications in the child process would never reach the initial state
+kept in the parent process and passed to each child). In this case,
+you can use a threading server, but you will probably have to use
+locks to avoid two requests that come in nearly simultaneous to apply
+conflicting changes to the server state.
+
+On the other hand, if you are building e.g. an HTTP server, where all
+data is stored externally (e.g. in the file system), a synchronous
+class will essentially render the service "deaf" while one request is
+being handled -- which may be for a very long time if a client is slow
+to reqd all the data it has requested. Here a threading or forking
+server is appropriate.
+
+In some cases, it may be appropriate to process part of a request
+synchronously, but to finish processing in a forked child depending on
+the request data. This can be implemented by using a synchronous
+server and doing an explicit fork in the request handler class
+handle() method.
+
+Another approach to handling multiple simultaneous requests in an
+environment that supports neither threads nor fork (or where these are
+too expensive or inappropriate for the service) is to maintain an
+explicit table of partially finished requests and to use select() to
+decide which request to work on next (or whether to handle a new
+incoming request). This is particularly important for stream services
+where each client can potentially be connected for a long time (if
+threads or subprocesses cannot be used).
+
+Future work:
+- Standard classes for Sun RPC (which uses either UDP or TCP)
+- Standard mix-in classes to implement various authentication
+ and encryption schemes
+- Standard framework for select-based multiplexing
+
+XXX Open problems:
+- What to do with out-of-band data?
+
+BaseServer:
+- split generic "request" functionality out into BaseServer class.
+ Copyright (C) 2000 Luke Kenneth Casson Leighton <lk...@sa...>
+
+ example: read entries from a SQL database (requires overriding
+ get_request() to return a table entry from the database).
+ entry is processed by a RequestHandlerClass.
+
+"""
+
+# Author of the BaseServer patch: Luke Kenneth Casson Leighton
+
+# XXX Warning!
+# There is a test suite for this module, but it cannot be run by the
+# standard regression test.
+# To run it manually, run Lib/test/test_socketserver.py.
+
+__version__ = "0.4"
+
+
+import socket
+import sys
+import os
+
+__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
+ "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
+ "StreamRequestHandler","DatagramRequestHandler",
+ "ThreadingMixIn", "ForkingMixIn"]
+if hasattr(socket, "AF_UNIX"):
+ __all__.extend(["UnixStreamServer","UnixDatagramServer",
+ "ThreadingUnixStreamServer",
+ "ThreadingUnixDatagramServer"])
+
+class BaseServer:
+
+ """Base class for server classes.
+
+ Methods for the caller:
+
+ - __init__(server_address, RequestHandlerClass)
+ - serve_forever()
+ - handle_request() # if you do not use serve_forever()
+ - fileno() -> int # for select()
+
+ Methods that may be overridden:
+
+ - server_bind()
+ - server_activate()
+ - get_request() -> request, client_address
+ - verify_request(request, client_address)
+ - server_close()
+ - process_request(request, client_address)
+ - close_request(request)
+ - handle_error()
+
+ Methods for derived classes:
+
+ - finish_request(request, client_address)
+
+ Class variables that may be overridden by derived classes or
+ instances:
+
+ - address_family
+ - socket_type
+ - allow_reuse_address
+
+ Instance variables:
+
+ - RequestHandlerClass
+ - socket
+
+ """
+
+ def __init__(self, server_address, RequestHandlerClass):
+ """Constructor. May be extended, do not override."""
+ self.server_address = server_address
+ self.RequestHandlerClass = RequestHandlerClass
+
+ def server_activate(self):
+ """Called by constructor to activate the server.
+
+ May be overridden.
+
+ """
+ pass
+
+ def serve_forever(self):
+ """Handle one request at a time until doomsday."""
+ while 1:
+ self.handle_request()
+
+ # The distinction between handling, getting, processing and
+ # finishing a request is fairly arbitrary. Remember:
+ #
+ # - handle_request() is the top-level call. It calls
+ # get_request(), verify_request() and process_request()
+ # - get_request() is different for stream or datagram sockets
+ # - process_request() is the place that may fork a new process
+ # or create a new thread to finish the request
+ # - finish_request() instantiates the request handler class;
+ # this constructor will handle the request all by itself
+
+ def handle_request(self):
+ """Handle one request, possibly blocking."""
+ try:
+ request, client_address = self.get_request()
+ except socket.error:
+ return
+ if self.verify_request(request, client_address):
+ try:
+ self.process_request(request, client_address)
+ except:
+ self.handle_error(request, client_address)
+ self.close_request(request)
+
+ def verify_request(self, request, client_address):
+ """Verify the request. May be overridden.
+
+ Return True if we should proceed with this request.
+
+ """
+ return True
+
+ def process_request(self, request, client_address):
+ """Call finish_request.
+
+ Overridden by ForkingMixIn and ThreadingMixIn.
+
+ """
+ self.finish_request(request, client_address)
+ self.close_request(request)
+
+ def server_close(self):
+ """Called to clean-up the server.
+
+ May be overridden.
+
+ """
+ pass
+
+ def finish_request(self, request, client_address):
+ """Finish one request by instantiating RequestHandlerClass."""
+ self.RequestHandlerClass(request, client_address, self)
+
+ def close_request(self, request):
+ """Called to clean up an individual request."""
+ pass
+
+ def handle_error(self, request, client_address):
+ """Handle an error gracefully. May be overridden.
+
+ The default is to print a traceback and continue.
+
+ """
+ print '-'*40
+ print 'Exception happened during processing of request from',
+ print client_address
+ import traceback
+ traceback.print_exc() # XXX But this goes to stderr!
+ print '-'*40
+
+
+class TCPServer(BaseServer):
+
+ """Base class for various socket-based server classes.
+
+ Defaults to synchronous IP stream (i.e., TCP).
+
+ Methods for the caller:
+
+ - __init__(server_address, RequestHandlerClass)
+ - serve_forever()
+ - handle_request() # if you don't use serve_forever()
+ - fileno() -> int # for select()
+
+ Methods that may be overridden:
+
+ - server_bind()
+ - server_activate()
+ - get_request() -> request, client_address
+ - verify_request(request, client_address)
+ - process_request(request, client_address)
+ - close_request(request)
+ - handle_error()
+
+ Methods for derived classes:
+
+ - finish_request(request, client_address)
+
+ Class variables that may be overridden by derived classes or
+ instances:
+
+ - address_family
+ - socket_type
+ - request_queue_size (only for stream sockets)
+ - allow_reuse_address
+
+ Instance variables:
+
+ - server_address
+ - RequestHandlerClass
+ - socket
+
+ """
+
+ address_family = socket.AF_INET
+
+ socket_type = socket.SOCK_STREAM
+
+ request_queue_size = 5
+
+ allow_reuse_address = False
+
+ def __init__(self, server_address, RequestHandlerClass):
+ """Constructor. May be extended, do not override."""
+ BaseServer.__init__(self, server_address, RequestHandlerClass)
+ self.socket = socket.socket(self.address_family,
+ self.socket_type)
+ self.server_bind()
+ self.server_activate()
+
+ def server_bind(self):
+ """Called by constructor to bind the socket.
+
+ May be overridden.
+
+ """
+ if self.allow_reuse_address:
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.bind(self.server_address)
+ self.server_address = self.socket.getsockname()
+
+ def server_activate(self):
+ """Called by constructor to activate the server.
+
+ May be overridden.
+
+ """
+ self.socket.listen(self.request_queue_size)
+
+ def server_close(self):
+ """Called to clean-up the server.
+
+ May be overridden.
+
+ """
+ self.socket.close()
+
+ def fileno(self):
+ """Return socket file number.
+
+ Interface required by select().
+
+ """
+ return self.socket.fileno()
+
+ def get_request(self):
+ """Get the request and client address from the socket.
+
+ May be overridden.
+
+ """
+ return self.socket.accept()
+
+ def close_request(self, request):
+ """Called to clean up an individual request."""
+ request.close()
+
+
+class UDPServer(TCPServer):
+
+ """UDP server class."""
+
+ allow_reuse_address = False
+
+ socket_type = socket.SOCK_DGRAM
+
+ max_packet_size = 8192
+
+ def get_request(self):
+ data, client_addr = self.socket.recvfrom(self.max_packet_size)
+ return (data, self.socket), client_addr
+
+ def server_activate(self):
+ # No need to call listen() for UDP.
+ pass
+
+ def close_request(self, request):
+ # No need to close anything.
+ pass
+
+class ForkingMixIn:
+
+ """Mix-in class to handle each request in a new process."""
+
+ active_children = None
+ max_children = 40
+
+ def collect_children(self):
+ """Internal routine to wait for died children."""
+ while self.active_children:
+ if len(self.active_children) < self.max_children:
+ options = os.WNOHANG
+ else:
+ # If the maximum number of children are already
+ # running, block while waiting for a child to exit
+ options = 0
+ try:
+ pid, status = os.waitpid(0, options)
+ except os.error:
+ pid = None
+ if not pid: break
+ self.active_children.remove(pid)
+
+ def process_request(self, request, client_address):
+ """Fork a new subprocess to process the request."""
+ self.collect_children()
+ pid = os.fork()
+ if pid:
+ # Parent process
+ if self.active_children is None:
+ self.active_children = []
+ self.active_children.append(pid)
+ self.close_request(request)
+ return
+ else:
+ # Child process.
+ # This must never return, hence os._exit()!
+ try:
+ self.finish_request(request, client_address)
+ os._exit(0)
+ except:
+ try:
+ self.handle_error(request, client_address)
+ finally:
+ os._exit(1)
+
+
+class ThreadingMixIn:
+ """Mix-in class to handle each request in a new thread."""
+
+ # Decides how threads will act upon termination of the
+ # main process
+ daemon_threads = False
+
+ def process_request_thread(self, request, client_address):
+ """Same as in BaseServer but as a thread.
+
+ In addition, exception handling is done here.
+
+ """
+ try:
+ self.finish_request(request, client_address)
+ self.close_request(request)
+ except:
+ self.handle_error(request, client_address)
+ self.close_request(request)
+
+ def process_request(self, request, client_address):
+ """Start a new thread to process the request."""
+ import threading
+ t = threading.Thread(target = self.process_request_thread,
+ args = (request, client_address))
+ if self.daemon_threads:
+ t.setDaemon (1)
+ t.start()
+
+
+class ForkingUDPServer(ForkingMixIn, UDPServer): pass
+class ForkingTCPServer(ForkingMixIn, TCPServer): pass
+
+class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
+class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
+
+if hasattr(socket, 'AF_UNIX'):
+
+ class UnixStreamServer(TCPServer):
+ address_family = socket.AF_UNIX
+
+ class UnixDatagramServer(UDPServer):
+ address_family = socket.AF_UNIX
+
+ class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
+
+ class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
+
+class BaseRequestHandler:
+
+ """Base class for request handler classes.
+
+ This class is instantiated for each request to be handled. The
+ constructor sets the instance variables request, client_address
+ and server, and then calls the handle() method. To implement a
+ specific service, all you need to do is to derive a class which
+ defines a handle() method.
+
+ The handle() method can find the request as self.request, the
+ client address as self.client_address, and the server (in case it
+ needs access to per-server information) as self.server. Since a
+ separate instance is created for each request, the handle() method
+ can define arbitrary other instance variariables.
+
+ """
+
+ def __init__(self, request, client_address, server):
+ self.request = request
+ self.client_address = client_address
+ self.server = server
+ try:
+ self.setup()
+ self.handle()
+ self.finish()
+ finally:
+ sys.exc_traceback = None # Help garbage collection
+
+ def setup(self):
+ pass
+
+ def handle(self):
+ pass
+
+ def finish(self):
+ pass
+
+
+# The following two classes make it possible to use the same service
+# class for stream or datagram servers.
+# Each class sets up these instance variables:
+# - rfile: a file object from which receives the request is read
+# - wfile: a file object to which the reply is written
+# When the handle() method returns, wfile is flushed properly
+
+
+class StreamRequestHandler(BaseRequestHandler):
+
+ """Define self.rfile and self.wfile for stream sockets."""
+
+ # Default buffer sizes for rfile, wfile.
+ # We default rfile to buffered because otherwise it could be
+ # really slow for large data (a getc() call per byte); we make
+ # wfile unbuffered because (a) often after a write() we want to
+ # read and we need to flush the line; (b) big writes to unbuffered
+ # files are typically optimized by stdio even when big reads
+ # aren't.
+ rbufsize = -1
+ wbufsize = 0
+
+ def setup(self):
+ self.connection = self.request
+ self.rfile = self.connection.makefile('rb', self.rbufsize)
+ self.wfile = self.connection.makefile('wb', self.wbufsize)
+
+ def finish(self):
+ if not self.wfile.closed:
+ self.wfile.flush()
+ self.wfile.close()
+ self.rfile.close()
+
+
+class DatagramRequestHandler(BaseRequestHandler):
+
+ # XXX Regrettably, I cannot get this working on Linux;
+ # s.recvfrom() doesn't return a meaningful client address.
+
+ """Define self.rfile and self.wfile for datagram sockets."""
+
+ def setup(self):
+ try:
+ from cStringIO import StringIO
+ except ImportError:
+ from StringIO import StringIO
+ self.packet, self.socket = self.request
+ self.rfile = StringIO(self.packet)
+ self.wfile = StringIO()
+
+ def finish(self):
+ self.socket.sendto(self.wfile.getvalue(), self.client_address)
Added: trunk/jython/Lib/test/test_socketserver.py
===================================================================
--- trunk/jython/Lib/test/test_socketserver.py (rev 0)
+++ trunk/jython/Lib/test/test_socketserver.py 2010-10-03 17:54:49 UTC (rev 7130)
@@ -0,0 +1,218 @@
+# Test suite for SocketServer.py
+
+from test import test_support
+from test.test_support import (verbose, verify, TESTFN, TestSkipped,
+ reap_children)
+test_support.requires('network')
+
+from SocketServer import *
+import socket
+import errno
+import select
+import time
+import threading
+import os
+
+NREQ = 3
+DELAY = 0.5
+
+class MyMixinHandler:
+ def handle(self):
+ time.sleep(DELAY)
+ line = self.rfile.readline()
+ time.sleep(DELAY)
+ self.wfile.write(line)
+
+class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
+ pass
+
+class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
+ pass
+
+class MyMixinServer:
+ def serve_a_few(self):
+ for i in range(NREQ):
+ self.handle_request()
+ def handle_error(self, request, client_address):
+ self.close_request(request)
+ self.server_close()
+ raise
+
+teststring = "hello world\n"
+
+def receive(sock, n, timeout=20):
+ r, w, x = select.select([sock], [], [], timeout)
+ if sock in r:
+ return sock.recv(n)
+ else:
+ raise RuntimeError, "timed out on %r" % (sock,)
+
+def testdgram(proto, addr):
+ s = socket.socket(proto, socket.SOCK_DGRAM)
+ s.sendto(teststring, addr)
+ buf = data = receive(s, 100)
+ while data and '\n' not in buf:
+ data = receive(s, 100)
+ buf += data
+ verify(buf == teststring)
+ s.close()
+
+def teststream(proto, addr):
+ s = socket.socket(proto, socket.SOCK_STREAM)
+ s.connect(addr)
+ s.sendall(teststring)
+ buf = data = receive(s, 100)
+ while data and '\n' not in buf:
+ data = receive(s, 100)
+ buf += data
+ verify(buf == teststring)
+ s.close()
+
+class ServerThread(threading.Thread):
+ def __init__(self, addr, svrcls, hdlrcls):
+ threading.Thread.__init__(self)
+ self.__addr = addr
+ self.__svrcls = svrcls
+ self.__hdlrcls = hdlrcls
+ def run(self):
+ class svrcls(MyMixinServer, self.__svrcls):
+ pass
+ if verbose: print "thread: creating server"
+ svr = svrcls(self.__addr, self.__hdlrcls)
+ # pull the address out of the server in case it changed
+ # this can happen if another process is using the port
+ addr = svr.server_address
+ if addr:
+ self.__addr = addr
+ if self.__addr != svr.socket.getsockname():
+ raise RuntimeError('server_address was %s, expected %s' %
+ (self.__addr, svr.socket.getsockname()))
+ if verbose: print "thread: serving three times"
+ svr.serve_a_few()
+ if verbose: print "thread: done"
+
+seed = 0
+def pickport():
+ global seed
+ seed += 1
+ return 10000 + (os.getpid() % 1000)*10 + seed
+
+host = "localhost"
+testfiles = []
+def pickaddr(proto):
+ if proto == socket.AF_INET:
+ return (host, pickport())
+ else:
+ fn = TESTFN + str(pickport())
+ if os.name == 'os2':
+ # AF_UNIX socket names on OS/2 require a specific prefix
+ # which can't include a drive letter and must also use
+ # backslashes as directory separators
+ if fn[1] == ':':
+ fn = fn[2:]
+ if fn[0] in (os.sep, os.altsep):
+ fn = fn[1:]
+ fn = os.path.join('\socket', fn)
+ if os.sep == '/':
+ fn = fn.replace(os.sep, os.altsep)
+ else:
+ fn = fn.replace(os.altsep, os.sep)
+ testfiles.append(fn)
+ return fn
+
+def cleanup():
+ for fn in testfiles:
+ try:
+ os.remove(fn)
+ except os.error:
+ pass
+ testfiles[:] = []
+
+def testloop(proto, servers, hdlrcls, testfunc):
+ for svrcls in servers:
+ addr = pickaddr(proto)
+ if verbose:
+ print "ADDR =", addr
+ print "CLASS =", svrcls
+ t = ServerThread(addr, svrcls, hdlrcls)
+ if verbose: print "server created"
+ t.start()
+ if verbose: print "server running"
+ for i in range(NREQ):
+ time.sleep(DELAY)
+ if verbose: print "test client", i
+ testfunc(proto, addr)
+ if verbose: print "waiting for server"
+ t.join()
+ if verbose: print "done"
+
+class ForgivingTCPServer(TCPServer):
+ # prevent errors if another process is using the port we want
+ def server_bind(self):
+ host, default_port = self.server_address
+ # this code shamelessly stolen from test.test_support
+ # the ports were changed to protect the innocent
+ import sys
+ for port in [default_port, 3434, 8798, 23833]:
+ try:
+ self.server_address = host, port
+ TCPServer.server_bind(self)
+ break
+ except socket.error, (err, msg):
+ if err != errno.EADDRINUSE:
+ raise
+ print >>sys.__stderr__, \
+ ' WARNING: failed to listen on port %d, trying another' % port
+
+tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
+if hasattr(os, 'fork') and os.name not in ('os2',):
+ tcpservers.append(ForkingTCPServer)
+udpservers = [UDPServer, ThreadingUDPServer]
+if hasattr(os, 'fork') and os.name not in ('os2',):
+ udpservers.append(ForkingUDPServer)
+
+if not hasattr(socket, 'AF_UNIX'):
+ streamservers = []
+ dgramservers = []
+else:
+ class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
+ streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
+ if hasattr(os, 'fork') and os.name not in ('os2',):
+ streamservers.append(ForkingUnixStreamServer)
+ class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
+ dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
+ if hasattr(os, 'fork') and os.name not in ('os2',):
+ dgramservers.append(ForkingUnixDatagramServer)
+
+def sloppy_cleanup():
+ # See http://python.org/sf/1540386
+ # We need to reap children here otherwise a child from one server
+ # can be left running for the next server and cause a test failure.
+ time.sleep(DELAY)
+ reap_children()
+
+def testall():
+ testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
+ sloppy_cleanup()
+ testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
+ if hasattr(socket, 'AF_UNIX'):
+ sloppy_cleanup()
+ testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
+ # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
+ # client address so this cannot work:
+ ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
+
+def test_main():
+ import imp
+ if imp.lock_held():
+ # If the import lock is held, the threads will hang.
+ raise TestSkipped("can't run when import lock is held")
+
+ try:
+ testall()
+ finally:
+ cleanup()
+ reap_children()
+
+if __name__ == "__main__":
+ test_main()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|