Thread: SF.net SVN: fclient: [5] trunk/fclient/fclient_lib/fcp/fcp20.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <ju...@us...> - 2007-10-16 10:19:40
|
Revision: 5 http://fclient.svn.sourceforge.net/fclient/?rev=5&view=rev Author: jurner Date: 2007-10-16 03:19:24 -0700 (Tue, 16 Oct 2007) Log Message: ----------- continued working on FcpClient Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-15 17:42:24 UTC (rev 4) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5) @@ -1,9 +1,15 @@ - + +import atexit +import logging import os import socket +import subprocess +import sys import time import thread import uuid + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) #************************************************************** # consts #************************************************************** @@ -13,7 +19,6 @@ DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) except: pass SocketTimeout = 0.1 -KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@') class JobIdentifiers: # fixed job identifiers @@ -200,6 +205,13 @@ fp.close() return read +def saveRemoveFile(fpath): + if fpath is not None: + if os.path.isfile(fpath): + os.remove(fpath) + return True + return False + def saveWriteFile(fpath, data): """Writes data to a file i the savest manner possible @@ -221,6 +233,22 @@ fp.close() return written +def startFreenet(cmdline): + """Starts freenet + @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') + @return: (string) whatever freenet returns + """ + #TODO: on windows it may be necessary to hide the comand window + p = subprocess.Popen( + args=cmdline, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p.communicate() + return stdout + + def fcpBool(pythonBool): """Converts a python bool to a fcp bool @param pythonBool: (bool) @@ -237,6 +265,95 @@ """ return fcpBool == 'true' + +def uriToList(uri): + """Splits a freenet uri into its components + @param uri: (str) uri to split + @return: (list) or components + @note: additional dexoration like 'freenet:' or 'http://blah' are ignored + + >>> uriToList('SSK@foo') + ['SSK@foo'] + + >>> uriToList('SSK@foo/bar/baz') + ['SSK@foo', 'bar', 'baz'] + + >>> uriToList('freenet:SSK@foo') + ['SSK@foo'] + + >>> uriToList('http://foo/SSK@foo') + ['SSK@foo'] + + >>> uriToList('http://foo') + [] + + """ + if uri.startswith('freenet:'): + uri = uri[len('freenet:'): ] + + components = [] + head = uri + while head: + head, tail = posixpath.split(head) + components.append(tail) + + components.reverse() + + while components: + if components[0][:4] in FcpKeys.KeysAll: + break + else: + components.pop(0) + + return components + +def uriKeyType(uri): + pass + + +def isValidUri(uri): + """Checks if an uri is a valid freenet uri + @param uri: (str) uri to check + @reuturn: (bool) + """ + return bool(UriToList(uri)) + + +def splitUri(uri): + """Splits an uri into uri and filename + @param uri: uri to split + @return: tuple(uri, filename) + + >>> splitUri('SSK@foo/bar/baz') + ('SSK@foo', 'bar/baz') + + >>> splitUri('SSK@foo') + ('SSK@foo', '') + + >>> splitUri('NoUri') + ('NoUri', '') + + """ + L = uriToList(uri) + if not L: + return (uri, '') + names = L[1:] + if not names: + name = '' + else: + name = '/'.join(names) + return (L[0], name) + + +def fileNameFromUri(uri): + """Returns the filename part of an uri + @return: (str) filename. If no filename is found the uri is returned unchanged + """ + tmp_uri, name = splitUri(uri) + if name: + return name + return uri + #********************************************************************** # classes #********************************************************************** @@ -303,7 +420,6 @@ #************************************************************************** # jobs #************************************************************************** -#TODO: do somrthing that this class does not lock the queue class JobBase(object): """Base class for jobs""" @@ -311,18 +427,28 @@ def __init__(self, fcpClient, identifier, message): - self.fcpClient = fcpClient - self.fcpIdentifier = identifier - self.fcpMessage = message - self.fcpResult = None - self.fcpTime = 0 + self.fcpClient = fcpClient # FcpClient() instance the job belongs to + self.fcpError = None # last error (either this is set or dcpResult) + self.fcpIdentifier = identifier # + self.fcpMessage = message # message send to node + self.fcpResult = None # job result + self.fcpTime = 0 # start time (will hld duration whern the job is complte) + def displayName(self): + return 'JobBase' + def start(self): self.fcpTime = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) + def error(self, msg): + self.fcpTime = time.time() - self.fcpTime + self.fcpError = msg + self.fcpResult = None + def stop(self, result): self.fcpTime = time.time() - self.fcpTime + self.fcpError = None self.fcpResult = result @@ -338,8 +464,10 @@ ) JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) + def displayName(self): + return 'NodeHello' + - class JobListPeers(JobBase): _fcp_auto_remove_ = True @@ -353,15 +481,21 @@ JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) + def displayName(self): + return 'ListPeers' + def handlePeer(self,msg): pass -class JobFileInfo(JobBase): +class JobGetFileInfo(JobBase): + """""" _fcp_auto_remove_ = False + + # idea is to provoke a GetFailed message and take mimetype and size from it def __init__(self, fcpClient, uri, **params): """ @param fcpClient: FcpClient() instance @@ -378,7 +512,7 @@ Messages.ClientGet, Identifier=identifier, URI=uri, - MaxSize='ase0', + MaxSize='0', ReturnType='none', Verbosity='1', **params @@ -386,36 +520,37 @@ JobBase.__init__(self, fcpClient, identifier, message) + def displayName(self): + return 'GetFileInfo' + def handleProgress(self, msg): pass - def stop(self, msg): - JobBase.stop(self, msg) - error = result = None + def error(self, msg): + JobBase.error(self, msg) if msg.name == Messages.GetFailed: if msg['Code'] == FetchErrors.TooBig: - result = ( + self.fcpError = None + self.fcpResult = ( msg.get('ExpectedMetadata.ContentType', ''), msg.get('ExpectedDataLength', '') ) - else: - error, result = msg['Code'], msg - - elif msg.name == Messages.DataFound: - result = ( + #else: + # raise ValueError('Unhandled message: %s' % msg.name) + + + def stop(self, msg): + JobBase.stop(self, msg) + if msg.name == Messages.DataFound: + self.fcpResult = ( msg.get('Metadata.ContentType', ''), msg.get('DataLength', '') ) - - elif msg.name == Messages.ProtocolError: - error, result = msg['Code'], msg - else: raise ValueError('Unhandled message: %s' % msg.name) - - self.fcpResult = error, result + #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): @@ -432,7 +567,9 @@ JobBase.__init__(self, fcpClient, directory, message) self.fcpTmpFile = None - + def displayName(self): + return 'TestDDA' + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -456,19 +593,58 @@ ReadContent=readContent, ) + def error(self, msg): + JobBase.error(self, msg) + saveRemoveFile(self.fcpTmpFile) + self.fcpTmpFile = None + + def stop(self, msg): JobBase.stop(self, msg) - if self.fcpTmpFile is not None: - if os.path.isfile(self.fcpTmpFile): - os.remove(self.fcpTmpFile) + saveRemoveFile(self.fcpTmpFile) + self.fcpTmpFile = None #************************************************************************** # fcp client #************************************************************************** +class LogMessages: + Connecting = 'Connecting to node...' + Connected = 'Connected to node' + ConnectionRetry = 'Connecting to node failed... retrying' + ConnectingFailed = 'Connecting to node failed' + + ClientClose = 'Closing client' + + MessageSend = 'Message send' + MessageReceived = 'Message received' + + JobStart = 'Starting job: ' + JobStop = 'Stopping job: ' + JobsCompleted = 'All jobs completed' + + KeyboardInterrupt = 'Keyboard interrupt' + SocketDead = 'Socket is dead' + + +#TODO: no idea what happens on reconnect if socket died. What about running jobs? +#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. class FcpClient(object): - def __init__(self): - + def __init__(self, + name='', + errorHandler=None, + verbosity=logging.WARNING, + logMessages=LogMessages + ): + """ + @param name: name of the client instance or '' (for debugging) + @param errorHandler: will be called if the socket conncetion to the node is dead + with two params: FcpSocketError + details. When the handler is called the client + is already closed. + @param verbosity: verbosity level for debugging + @param logMessages: LogMessages class containing messages + """ + self._isConnected = False self._jobs = { 'all': {}, @@ -476,24 +652,34 @@ 'running': [], 'complete': [], } + self._errrorHandler = errorHandler + self._log = logging.getLogger('FcpClient20:%s' % name) + self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None - + self.setVerbosity(verbosity) + atexit.register(self.close) def close(self): + self._log.info(self._logMessages.ClientClose) if self._socket is not None: self._socket.close() self._socket = None + #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): + + self._log.info(self._logMessages.Connecting) + # poll untill freenet responds time_elapsed = 0 while time_elapsed <= repeat: # try to Connect socket - self.close() + if self._socket is not None: + self.close() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.settimeout(SocketTimeout) try: @@ -501,19 +687,19 @@ except Exception, d: pass else: - #self._isConnected = True + self._log.info(self._logMessages.Connected) return True + self._log.info(self._logMessages.ConnectionRetry) + # continue polling time_elapsed += timeout time.sleep(timeout) + self._log.info(self._logMessages.ConnectingFailed) return False - #def __nonzero__(self): - # return self._isConnected - def addJob(self, job): self._lock.acquire(True) try: @@ -523,9 +709,12 @@ self._jobs['running'].append(job) finally: self._lock.release() + + self._log.info(self._logMessages.JobStart + job.displayName()) job.start() - def finishJob(self, identifier, msg): + + def stopJob(self, identifier, msg, error=False): self._lock.acquire(True) try: job = self._jobs['all'].get(identifier, None) @@ -540,7 +729,11 @@ if job is None: raise ValueError('No such job: %r' % identifier) - job.stop(msg) + self._log.info(self._logMessages.JobStop + job.displayName()) + if error: + job.error(msg) + else: + job.stop(msg) def notifyJob(self, identifier, handler, msg): @@ -558,27 +751,25 @@ # TODO: # x. push pending jobs - # x. on error stop this thingy - - n = 0 - while True: - if not self._lock.acquire(False): - continue + try: + while True: + if not self._lock.acquire(False): + continue + + try: + if not self._jobs['pending'] and not self._jobs['running']: + self._log.info(self._logMessages.JobsCompleted) + break + finally: + self._lock.release() + + self.next() + except KeyboardInterrupt: + self._log(self._logMessages.KeyboardInterrupt) + self.close() - try: - if not self._jobs['pending'] and not self._jobs['running']: - break - finally: - self._lock.release() - - msg = self.readMessage() - self.handleMessage(msg) - - - n += 1 - if n > 50: break - + #TODO: some info when all jobs are completed def next(self): msg = self.readMessage() self.handleMessage(msg) @@ -586,33 +777,32 @@ def handleMessage(self, msg): - print msg.pprint() - + self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + if msg.name == Messages.NodeHello: #connectionIdentifier = msg['ConnectionIdentifier'] - self.finishJob(JobIdentifiers.ClientHello, msg) + self.stopJob(JobIdentifiers.ClientHello, msg) elif msg.name == Messages.ProtocolError: code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.finishJob(JobIdentifiers.ClientHello, msg) + self.stopJob(JobIdentifiers.ClientHello, msg, error=True) else: identifier = msg.get('Identifier', None) if identifier is None: pass # raise ??? else: - self.finishJob(identifier, msg) + self.stopJob(identifier, msg, error=True) elif msg.name == Messages.Peer: self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg) elif msg.name == Messages.EndListPeers: - self.finishJob(IdentifierListPeers, msg) + self.stopJob(IdentifierListPeers, msg) elif msg.name == Messages.GetFailed: - self.finishJob(msg['Identifier'], msg) + self.stopJob(msg['Identifier'], msg, error=True) elif msg.name == Messages.SimpleProgress: self.notifyJob(msg['Identifier'], 'handleProgress', msg) @@ -621,7 +811,7 @@ self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg) elif msg.name == Messages.TestDDAComplete: - self.finishJob(msg['Directory'], msg) + self.stopJob(msg['Directory'], msg) elif msg.name == Messages.IdentifierCollision: pass @@ -642,6 +832,10 @@ msg = MessageSocketTimeout() break except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) raise FcpSocketError(d) #!! if p == '\r': # ignore @@ -662,7 +856,12 @@ n = int(msg.params['DataLength']) try: msg.data = self._socket.recv(n) + if not msg.data: raise ValueError('Socket is dead') except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) raise FcpSocketError(d) #!! else: @@ -674,8 +873,13 @@ return msg - + def setLogMessages(self, logMessages): + self._logMessages = logMessages + + def setVerbosity(self, verbosity): + self._log.setLevel(verbosity) + def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @@ -691,25 +895,50 @@ @param msg: (Message) message to send @return: Message """ - #self.log.info('SendMessage\n' + msg.pprint()) rawMsg = msg.toString() + self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) try: self._socket.sendall(rawMsg) except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) raise FcpSocketError(d) - #TODO: allow for an error handler to handle return msg #***************************************************************************** # #***************************************************************************** if __name__ == '__main__': - c = FcpClient() + c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): job1 = JobNodeHello(c) c.addJob(job1) c.run() print '---------------------------' - print job1.fcpResult.pprint() + print job1.fcpError + print job1.fcpResult + print job1.fcpTime + print '---------------------------' + + + job2 = JobTestDDA(c, os.path.dirname(__file__)) + c.addJob(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' + + job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + c.addJob(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ju...@us...> - 2007-10-16 17:54:04
|
Revision: 6 http://fclient.svn.sourceforge.net/fclient/?rev=6&view=rev Author: jurner Date: 2007-10-16 10:54:01 -0700 (Tue, 16 Oct 2007) Log Message: ----------- added a bit of documentation and a class to deal with freenet uris Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6) @@ -2,6 +2,7 @@ import atexit import logging import os +import re import socket import subprocess import sys @@ -13,6 +14,7 @@ #************************************************************** # consts #************************************************************** +NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() DefaultFcpPort = 9481 try: @@ -21,12 +23,14 @@ SocketTimeout = 0.1 class JobIdentifiers: - # fixed job identifiers - # note that the client can only handle one job of these at a time + """Fixed job identifiers + @note: he client can only handle one job of these at a time + """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' class Messages: + """All messages supported by the client""" # client messages ClientHello = 'ClientHello' @@ -92,6 +96,8 @@ class Priorities: + """All priorities supported by the client""" + Maximum = 0 Interactive = 1 SemiInteractive = 2 @@ -107,6 +113,8 @@ # errors class FetchErrors: + """All fetch errors supported by the client""" + MaxArchiveRecursionExceeded = '1' UnknownSplitfileMetadata = '2' UnknownMetadata = '3' @@ -138,6 +146,8 @@ class InsertErrors: + """All insert errors supported by the client""" + InvalidUri = '1' BucketError = '2' InternalError = '3' @@ -151,6 +161,8 @@ class ProtocolErrors: + """All protocol errors supported by the client""" + ClientHelloMustBeFirst = '1' NoLateClientHellos = '2' MessageParseError = '3' @@ -187,6 +199,9 @@ # functions #********************************************************************** def newIdentifier(): + """Returns a new unique identifier + @return: (str) uuid + """ return str(uuid.uuid4()) def saveReadFile(fpath): @@ -206,6 +221,10 @@ return read def saveRemoveFile(fpath): + """Savely removes a file + @param fpath: filepath of the file to remove or None + @return: True if the file was removed, False otherwise + """ if fpath is not None: if os.path.isfile(fpath): os.remove(fpath) @@ -265,99 +284,101 @@ """ return fcpBool == 'true' - -def uriToList(uri): - """Splits a freenet uri into its components - @param uri: (str) uri to split - @return: (list) or components - @note: additional dexoration like 'freenet:' or 'http://blah' are ignored +#********************************************************************** +# classes +#********************************************************************** +class FcpSocketError(Exception): pass +class FcpUri(object): + """Wrapper class for freenet uris""" - >>> uriToList('SSK@foo') - ['SSK@foo'] - >>> uriToList('SSK@foo/bar/baz') - ['SSK@foo', 'bar', 'baz'] + KeySSK = 'SSK@' + KeyKSK = 'KSK@' + KeyCHK = 'CHK@' + KeyUSK = 'USK@' + KeySVK = 'SVK@' + KeyUnknown = '' + KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK) + + ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I) + ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I) - >>> uriToList('freenet:SSK@foo') - ['SSK@foo'] - >>> uriToList('http://foo/SSK@foo') - ['SSK@foo'] + def __init__(self, uri): + """ + @param uri: uri to wrap + @param cvar ReUriPattern: pattern matching a freenet uri + @param cvar ReKeyPattern: pattern matching the key type of a freenet uri + + @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible + + + >>> uri = FcpUri('freenet:SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + >>> uri.keyType() == FcpUri.KeySSK + True + >>> uri.split() + ('SSK@foo', 'bar') + >>> uri.fileName() + 'bar' + + >>> uri = FcpUri('http://SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + + # uris not containing freenet keys are left unchanged + >>> uri = FcpUri('http://foo/bar') + >>> str(uri) + 'http://foo/bar' + >>> uri.keyType() == FcpUri.KeyUnknown + True + >>> uri.split() + ('http://foo/bar', '') + >>> uri.fileName() + 'http://foo/bar' + + """ + self._uri = uri + + result = self.ReUriPattern.search(uri) + if result is not None: + self._uri = result.group(0) + + def __str__(self): + return str(self._uri) - >>> uriToList('http://foo') - [] + def __unicode__(self): + return unicode(self._uri) - """ - if uri.startswith('freenet:'): - uri = uri[len('freenet:'): ] - - components = [] - head = uri - while head: - head, tail = posixpath.split(head) - components.append(tail) + def keyType(self): + """Retuns the key type of the uri + @return: one of the Key* consts + """ + result = self.ReKeyPattern.search(self._uri) + if result is not None: + return result.group(0).upper() + return self.KeyUnknown - components.reverse() + def split(self): + """Splits the uri + @return: tuple(freenet-key, file-name) + """ + if self.keyType() != self.KeyUnknown: + head, sep, tail = self._uri.partition('/') + return head, tail + return self._uri, '' + + def fileName(self): + """Returns the filename part of the uri + @return: str + """ + head, tail = self.split() + if tail: + return tail + return self._uri - while components: - if components[0][:4] in FcpKeys.KeysAll: - break - else: - components.pop(0) - - return components -def uriKeyType(uri): - pass - - -def isValidUri(uri): - """Checks if an uri is a valid freenet uri - @param uri: (str) uri to check - @reuturn: (bool) - """ - return bool(UriToList(uri)) - - -def splitUri(uri): - """Splits an uri into uri and filename - @param uri: uri to split - @return: tuple(uri, filename) - - >>> splitUri('SSK@foo/bar/baz') - ('SSK@foo', 'bar/baz') - - >>> splitUri('SSK@foo') - ('SSK@foo', '') - - >>> splitUri('NoUri') - ('NoUri', '') - - """ - L = uriToList(uri) - if not L: - return (uri, '') - names = L[1:] - if not names: - name = '' - else: - name = '/'.join(names) - return (L[0], name) - - -def fileNameFromUri(uri): - """Returns the filename part of an uri - @return: (str) filename. If no filename is found the uri is returned unchanged - """ - tmp_uri, name = splitUri(uri) - if name: - return name - return uri - -#********************************************************************** -# classes -#********************************************************************** -class FcpSocketError(Exception): pass class Message(object): """Class wrapping a freenet message""" @@ -417,6 +438,8 @@ def __init__(self): Message.__init__(self, self.Name) + + #************************************************************************** # jobs #************************************************************************** @@ -426,6 +449,18 @@ _fcp_auto_remove_ = True def __init__(self, fcpClient, identifier, message): + """ + @param fcpClient: FcpClient() instance + @param identifier: (str) identifier of the job + @param message: (Message) to send to the node whne the job ist started + @ivar fcpClient: FcpClient() instance + @ivar fcpError: holding the error message if an error was encountered while running + the job, None otherwise + @ivar fcpIdentifier: identifier of the job + @ivar fcpMessage: initial message send to the node + @ivar fcpResult: if no error was encountered, holding the result of the job when complete + @ivar fcpTime: when the job is complete, holding the time the job took to complete + """ self.fcpClient = fcpClient # FcpClient() instance the job belongs to self.fcpError = None # last error (either this is set or dcpResult) @@ -435,31 +470,50 @@ self.fcpTime = 0 # start time (will hld duration whern the job is complte) def displayName(self): + """Returns the display name of the job + @return: (str) display name + """ return 'JobBase' def start(self): + """Starts the job""" self.fcpTime = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) def error(self, msg): + """Called on job completion if an error was encounterd while runnng the job + @param msg: (Message) to pass to the job + """ self.fcpTime = time.time() - self.fcpTime self.fcpError = msg self.fcpResult = None - def stop(self, result): + def stop(self, msg): + """Called on job completion to stop the job + @param msg: (Message) to pass to the job + """ self.fcpTime = time.time() - self.fcpTime self.fcpError = None - self.fcpResult = result + self.fcpResult = msg -class JobNodeHello(JobBase): +class JobClientHello(JobBase): + """Sed a ClientHello message to the node + @note: this must be the first message passed to the node. If everything + goes well, you will get a NodeHello in response. + """ + _fcp_auto_remove_ = True - def __init__(self, fcpClient, expectedVersion='2.0'): + def __init__(self, fcpClient, name=None, expectedVersion='2.0'): + """ + @param name: (str) connection name or None, to use an arbitrary name + @param expectedVersion: (str) node version expected + """ message = Message( Messages.ClientHello, - Name=newIdentifier(), + Name=name if name is not None else newIdentifier(), ExpectedVersion=expectedVersion, ) JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) @@ -469,6 +523,8 @@ class JobListPeers(JobBase): + """Lists all known peers of the node + """ _fcp_auto_remove_ = True @@ -485,13 +541,20 @@ return 'ListPeers' def handlePeer(self,msg): - pass + """Handles the next peer send by the node in form of a 'Peer' message + while the job is running. Overwrite to process. + """ + - class JobGetFileInfo(JobBase): - """""" + """Tries to retieve information about a file. If everything goes well + On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. + Note, that both members may be '' (empty string) + + """ + _fcp_auto_remove_ = False @@ -524,9 +587,11 @@ return 'GetFileInfo' def handleProgress(self, msg): - pass + """Handles the next progress made of a 'SimpleProgress' message + while the job is running. Overwrite to process. + """ + - def error(self, msg): JobBase.error(self, msg) if msg.name == Messages.GetFailed: @@ -554,7 +619,9 @@ #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): - + """Tests a directory for read / write accesss + """ + _fcp_auto_remove_ = False def __init__(self, fcpClient, directory, read=True, write=True): @@ -608,6 +675,7 @@ # fcp client #************************************************************************** class LogMessages: + """Message strings used for log infos""" Connecting = 'Connecting to node...' Connected = 'Connected to node' ConnectionRetry = 'Connecting to node failed... retrying' @@ -628,8 +696,11 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. +#TODO: no idea if to add support for pending jobs and queue management here class FcpClient(object): + """Fcp client implementation""" + def __init__(self, name='', errorHandler=None, @@ -648,12 +719,12 @@ self._isConnected = False self._jobs = { 'all': {}, - 'pending': [], + 'pending': [], # ??? 'running': [], - 'complete': [], + 'complete': [], # ??? } - self._errrorHandler = errorHandler - self._log = logging.getLogger('FcpClient20:%s' % name) + self._errorHandler = errorHandler + self._log = logging.getLogger(NameClient + ':' + name) self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None @@ -662,6 +733,9 @@ atexit.register(self.close) def close(self): + """Closes the client + @note: make shure to call close() when done with the client + """ self._log.info(self._logMessages.ClientClose) if self._socket is not None: self._socket.close() @@ -670,7 +744,13 @@ #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): - + """Establishes the connection to a freenet node + @param host: (str) host of th node + @param port: (int) port of the node + @param repeat: (int) how many seconds try to connect before giving up + @param timeout: (int) how much time to wait before another attempt to connect + @return: True if successful, False otherwise + """ self._log.info(self._logMessages.Connecting) # poll untill freenet responds @@ -699,8 +779,55 @@ self._log.info(self._logMessages.ConnectingFailed) return False + + def handleMessage(self, msg): + """Handles the next message from the freenet node + @param msg: Message() to handle + """ + self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + + if msg.name == Messages.NodeHello: + #connectionIdentifier = msg['ConnectionIdentifier'] + self.jobStop(JobIdentifiers.ClientHello, msg) + + elif msg.name == Messages.ProtocolError: + code = msg['Code'] + if code == ProtocolErrors.NoLateClientHellos: + self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + + else: + identifier = msg.get('Identifier', None) + if identifier is None: + pass # raise ??? + else: + self.jobStop(identifier, msg, error=True) + + elif msg.name == Messages.Peer: + self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) + + elif msg.name == Messages.EndListPeers: + self.jobStop(IdentifierListPeers, msg) + + elif msg.name == Messages.GetFailed: + self.jobStop(msg['Identifier'], msg, error=True) + + elif msg.name == Messages.SimpleProgress: + self.jobNotify(msg['Identifier'], 'handleProgress', msg) + + elif msg.name == Messages.TestDDAReply: + self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + + elif msg.name == Messages.TestDDAComplete: + self.jobStop(msg['Directory'], msg) + + elif msg.name == Messages.IdentifierCollision: + pass - def addJob(self, job): + + def jobAdd(self, job): + """Adds a job to the client + @param job: (Job*) job to add + """ self._lock.acquire(True) try: if job.fcpIdentifier in self._jobs['all']: @@ -712,12 +839,33 @@ self._log.info(self._logMessages.JobStart + job.displayName()) job.start() - - def stopJob(self, identifier, msg, error=False): + + def jobNotify(self, identifier, handler, msg): + """Notifies a job about an event while it is running + @param identifier: identifier of the job to notify + @param handler: (str) method of the job to call to handle the notification + @param msg: Message() to pass to the job + """ self._lock.acquire(True) try: job = self._jobs['all'].get(identifier, None) + finally: + self._lock.release() + if job is None: + raise ValueError('No such job: %r' % identifier) + getattr(job, handler)(msg) + + + def jobStop(self, identifier, msg, error=False): + """Stops a job + @param identifier: identifier of the job to stop + @param msg: Message() to pass to the job as result + @param error: set to True to indicate unsuccessful completion of the job, True otherwisse + """ + self._lock.acquire(True) + try: + job = self._jobs['all'].get(identifier, None) if job is not None: self._jobs['running'].remove(job) if job._fcp_auto_remove_: @@ -734,92 +882,22 @@ job.error(msg) else: job.stop(msg) - - def notifyJob(self, identifier, handler, msg): - self._lock.acquire(True) - try: - job = self._jobs['all'].get(identifier, None) - finally: - self._lock.release() - if job is None: - raise ValueError('No such job: %r' % identifier) - getattr(job, handler)(msg) - - def run(self): - - # TODO: - # x. push pending jobs - try: - while True: - if not self._lock.acquire(False): - continue - - try: - if not self._jobs['pending'] and not self._jobs['running']: - self._log.info(self._logMessages.JobsCompleted) - break - finally: - self._lock.release() - - self.next() - except KeyboardInterrupt: - self._log(self._logMessages.KeyboardInterrupt) - self.close() - - #TODO: some info when all jobs are completed def next(self): + """Pumps the next message waiting + @note: use this method instead of run() to run the client step by step + """ msg = self.readMessage() self.handleMessage(msg) - - def handleMessage(self, msg): - - self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) - - if msg.name == Messages.NodeHello: - #connectionIdentifier = msg['ConnectionIdentifier'] - self.stopJob(JobIdentifiers.ClientHello, msg) - - elif msg.name == Messages.ProtocolError: - code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.stopJob(JobIdentifiers.ClientHello, msg, error=True) - - else: - identifier = msg.get('Identifier', None) - if identifier is None: - pass # raise ??? - else: - self.stopJob(identifier, msg, error=True) - - elif msg.name == Messages.Peer: - self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg) - - elif msg.name == Messages.EndListPeers: - self.stopJob(IdentifierListPeers, msg) - - elif msg.name == Messages.GetFailed: - self.stopJob(msg['Identifier'], msg, error=True) - - elif msg.name == Messages.SimpleProgress: - self.notifyJob(msg['Identifier'], 'handleProgress', msg) - - elif msg.name == Messages.TestDDAReply: - self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg) - - elif msg.name == Messages.TestDDAComplete: - self.stopJob(msg['Directory'], msg) - - elif msg.name == Messages.IdentifierCollision: - pass - - def readMessage(self): """Reads the next message directly from the socket and dispatches it - @return: valid or invalid Message() + @return: (Message) the next message read from the socket + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. """ msg = Message(None) buf = [] @@ -872,20 +950,42 @@ pass return msg + + + def run(self): + """Runs the client untill all jobs passed to it are completed + @note: use KeyboardInterrupt to stop prematurely + """ + # TODO: + # x. push pending jobs + try: + while True: + if not self._lock.acquire(False): + continue + + try: + if not self._jobs['pending'] and not self._jobs['running']: + self._log.info(self._logMessages.JobsCompleted) + break + finally: + self._lock.release() + + self.next() + except KeyboardInterrupt: + self._log(self._logMessages.KeyboardInterrupt) + self.close() - def setLogMessages(self, logMessages): - self._logMessages = logMessages - def setVerbosity(self, verbosity): - self._log.setLevel(verbosity) - def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @param data: data to atatch to the message @param params: {para-name: param-calue, ...} of parameters to pass along with the message (see freenet protocol) + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. """ return self.sendMessageEx(Message(name, data=data, **params)) @@ -894,6 +994,9 @@ """Sends a message to freenet @param msg: (Message) message to send @return: Message + @raise FcpSocketError: if the socket connection to the node dies unexpectedly. + If an error handler is passed to the client it is called emidiately before the error + is raised. """ rawMsg = msg.toString() self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) @@ -907,14 +1010,24 @@ raise FcpSocketError(d) return msg + + def setLogMessages(self, logMessages): + """""" + self._logMessages = logMessages + + + def setVerbosity(self, verbosity): + """""" + self._log.setLevel(verbosity) + #***************************************************************************** # #***************************************************************************** if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): - job1 = JobNodeHello(c) - c.addJob(job1) + job1 = JobClientHello(c) + c.jobAdd(job1) c.run() print '---------------------------' @@ -924,21 +1037,21 @@ print '---------------------------' - job2 = JobTestDDA(c, os.path.dirname(__file__)) - c.addJob(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - - job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - c.addJob(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' + job2 = JobTestDDA(c, os.path.dirname(__file__)) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' + + job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + c.jobAdd(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-20 10:00:17
|
Revision: 7 http://fclient.svn.sourceforge.net/fclient/?rev=7&view=rev Author: jUrner Date: 2007-10-20 03:00:20 -0700 (Sat, 20 Oct 2007) Log Message: ----------- continued working on fcp protocol implementation Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7) @@ -288,6 +288,21 @@ # classes #********************************************************************** class FcpSocketError(Exception): pass +class FcpProtocolError(Exception): + def __init__(self, msg): + """ + @param msg: (Message) ProtocolError message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + + def __str__(self): return self.value + + + class FcpUri(object): """Wrapper class for freenet uris""" @@ -467,8 +482,10 @@ self.fcpIdentifier = identifier # self.fcpMessage = message # message send to node self.fcpResult = None # job result - self.fcpTime = 0 # start time (will hld duration whern the job is complte) - + self.fcpTimeStart = 0 # time the job was started + self.fcpTimeStop = 0 # time the job was stopped + self.fcpStopped = False + def displayName(self): """Returns the display name of the job @return: (str) display name @@ -477,14 +494,16 @@ def start(self): """Starts the job""" - self.fcpTime = time.time() + self.fcpStopped = False + self.fcpTimeStart = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) def error(self, msg): """Called on job completion if an error was encounterd while runnng the job @param msg: (Message) to pass to the job """ - self.fcpTime = time.time() - self.fcpTime + self.fcpStopped = True + self.fcpTimeStop = time.time() self.fcpError = msg self.fcpResult = None @@ -492,7 +511,8 @@ """Called on job completion to stop the job @param msg: (Message) to pass to the job """ - self.fcpTime = time.time() - self.fcpTime + self.fcpStopped = True + self.fcpTimeStop = time.time() self.fcpError = None self.fcpResult = msg @@ -519,7 +539,7 @@ JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) def displayName(self): - return 'NodeHello' + return 'ClientHello' class JobListPeers(JobBase): @@ -531,8 +551,8 @@ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Messages.ListPeers, - WithMetadata='true' if withMetaData else 'false', - WithVolatile='true' if withVolantile else 'false', + WithMetadata=fcpBool(withMetaData), + WithVolatile=fcpBool(withVolantile), ) JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) @@ -587,8 +607,7 @@ return 'GetFileInfo' def handleProgress(self, msg): - """Handles the next progress made of a 'SimpleProgress' message - while the job is running. Overwrite to process. + """Handles the next progress made. Overwrite to process. """ @@ -625,6 +644,9 @@ _fcp_auto_remove_ = False def __init__(self, fcpClient, directory, read=True, write=True): + if not os.path.isdir(directory): + raise ValueError('No such directory: %r' % directory) + message = Message( Messages.TestDDARequest, Directory=directory, @@ -660,6 +682,7 @@ ReadContent=readContent, ) + def error(self, msg): JobBase.error(self, msg) saveRemoveFile(self.fcpTmpFile) @@ -728,7 +751,7 @@ self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None - + self.setVerbosity(verbosity) atexit.register(self.close) @@ -749,8 +772,9 @@ @param port: (int) port of the node @param repeat: (int) how many seconds try to connect before giving up @param timeout: (int) how much time to wait before another attempt to connect - @return: True if successful, False otherwise + @return: (Message) NodeHello if successful,None otherwise """ + self._clientHello = None self._log.info(self._logMessages.Connecting) # poll untill freenet responds @@ -768,7 +792,10 @@ pass else: self._log.info(self._logMessages.Connected) - return True + job = JobClientHello(self) + self.jobAdd(job, synchron=True) + assert job.fcpError is None, 'Error should have been caught by handleMessage()' + return job.fcpResult self._log.info(self._logMessages.ConnectionRetry) @@ -777,7 +804,7 @@ time.sleep(timeout) self._log.info(self._logMessages.ConnectingFailed) - return False + return None def handleMessage(self, msg): @@ -792,15 +819,17 @@ elif msg.name == Messages.ProtocolError: code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.jobStop(JobIdentifiers.ClientHello, msg, error=True) - + #if code == ProtocolErrors.NoLateClientHellos: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #elif code == ProtocolErrors.ClientHelloMustBeFirst: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #else: + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: inform caller + raise FcpProtocolError(msg) else: - identifier = msg.get('Identifier', None) - if identifier is None: - pass # raise ??? - else: - self.jobStop(identifier, msg, error=True) + self.jobStop(identifier, msg, error=True) elif msg.name == Messages.Peer: self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) @@ -824,9 +853,10 @@ pass - def jobAdd(self, job): + def jobAdd(self, job, synchron=False): """Adds a job to the client @param job: (Job*) job to add + @param synchron: if True, wait untill the job is completed, if False return emidiately """ self._lock.acquire(True) try: @@ -839,6 +869,10 @@ self._log.info(self._logMessages.JobStart + job.displayName()) job.start() + if synchron: + while not job.fcpStopped: + self.next() + def jobNotify(self, identifier, handler, msg): @@ -857,6 +891,7 @@ getattr(job, handler)(msg) + #TODO: quite unclear when to remove a job def jobStop(self, identifier, msg, error=False): """Stops a job @param identifier: identifier of the job to stop @@ -891,6 +926,7 @@ """ msg = self.readMessage() self.handleMessage(msg) + return msg def readMessage(self): """Reads the next message directly from the socket and dispatches it @@ -1026,32 +1062,37 @@ if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): - job1 = JobClientHello(c) - c.jobAdd(job1) + def foo(): + job1 = JobClientHello(c) + c.jobAdd(job1) - c.run() - print '---------------------------' - print job1.fcpError - print job1.fcpResult - print job1.fcpTime - print '---------------------------' + c.run() + print '---------------------------' + print job1.fcpError + print job1.fcpResult + print job1.fcpTime + print '---------------------------' + #foo() + + def foo(): + d = os.path.dirname(os.path.abspath(__file__)) + job2 = JobTestDDA(c, d) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job2.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' - job2 = JobTestDDA(c, os.path.dirname(__file__)) - c.jobAdd(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - + def foo(): job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') c.jobAdd(job2) c.run() print '---------------------------' - print job1.fcpError + print job2.fcpError print job2.fcpResult print job2.fcpTime print '---------------------------' - + #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |