Thread: SF.net SVN: fclient: [315] trunk/sandbox/fcp2/test_fcp/dummy_socket.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <ju...@us...> - 2008-03-06 12:01:04
|
Revision: 315
http://fclient.svn.sourceforge.net/fclient/?rev=315&view=rev
Author: jurner
Date: 2008-03-06 04:01:06 -0800 (Thu, 06 Mar 2008)
Log Message:
-----------
just an intermediate, new TestIO object
Modified Paths:
--------------
trunk/sandbox/fcp2/test_fcp/dummy_socket.py
Modified: trunk/sandbox/fcp2/test_fcp/dummy_socket.py
===================================================================
--- trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:00:08 UTC (rev 314)
+++ trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:01:06 UTC (rev 315)
@@ -1,9 +1,122 @@
"""Dummy socket object for testing"""
import socket
+
+
+import os, sys
+
+#--> rel import hack
+class _RelImportHack(object):
+ def __init__(self, n):
+ fpath = os.path.abspath(__file__)
+ for i in xrange(n): fpath = os.path.dirname(fpath)
+ sys.path.insert(0, fpath)
+ def __del__(self): sys.path.pop(0)
+hack = _RelImportHack(3)
+
+from fcp2 import client
+from fcp2.client import iohandler
+from fcp2 import consts
+
+del hack
+#<-- rel import hack
#********************************************************************
#
#********************************************************************
+class TestIO(iohandler.IOObject):
+
+ def __init__(self, ):
+ self.readBuffer = '' # buffer client reads from
+ self.writeBuffer = '' # buffer client writes to
+
+ self._isOpen = False
+ self._isBroken = False
+ self._disallowConnect = False
+ self._reverseDirection = False
+
+ def connect(self, **kwargs):
+ self._isOpen = True
+
+ def read(self, n):
+ if self._isBroken:
+ raise iohandler.IOBroken('Broken')
+ if not self.isOpen():
+ raise iohandler.IOClosed('Closed')
+
+ if self._reverseDirection:
+ if not self.writeBuffer:
+ raise iohandler.IOTimeout('Timeout')
+ bytes, self.writeBuffer = self.writeBuffer[ :n], self.writeBuffer[n: ]
+ else:
+ if not self.readBuffer:
+ raise iohandler.IOTimeout('Timeout')
+ bytes, self.readBuffer = self.readBuffer[ :n], self.readBuffer[n: ]
+ return bytes
+
+ def write(self, bytes):
+ if self._isBroken:
+ raise iohandler.IOBroken('Broken')
+ if not self.isOpen():
+ raise iohandler.IOClosed('Closed')
+ self.writeBuffer += bytes
+
+ def close(self):
+ self._isBroken = False
+ self._disallowConnect = False
+ self._reverseDirection = False
+ if self.isOpen():
+ self._isOpen = False
+ self.readBuffer = ''
+ self.writeBuffer = ''
+ else:
+ raise iohandler.IOClosed('Closed')
+
+ def isOpen(self):
+ return self._isOpen
+
+ def setTimeout(self, n):
+ pass
+
+
+ ############################
+ ## for testing...
+
+ def setOpen(self, flag):
+ self._isOpen = flag
+
+ def setBroken(self, flag):
+ self._isBroken = flag
+
+ def setDissallowConnect(self, flag):
+ self.disallowConnect = flag
+
+ def setReverseDirection(self, flag):
+ """"""
+ self._reverseDirection = flag
+
+
+ def sendResponseMessage(self, name, data=None, **params):
+ buf = [name, ]
+ for name, value in params.items():
+ buf.append('%s=%s' % (name, value) )
+ if data is None:
+ buf.append('EndMessage\n')
+ else:
+ buf.append('Data\n')
+ self.readBuffer += '\n'.join(buf)
+ if data:
+ assert 'DataLength' in params
+ assert params['DataLength'] == len(data)
+ self.readBuffer += data
+
+
+
+
+
+
+#********************************************************************
+#
+#********************************************************************
class DummySocket(object):
"""Dummy socket for testing"""
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ju...@us...> - 2008-03-06 12:05:47
|
Revision: 318
http://fclient.svn.sourceforge.net/fclient/?rev=318&view=rev
Author: jurner
Date: 2008-03-06 04:05:53 -0800 (Thu, 06 Mar 2008)
Log Message:
-----------
removed dummy socket. Use io now
Modified Paths:
--------------
trunk/sandbox/fcp2/test_fcp/dummy_socket.py
Modified: trunk/sandbox/fcp2/test_fcp/dummy_socket.py
===================================================================
--- trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:05:35 UTC (rev 317)
+++ trunk/sandbox/fcp2/test_fcp/dummy_socket.py 2008-03-06 12:05:53 UTC (rev 318)
@@ -1,8 +1,5 @@
"""Dummy socket object for testing"""
-import socket
-
-
import os, sys
#--> rel import hack
@@ -91,7 +88,6 @@
self.disallowConnect = flag
def setReverseDirection(self, flag):
- """"""
self._reverseDirection = flag
@@ -109,94 +105,4 @@
assert params['DataLength'] == len(data)
self.readBuffer += data
-
-
-
-
-#********************************************************************
-#
-#********************************************************************
-class DummySocket(object):
- """Dummy socket for testing"""
-
- def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
-
- self.closed = False
- self.error = False
- self.timeout = 0
- self.bytesReceiver = None
-
- self.bytes = ''
- self.responseBytes = ''
-
- def setBytes(self, bytes):
- self.bytes = bytes
-
- def setResponse(self, bytes):
- self.responseBytes += bytes
-
- def sendResponseMessage(self, name, data=None, **params):
- buf = [name, ]
- for name, value in params.items():
- buf.append('%s=%s' % (name, value) )
- if data is None:
- buf.append('EndMessage\n')
- else:
- buf.append('Data\n')
- self.responseBytes += '\n'.join(buf)
- if data:
- assert 'DataLength' in params
- assert params['DataLength'] == len(data)
- self.responseBytes += data
-
-
- def setBytesReceiver(self, cb):
- self.bytesReceiver = cb
-
- def setClosed(self, flag):
- self.closed = flag
-
- def setError(self, flag):
- self.error = flag
-
- def __call__(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
- self.bytes = ''
- self.closed = False
- self.error = False
- return self
-
-
- def connect(self, (host, port)):
- if self.error:
- raise socket.error(0, 'I am dead')
-
- def close(self):
- self.closed = True
- self.bytes = ''
-
- def recv(self, n):
- if self.closed:
- return ''
- elif self.error:
- raise socket.error(0, 'I am dead!')
- self.responseBytes, bytes = self.responseBytes[n:], self.responseBytes[:n]
- if bytes:
- return bytes
- raise socket.timeout(0)
-
- def sendall(self, bytes):
- if self.closed:
- raise socket.error(0, 'I am dead!')
- self.bytes += bytes
- if self.bytesReceiver is not None:
- self.bytesReceiver(bytes)
-
-
- def settimeout(self, n):
- self.timeout = n
-
-
-
-
-
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|