Thread: SF.net SVN: fclient: [315] trunk/sandbox/fcp2/test_fcp/dummy_socket.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <ju...@us...> - 2008-03-06 12:01:04
|
Revision: 315 http://fclient.svn.sourceforge.net/fclient/?rev=315&view=rev Author: jurner Date: 2008-03-06 04:01:06 -0800 (Thu, 06 Mar 2008) Log Message: ----------- just an intermediate, new TestIO object Modified Paths: -------------- trunk/sandbox/fcp2/test_fcp/dummy_socket.py Modified: trunk/sandbox/fcp2/test_fcp/dummy_socket.py =================================================================== --- trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:00:08 UTC (rev 314) +++ trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:01:06 UTC (rev 315) @@ -1,9 +1,122 @@ """Dummy socket object for testing""" import socket + + +import os, sys + +#--> rel import hack +class _RelImportHack(object): + def __init__(self, n): + fpath = os.path.abspath(__file__) + for i in xrange(n): fpath = os.path.dirname(fpath) + sys.path.insert(0, fpath) + def __del__(self): sys.path.pop(0) +hack = _RelImportHack(3) + +from fcp2 import client +from fcp2.client import iohandler +from fcp2 import consts + +del hack +#<-- rel import hack #******************************************************************** # #******************************************************************** +class TestIO(iohandler.IOObject): + + def __init__(self, ): + self.readBuffer = '' # buffer client reads from + self.writeBuffer = '' # buffer client writes to + + self._isOpen = False + self._isBroken = False + self._disallowConnect = False + self._reverseDirection = False + + def connect(self, **kwargs): + self._isOpen = True + + def read(self, n): + if self._isBroken: + raise iohandler.IOBroken('Broken') + if not self.isOpen(): + raise iohandler.IOClosed('Closed') + + if self._reverseDirection: + if not self.writeBuffer: + raise iohandler.IOTimeout('Timeout') + bytes, self.writeBuffer = self.writeBuffer[ :n], self.writeBuffer[n: ] + else: + if not self.readBuffer: + raise iohandler.IOTimeout('Timeout') + bytes, self.readBuffer = self.readBuffer[ :n], self.readBuffer[n: ] + return bytes + + def write(self, bytes): + if self._isBroken: + raise iohandler.IOBroken('Broken') + if not self.isOpen(): + raise iohandler.IOClosed('Closed') + self.writeBuffer += bytes + + def close(self): + self._isBroken = False + self._disallowConnect = False + self._reverseDirection = False + if self.isOpen(): + self._isOpen = False + self.readBuffer = '' + self.writeBuffer = '' + else: + raise iohandler.IOClosed('Closed') + + def isOpen(self): + return self._isOpen + + def setTimeout(self, n): + pass + + + ############################ + ## for testing... + + def setOpen(self, flag): + self._isOpen = flag + + def setBroken(self, flag): + self._isBroken = flag + + def setDissallowConnect(self, flag): + self.disallowConnect = flag + + def setReverseDirection(self, flag): + """""" + self._reverseDirection = flag + + + def sendResponseMessage(self, name, data=None, **params): + buf = [name, ] + for name, value in params.items(): + buf.append('%s=%s' % (name, value) ) + if data is None: + buf.append('EndMessage\n') + else: + buf.append('Data\n') + self.readBuffer += '\n'.join(buf) + if data: + assert 'DataLength' in params + assert params['DataLength'] == len(data) + self.readBuffer += data + + + + + + +#******************************************************************** +# +#******************************************************************** class DummySocket(object): """Dummy socket for testing""" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ju...@us...> - 2008-03-06 12:05:47
|
Revision: 318 http://fclient.svn.sourceforge.net/fclient/?rev=318&view=rev Author: jurner Date: 2008-03-06 04:05:53 -0800 (Thu, 06 Mar 2008) Log Message: ----------- removed dummy socket. Use io now Modified Paths: -------------- trunk/sandbox/fcp2/test_fcp/dummy_socket.py Modified: trunk/sandbox/fcp2/test_fcp/dummy_socket.py =================================================================== --- trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:05:35 UTC (rev 317) +++ trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:05:53 UTC (rev 318) @@ -1,8 +1,5 @@ """Dummy socket object for testing""" -import socket - - import os, sys #--> rel import hack @@ -91,7 +88,6 @@ self.disallowConnect = flag def setReverseDirection(self, flag): - """""" self._reverseDirection = flag @@ -109,94 +105,4 @@ assert params['DataLength'] == len(data) self.readBuffer += data - - - - -#******************************************************************** -# -#******************************************************************** -class DummySocket(object): - """Dummy socket for testing""" - - def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM): - - self.closed = False - self.error = False - self.timeout = 0 - self.bytesReceiver = None - - self.bytes = '' - self.responseBytes = '' - - def setBytes(self, bytes): - self.bytes = bytes - - def setResponse(self, bytes): - self.responseBytes += bytes - - def sendResponseMessage(self, name, data=None, **params): - buf = [name, ] - for name, value in params.items(): - buf.append('%s=%s' % (name, value) ) - if data is None: - buf.append('EndMessage\n') - else: - buf.append('Data\n') - self.responseBytes += '\n'.join(buf) - if data: - assert 'DataLength' in params - assert params['DataLength'] == len(data) - self.responseBytes += data - - - def setBytesReceiver(self, cb): - self.bytesReceiver = cb - - def setClosed(self, flag): - self.closed = flag - - def setError(self, flag): - self.error = flag - - def __call__(self, family=socket.AF_INET, type=socket.SOCK_STREAM): - self.bytes = '' - self.closed = False - self.error = False - return self - - - def connect(self, (host, port)): - if self.error: - raise socket.error(0, 'I am dead') - - def close(self): - self.closed = True - self.bytes = '' - - def recv(self, n): - if self.closed: - return '' - elif self.error: - raise socket.error(0, 'I am dead!') - self.responseBytes, bytes = self.responseBytes[n:], self.responseBytes[:n] - if bytes: - return bytes - raise socket.timeout(0) - - def sendall(self, bytes): - if self.closed: - raise socket.error(0, 'I am dead!') - self.bytes += bytes - if self.bytesReceiver is not None: - self.bytesReceiver(bytes) - - - def settimeout(self, n): - self.timeout = n - - - - - This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |