SF.net SVN: fclient: [315] trunk/sandbox/fcp2/test_fcp/dummy_socket.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <ju...@us...> - 2008-03-06 12:01:04
|
Revision: 315
http://fclient.svn.sourceforge.net/fclient/?rev=315&view=rev
Author: jurner
Date: 2008-03-06 04:01:06 -0800 (Thu, 06 Mar 2008)
Log Message:
-----------
just an intermediate, new TestIO object
Modified Paths:
--------------
trunk/sandbox/fcp2/test_fcp/dummy_socket.py
Modified: trunk/sandbox/fcp2/test_fcp/dummy_socket.py
===================================================================
--- trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:00:08 UTC (rev 314)
+++ trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:01:06 UTC (rev 315)
@@ -1,9 +1,122 @@
"""Dummy socket object for testing"""
import socket
+
+
+import os, sys
+
+#--> rel import hack
+class _RelImportHack(object):
+ def __init__(self, n):
+ fpath = os.path.abspath(__file__)
+ for i in xrange(n): fpath = os.path.dirname(fpath)
+ sys.path.insert(0, fpath)
+ def __del__(self): sys.path.pop(0)
+hack = _RelImportHack(3)
+
+from fcp2 import client
+from fcp2.client import iohandler
+from fcp2 import consts
+
+del hack
+#<-- rel import hack
#********************************************************************
#
#********************************************************************
+class TestIO(iohandler.IOObject):
+
+ def __init__(self, ):
+ self.readBuffer = '' # buffer client reads from
+ self.writeBuffer = '' # buffer client writes to
+
+ self._isOpen = False
+ self._isBroken = False
+ self._disallowConnect = False
+ self._reverseDirection = False
+
+ def connect(self, **kwargs):
+ self._isOpen = True
+
+ def read(self, n):
+ if self._isBroken:
+ raise iohandler.IOBroken('Broken')
+ if not self.isOpen():
+ raise iohandler.IOClosed('Closed')
+
+ if self._reverseDirection:
+ if not self.writeBuffer:
+ raise iohandler.IOTimeout('Timeout')
+ bytes, self.writeBuffer = self.writeBuffer[ :n], self.writeBuffer[n: ]
+ else:
+ if not self.readBuffer:
+ raise iohandler.IOTimeout('Timeout')
+ bytes, self.readBuffer = self.readBuffer[ :n], self.readBuffer[n: ]
+ return bytes
+
+ def write(self, bytes):
+ if self._isBroken:
+ raise iohandler.IOBroken('Broken')
+ if not self.isOpen():
+ raise iohandler.IOClosed('Closed')
+ self.writeBuffer += bytes
+
+ def close(self):
+ self._isBroken = False
+ self._disallowConnect = False
+ self._reverseDirection = False
+ if self.isOpen():
+ self._isOpen = False
+ self.readBuffer = ''
+ self.writeBuffer = ''
+ else:
+ raise iohandler.IOClosed('Closed')
+
+ def isOpen(self):
+ return self._isOpen
+
+ def setTimeout(self, n):
+ pass
+
+
+ ############################
+ ## for testing...
+
+ def setOpen(self, flag):
+ self._isOpen = flag
+
+ def setBroken(self, flag):
+ self._isBroken = flag
+
+ def setDissallowConnect(self, flag):
+ self.disallowConnect = flag
+
+ def setReverseDirection(self, flag):
+ """"""
+ self._reverseDirection = flag
+
+
+ def sendResponseMessage(self, name, data=None, **params):
+ buf = [name, ]
+ for name, value in params.items():
+ buf.append('%s=%s' % (name, value) )
+ if data is None:
+ buf.append('EndMessage\n')
+ else:
+ buf.append('Data\n')
+ self.readBuffer += '\n'.join(buf)
+ if data:
+ assert 'DataLength' in params
+ assert params['DataLength'] == len(data)
+ self.readBuffer += data
+
+
+
+
+
+
+#********************************************************************
+#
+#********************************************************************
class DummySocket(object):
"""Dummy socket for testing"""
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|