SF.net SVN: fclient: [310] trunk/sandbox/fcp2/iohandler.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <ju...@us...> - 2008-03-06 11:57:29
|
Revision: 310
http://fclient.svn.sourceforge.net/fclient/?rev=310&view=rev
Author: jurner
Date: 2008-03-06 03:57:34 -0800 (Thu, 06 Mar 2008)
Log Message:
-----------
separated a module to handle message io
Added Paths:
-----------
trunk/sandbox/fcp2/iohandler.py
Added: trunk/sandbox/fcp2/iohandler.py
===================================================================
--- trunk/sandbox/fcp2/iohandler.py (rev 0)
+++ trunk/sandbox/fcp2/iohandler.py 2008-03-06 11:57:34 UTC (rev 310)
@@ -0,0 +1,385 @@
+
+
+import os, sys
+import logging
+import socket
+import time
+
+
+#--> rel import hack
+class _RelImportHack(object):
+ def __init__(self, n):
+ fpath = os.path.abspath(__file__)
+ for i in xrange(n): fpath = os.path.dirname(fpath)
+ sys.path.insert(0, fpath)
+ def __del__(self): sys.path.pop(0)
+hack = _RelImportHack(2)
+
+from fcp2 import consts
+from fcp2 import message
+from fcp2 import types
+
+del hack
+#<-- rel import hack
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+logging.addLevelName(consts.DebugVerbosity.Quiet, '')
+#*****************************************************************************
+#
+#*****************************************************************************
+class MessageParseError(Exception):
+ """Exception raised when a message could not be parsed succesfuly"""
+
+class IOConnectFailed(Exception):
+ """Exception raised if the object can not be connected"""
+
+class IOClosed(Exception):
+ """Exception raised if the object is closed"""
+
+class IOBroken(Exception):
+ """Exception raised if the IO connection is broken"""
+
+class IOTimeout(Exception):
+ """Exception raised when the io connection is closed"""
+
+
+class IOObject(object):
+ Timeout = 0
+ BufferSize = 4096
+
+ def __init__(self, ):
+ pass
+
+ def connect(self, **kwargs):
+ raise IOConnectFailed('Failed')
+
+ def read(self, n):
+ raise IOBroken('Broken')
+
+ def write(self, bytes):
+ raise IOBroken('Broken')
+
+ def close(self):
+ raise IOClosed('Closed')
+
+ def isOpen(self):
+ return False
+
+ def setTimeout(self, n):
+ pass
+
+
+#*****************************************************************************
+#
+#*****************************************************************************
+class SocketIO(IOObject):
+ Timeout = 0.1
+ BufferSize = 4096
+
+ def __init__(self):
+ self.socket = None
+
+ def connect(self, **kwargs):
+ if self.isOpen():
+ self.close()
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ self.socket.connect((kwargs['host'], kwargs['port']))
+ except socket.error, details:
+ raise IOConnectFailed(details)
+
+ def read(self, n):
+ try:
+ p = self.socket.recv(n)
+ if p == '':
+ raise socket.error('Socket closed by host')
+ except socket.timeout, details:
+ raise IOTimeout(details)
+ except socket.error, details:
+ self.close()
+ raise IOBroken(details)
+ else:
+ return p
+
+ def write(self, bytes):
+ try:
+ self.socket.sendall(bytes)
+ except socket.error, details:
+ self.close()
+ raise IOBroken(details)
+
+ def close(self):
+ if self.socket is None:
+ raise IOClosed('Closed')
+ self.socket.close()
+ self.socket = None
+
+ def isOpen(self):
+ return self.socket is not None
+
+ def setTimeout(self, n):
+ self.socket.settimeout(n)
+
+#*****************************************************************************
+#
+#*****************************************************************************
+class IOHandler(object):
+
+ TerminatorEndMessage = '\nEndMessage\n'
+ TerminatorData = '\nData\n'
+
+
+ def __init__(self, ioPrototype=SocketIO):
+ self._ioPrototype = ioPrototype
+ self._log = logging.getLogger(consts.LoggerNames.ClientIOHandler)
+ self._receiveBuffer = ''
+
+ self.io = None
+
+ def connect(self, duration=20, timeout=0.5, **kwargs):
+ for result in self.iterConnect(duration=duration, timeout=timeout, **kwargs): pass
+ return result
+
+
+ def iterConnect(self, duration=20, timeout=0.5, **kwargs):
+
+ if not duration >= 0:
+ raise ValueError('duration must be >= 0')
+ if not timeout >= 0:
+ raise ValueError('timeout must be >= 0')
+
+ if self.isOpen():
+ self.close()
+
+ timeElapsed = 0
+ while timeElapsed <= duration:
+
+ self._log.info(consts.LogMessages.Connecting + ' %r' % kwargs)
+ self.io = self._ioPrototype()
+ try:
+ self.io.connect(**kwargs)
+ except IOConnectFailed, details:
+ self._log.info(consts.LogMessages.ConnectingFailed + ' %s %s' % (IOConnectFailed, details))
+ yield False
+ else:
+ self.io.setTimeout(self.io.Timeout)
+ self._log.info(consts.LogMessages.Connected)
+ yield True
+ break
+
+ # continue polling
+ self._log.info(consts.LogMessages.Retry)
+ timeElapsed += timeout
+ time.sleep(timeout)
+
+ raise StopIteration
+
+
+ def close(self):
+ self._log.debug(consts.LogMessages.Closing)
+ self._receiveBuffer = ''
+ if self.io is not None and self.io.isOpen():
+ self.io.close()
+ self.io = None
+
+
+ def isOpen(self):
+ if self.io is not None:
+ return self.io.isOpen()
+ return False
+
+
+ def readBytes(self, n):
+ """Reads n bytes from socket
+ @param n: (int) number of bytes to read
+ @return: (tuple) (error-message, bytes-read). If no error was encountered, error-message will be None
+ """
+ try:
+ #TODO: if \r\n is possible in Fcp, replace it by \n
+ p = self.io.read(n)
+ if not p:
+ raise ValueError('No bytes received and IO did not raise as expected?!?')
+ self._receiveBuffer += p
+ except IOBroken, details:
+ self.close()
+ self._log.critical(consts.LogMessages.SocketDied)
+ return IOBroken, details
+ except IOTimeout, details: # nothing in the queue
+ return IOTimeout, details
+ return None, None
+
+
+ def readMessage(self):
+ """Reads the next a message from io
+ @return: (Message) next message from the socket
+
+ @note: if something goes wrong the according exception is raised
+ """
+ # read message from io
+ #
+ #NOTE: messages carying data may end with 'EndMessage' or 'Data'.
+ # This is a bit messed up in Fcp. We assume here all messages from the
+ # node end with "Data" if data is passed. Alternative would be to check for both
+ # and rely on the 'DataLength' member to indicate if data is included. This
+ # should work for all messages except 'DataFound'
+ hasData = False
+ eof = -1
+ while eof < 0:
+ eof = self._receiveBuffer.find(self.TerminatorEndMessage)
+ if eof > -1:
+ eof += len(self.TerminatorEndMessage)
+ else:
+ eof = self._receiveBuffer.find(self.TerminatorData)
+ if eof > -1:
+ eof += len(self.TerminatorData)
+ hasData = True
+ if eof < 0:
+ exception, details = self.readBytes(self.io.BufferSize)
+ if exception is not None:
+ raise exception(details)
+
+ # prep message
+ chunk, self._receiveBuffer = self._receiveBuffer[ :eof], self._receiveBuffer[eof: ]
+ p = [i for i in chunk.split('\n') if i] # Fcp ignores empty lines, so do we
+ p.pop()
+ if not p:
+ raise MessageParseError('Missing message name')
+ msgName = p.pop(0)
+ msg = message.Message(msgName)
+ paramTypes = types.MessageParamTypes.get(msgName, None)
+
+ # process param --> value fields
+ #
+ #NOTE:usually if data is passed DataLength determines how much have to handle
+ # special case ClientPutComplexDir where it is passed in Files.(N).DataLength.
+ # Additionally Files.(N).DataLength is converted to int here.
+ clientPutComplexDirDataLength = 0
+ isClientPutComplexDir = msgName == consts.Message.ClientPutComplexDir
+ for line in p:
+ paramName, sep, paramValue = line.partition('=')
+
+ # covert fcp to python value if necessary
+ if paramTypes is not None:
+ paramType = paramTypes.get(paramName, None)
+ if paramType is not None:
+ paramValue = paramType.fcpToPython(paramValue)
+ msg[paramName] = paramValue
+
+ # handle special case PutComplexDir
+ if isClientPutComplexDir:
+ tmp_paramName = paramName.split('.')
+ if len(tmp_paramName) == 3:
+ if tmp_paramName[-1] == 'DataLength':
+ n = types.FcpTypeInt.fcpToPython(paramValue)
+ clientPutComplexDirDataLength += n
+ msg[paramName] = n
+
+ # get associated data if necessary
+ if hasData:
+ if isClientPutComplexDir:
+ n = clientPutComplexDirDataLength
+ else:
+ n = msg['DataLength']
+ if n > 0:
+ while self._receiveBuffer:
+ if len(self._receiveBuffer) >= n:
+ msg.data, self._receiveBuffer = self._receiveBuffer[ :n], self._receiveBuffer[n: ]
+ break
+
+ exception, details = self.readBytes(self.io.BufferSize)
+ if exception == IOTimeout: # try again later
+ self._receiveBuffer = chunk + self._receiveBuffer
+ elif exception is not None:
+ raise exception(details)
+
+ self._log.debug(consts.LogMessages.Received + msg.pprint())
+ return msg
+
+
+
+ def sendMessage(self, name, data=None, **params):
+ """Sends a message to freenet
+ @param name: name of the message to send
+ @param data: data to atatch to the message
+ @param params: {para-name: param-calue, ...} of parameters to pass along
+ with the message (see freenet protocol)
+ @raise SocketError: if the socket connection to the node dies unexpectedly
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
+
+ @note: you can use this method to send a message to the node, bypassing all
+ track keeping methods of the client
+ """
+ return self.sendMessageEx(message.Message(name, data=data, **params))
+
+
+ def sendMessageEx(self, msg):
+ """Sends a message to freenet
+ @param msg: (Message) message to send
+ @return: Message
+ @raise SocketError: if the socket connection to the node dies unexpectedly.
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
+
+ @note: you can use this method to send a message to the node, bypassing all
+ track keeping methods of the client
+ """
+ self._log.debug(consts.LogMessages.Sending + msg.pprint())
+ try:
+ self.io.write(msg.toString())
+ except IOBroken, details:
+ self.close()
+ self._log.critical(consts.LogMessages.SocketDied)
+ raise IOBroken(details)
+
+
+ def setDebugVerbosity(self, debugVerbosity):
+ self._log.setLevel(debugVerbosity)
+
+
+ def setIOPrototype(self, ioObject):
+ """
+ @note: if the connection is open the connection is closed in the call
+ """
+ if self.isOpen():
+ self.close()
+
+ if ioObject.BufferSize <= 0:
+ raise ValueError('IOObject.BufferSize must be > 0')
+ self._ioPrototype = ioObject
+
+
+#***********************************************************************************************
+#
+#***********************************************************************************************
+if __name__ == '__main__':
+
+ c = IOHandler()
+ def cb(event, *params):
+ print event
+ #print event == c.events.MessageReceived
+ #print event.msg
+
+
+ #for e in c.events:
+ # e += cb
+
+ if c.connect(duration=1, host='127.0.0.1', port=9481):
+
+
+ c.sendMessageEx(message.Message(consts.Message.ClientHello, Name='foo', ExpectedVersion="2,0"))
+ msg = c.readMessage()
+
+
+ #print 222, c.nextMessage()
+
+ c.close()
+
+
+
+
+
+
+
+
+
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|