From: <am...@us...> - 2007-06-17 17:16:12
|
Revision: 3251 http://svn.sourceforge.net/jython/?rev=3251&view=rev Author: amak Date: 2007-06-17 10:16:10 -0700 (Sun, 17 Jun 2007) Log Message: ----------- 1. Added support for selecting on unconnected sockets 2. Added some unit tests for poll objects. Modified Paths: -------------- trunk/sandbox/kennedya/asynch_sockets/select.py trunk/sandbox/kennedya/asynch_sockets/socket.py trunk/sandbox/kennedya/asynch_sockets/test/test_select.py Modified: trunk/sandbox/kennedya/asynch_sockets/select.py =================================================================== --- trunk/sandbox/kennedya/asynch_sockets/select.py 2007-06-07 02:35:42 UTC (rev 3250) +++ trunk/sandbox/kennedya/asynch_sockets/select.py 2007-06-17 17:16:10 UTC (rev 3251) @@ -7,6 +7,8 @@ import java.nio.channels.Selector from java.nio.channels.SelectionKey import OP_ACCEPT, OP_CONNECT, OP_WRITE, OP_READ +import socket + class error(Exception): pass POLLIN = 1 @@ -18,19 +20,18 @@ def __init__(self): self.selector = java.nio.channels.Selector.open() self.chanmap = {} + self.unconnected_sockets = [] - def _getselectable(self, userobject): - if isinstance(userobject, java.nio.channels.SelectableChannel): - return userobject - else: - if hasattr(userobject, 'fileno') and callable(getattr(userobject, 'fileno')): - result = getattr(userobject, 'fileno')() - if isinstance(result, java.nio.channels.SelectableChannel): - return result - raise error("Object '%s' is not a watchable channel" % userobject, 10038) + def _getselectable(self, socket_object): + for st in socket.SocketTypes: + if isinstance(socket_object, st): + try: + return socket_object.getchannel() + except: + return None + raise error("Object '%s' is not watchable" % socket_object, 10038) - def register(self, userobject, mask): - channel = self._getselectable(userobject) + def _register_channel(self, socket_object, channel, mask): jmask = 0 if mask & POLLIN: # Note that OP_READ is NOT a valid event on server socket channels. @@ -43,10 +44,29 @@ if channel.validOps() & OP_CONNECT: jmask |= OP_CONNECT selectionkey = channel.register(self.selector, jmask) - self.chanmap[channel] = (userobject, selectionkey) + self.chanmap[channel] = (socket_object, selectionkey) - def unregister(self, userobject): - channel = self._getselectable(userobject) + def _check_unconnected_sockets(self): + temp_list = [] + for socket_object, mask in self.unconnected_sockets: + channel = self._getselectable(socket_object) + if channel is not None: + self._register_channel(socket_object, channel, mask) + else: + temp_list.append( (socket_object, mask) ) + self.unconnected_sockets = temp_list + + def register(self, socket_object, mask): + channel = self._getselectable(socket_object) + if channel is None: + # The socket is not yet connected, and thus has no channel + # Add it to a pending list, and return + self.unconnected_sockets.append( (socket_object, mask) ) + return + self._register_channel(socket_object, channel, mask) + + def unregister(self, socket_object): + channel = self._getselectable(socket_object) self.chanmap[channel][1].cancel() del self.chanmap[channel] @@ -62,6 +82,7 @@ return self.selector.selectedKeys() def poll(self, timeout=None): + self._check_unconnected_sockets() selectedkeys = self._dopoll(timeout) results = [] for k in selectedkeys.iterator(): @@ -72,7 +93,6 @@ if jmask & OP_ACCEPT: pymask |= POLLIN if jmask & OP_CONNECT: pymask |= POLLOUT # Now return the original userobject, and the return event mask - # A python 2.2 generator would be sweet here results.append( (self.chanmap[k.channel()][0], pymask) ) return results Modified: trunk/sandbox/kennedya/asynch_sockets/socket.py =================================================================== --- trunk/sandbox/kennedya/asynch_sockets/socket.py 2007-06-07 02:35:42 UTC (rev 3250) +++ trunk/sandbox/kennedya/asynch_sockets/socket.py 2007-06-17 17:16:10 UTC (rev 3251) @@ -224,7 +224,7 @@ __all__ = [ 'AF_INET', 'SO_REUSEADDR', 'SOCK_DGRAM', 'SOCK_RAW', 'SOCK_RDM', 'SOCK_SEQPACKET', 'SOCK_STREAM', 'SOL_SOCKET', - 'SocketType', 'error', 'herror', 'gaierror', 'timeout', + 'SocketType', 'SocketTypes', 'error', 'herror', 'gaierror', 'timeout', 'getfqdn', 'gethostbyaddr', 'gethostbyname', 'gethostname', 'socket', 'getaddrinfo', 'getdefaulttimeout', 'setdefaulttimeout', 'has_ipv6', 'htons', 'htonl', 'ntohs', 'ntohl', @@ -368,10 +368,11 @@ def getchannel(self): if not self.sock_impl: - raise error("No channel for indeterminate socket") - if hasattr(self.sock_impl, 'getchannel'): - return self.sock_impl.getchannel() - raise error('Operation not implemented on this JVM') + return None + return self.sock_impl.getchannel() +# if hasattr(self.sock_impl, 'getchannel'): +# return self.sock_impl.getchannel() +# raise error('Operation not implemented on this JVM') fileno = getchannel @@ -732,6 +733,7 @@ return None SocketType = _tcpsocket +SocketTypes = [_tcpsocket, _udpsocket] # Define the SSL support Modified: trunk/sandbox/kennedya/asynch_sockets/test/test_select.py =================================================================== --- trunk/sandbox/kennedya/asynch_sockets/test/test_select.py 2007-06-07 02:35:42 UTC (rev 3250) +++ trunk/sandbox/kennedya/asynch_sockets/test/test_select.py 2007-06-17 17:16:10 UTC (rev 3251) @@ -105,10 +105,7 @@ class TestSelectClientSocket(unittest.TestCase): - def testUnopenedSocket(self): - # This one passes on cpython - # But fails on jython, because of the deferred creation of impl sockets - if sys.platform[:4] == 'java': return + def testUnconnectedSocket(self): sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for x in range(5)] for pos in range(2): # OOB not supported on Java args = [[], [], []] @@ -119,6 +116,52 @@ self.failIf(s in rfd) self.failIf(s in wfd) +def check_server_running_on_localhost_port(port_number): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect( ('localhost', port_number) ) + s.close() + except: + return 0 + return 1 + +class TestPollClientSocket(unittest.TestCase): + + def testSocketRegisteredBeforeConnected(self): + # You MUST be running a server on port 80 for this one to work + if not check_server_running_on_localhost_port(80): + print "Unable to run testSocketRegisteredBeforeConnected: no server on port 80" + return + sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for x in range(5)] + timeout = 1 # Can't wait forever + poll_object = select.poll() + for s in sockets: + # Register the sockets before they are connected + poll_object.register(s, select.POLLOUT) + result_list = poll_object.poll(timeout) + result_sockets = [r[0] for r in result_list] + for s in sockets: + self.failIf(s in result_sockets) + # Now connect the sockets, but DO NOT register them again + for s in sockets: + s.setblocking(0) + s.connect( ('localhost', 80) ) + # Now poll again, to see if the poll object has recognised that the sockets are now connected + result_list = poll_object.poll(timeout) + result_sockets = [r[0] for r in result_list] + for s in sockets: + self.failUnless(s in result_sockets) + + def testUnregisterRaisesKeyError(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + poll_object = select.poll() + try: + poll_object.unregister(s) + except KeyError: + pass + else: + self.fail("Unregistering socket that is not registered should have raised KeyError") + class TestPipes(unittest.TestCase): verbose = 1 @@ -154,6 +197,7 @@ tests = [ TestSelectInvalidParameters, TestSelectClientSocket, + TestPollClientSocket, ] if sys.platform[:4] != 'java': tests.append(TestPipes) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |