SF.net SVN: fclient: [6] trunk/fclient/fclient_lib/fcp/fcp20.py
Status: Pre-Alpha
Brought to you by: jurner
From: <ju...@us...> - 2007-10-16 17:54:04
|
Revision: 6 http://fclient.svn.sourceforge.net/fclient/?rev=6&view=rev Author: jurner Date: 2007-10-16 10:54:01 -0700 (Tue, 16 Oct 2007) Log Message: ----------- added a bit of documentation and a class to deal with freenet uris Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6) @@ -2,6 +2,7 @@ import atexit import logging import os +import re import socket import subprocess import sys @@ -13,6 +14,7 @@ #************************************************************** # consts #************************************************************** +NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() DefaultFcpPort = 9481 try: @@ -21,12 +23,14 @@ SocketTimeout = 0.1 class JobIdentifiers: - # fixed job identifiers - # note that the client can only handle one job of these at a time + """Fixed job identifiers + @note: he client can only handle one job of these at a time + """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' class Messages: + """All messages supported by the client""" # client messages ClientHello = 'ClientHello' @@ -92,6 +96,8 @@ class Priorities: + """All priorities supported by the client""" + Maximum = 0 Interactive = 1 SemiInteractive = 2 @@ -107,6 +113,8 @@ # errors class FetchErrors: + """All fetch errors supported by the client""" + MaxArchiveRecursionExceeded = '1' UnknownSplitfileMetadata = '2' UnknownMetadata = '3' @@ -138,6 +146,8 @@ class InsertErrors: + """All insert errors supported by the client""" + InvalidUri = '1' BucketError = '2' InternalError = '3' @@ -151,6 +161,8 @@ class ProtocolErrors: + """All protocol errors supported by the client""" + ClientHelloMustBeFirst = '1' NoLateClientHellos = '2' MessageParseError = '3' 
@@ -187,6 +199,9 @@ # functions #********************************************************************** def newIdentifier(): + """Returns a new unique identifier + @return: (str) uuid + """ return str(uuid.uuid4()) def saveReadFile(fpath): @@ -206,6 +221,10 @@ return read def saveRemoveFile(fpath): + """Savely removes a file + @param fpath: filepath of the file to remove or None + @return: True if the file was removed, False otherwise + """ if fpath is not None: if os.path.isfile(fpath): os.remove(fpath) @@ -265,99 +284,101 @@ """ return fcpBool == 'true' - -def uriToList(uri): - """Splits a freenet uri into its components - @param uri: (str) uri to split - @return: (list) or components - @note: additional dexoration like 'freenet:' or 'http://blah' are ignored +#********************************************************************** +# classes +#********************************************************************** +class FcpSocketError(Exception): pass +class FcpUri(object): + """Wrapper class for freenet uris""" - >>> uriToList('SSK@foo') - ['SSK@foo'] - >>> uriToList('SSK@foo/bar/baz') - ['SSK@foo', 'bar', 'baz'] + KeySSK = 'SSK@' + KeyKSK = 'KSK@' + KeyCHK = 'CHK@' + KeyUSK = 'USK@' + KeySVK = 'SVK@' + KeyUnknown = '' + KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK) + + ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I) + ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I) - >>> uriToList('freenet:SSK@foo') - ['SSK@foo'] - >>> uriToList('http://foo/SSK@foo') - ['SSK@foo'] + def __init__(self, uri): + """ + @param uri: uri to wrap + @param cvar ReUriPattern: pattern matching a freenet uri + @param cvar ReKeyPattern: pattern matching the key type of a freenet uri + + @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible + + + >>> uri = FcpUri('freenet:SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + >>> uri.keyType() == FcpUri.KeySSK + True + >>> uri.split() + ('SSK@foo', 'bar') + >>> 
uri.fileName() + 'bar' + + >>> uri = FcpUri('http://SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + + # uris not containing freenet keys are left unchanged + >>> uri = FcpUri('http://foo/bar') + >>> str(uri) + 'http://foo/bar' + >>> uri.keyType() == FcpUri.KeyUnknown + True + >>> uri.split() + ('http://foo/bar', '') + >>> uri.fileName() + 'http://foo/bar' + + """ + self._uri = uri + + result = self.ReUriPattern.search(uri) + if result is not None: + self._uri = result.group(0) + + def __str__(self): + return str(self._uri) - >>> uriToList('http://foo') - [] + def __unicode__(self): + return unicode(self._uri) - """ - if uri.startswith('freenet:'): - uri = uri[len('freenet:'): ] - - components = [] - head = uri - while head: - head, tail = posixpath.split(head) - components.append(tail) + def keyType(self): + """Retuns the key type of the uri + @return: one of the Key* consts + """ + result = self.ReKeyPattern.search(self._uri) + if result is not None: + return result.group(0).upper() + return self.KeyUnknown - components.reverse() + def split(self): + """Splits the uri + @return: tuple(freenet-key, file-name) + """ + if self.keyType() != self.KeyUnknown: + head, sep, tail = self._uri.partition('/') + return head, tail + return self._uri, '' + + def fileName(self): + """Returns the filename part of the uri + @return: str + """ + head, tail = self.split() + if tail: + return tail + return self._uri - while components: - if components[0][:4] in FcpKeys.KeysAll: - break - else: - components.pop(0) - - return components -def uriKeyType(uri): - pass - - -def isValidUri(uri): - """Checks if an uri is a valid freenet uri - @param uri: (str) uri to check - @reuturn: (bool) - """ - return bool(UriToList(uri)) - - -def splitUri(uri): - """Splits an uri into uri and filename - @param uri: uri to split - @return: tuple(uri, filename) - - >>> splitUri('SSK@foo/bar/baz') - ('SSK@foo', 'bar/baz') - - >>> splitUri('SSK@foo') - ('SSK@foo', '') - - >>> splitUri('NoUri') - ('NoUri', 
'') - - """ - L = uriToList(uri) - if not L: - return (uri, '') - names = L[1:] - if not names: - name = '' - else: - name = '/'.join(names) - return (L[0], name) - - -def fileNameFromUri(uri): - """Returns the filename part of an uri - @return: (str) filename. If no filename is found the uri is returned unchanged - """ - tmp_uri, name = splitUri(uri) - if name: - return name - return uri - -#********************************************************************** -# classes -#********************************************************************** -class FcpSocketError(Exception): pass class Message(object): """Class wrapping a freenet message""" @@ -417,6 +438,8 @@ def __init__(self): Message.__init__(self, self.Name) + + #************************************************************************** # jobs #************************************************************************** @@ -426,6 +449,18 @@ _fcp_auto_remove_ = True def __init__(self, fcpClient, identifier, message): + """ + @param fcpClient: FcpClient() instance + @param identifier: (str) identifier of the job + @param message: (Message) to send to the node whne the job ist started + @ivar fcpClient: FcpClient() instance + @ivar fcpError: holding the error message if an error was encountered while running + the job, None otherwise + @ivar fcpIdentifier: identifier of the job + @ivar fcpMessage: initial message send to the node + @ivar fcpResult: if no error was encountered, holding the result of the job when complete + @ivar fcpTime: when the job is complete, holding the time the job took to complete + """ self.fcpClient = fcpClient # FcpClient() instance the job belongs to self.fcpError = None # last error (either this is set or dcpResult) @@ -435,31 +470,50 @@ self.fcpTime = 0 # start time (will hld duration whern the job is complte) def displayName(self): + """Returns the display name of the job + @return: (str) display name + """ return 'JobBase' def start(self): + """Starts the job""" self.fcpTime = 
time.time() self.fcpClient.sendMessageEx(self.fcpMessage) def error(self, msg): + """Called on job completion if an error was encounterd while runnng the job + @param msg: (Message) to pass to the job + """ self.fcpTime = time.time() - self.fcpTime self.fcpError = msg self.fcpResult = None - def stop(self, result): + def stop(self, msg): + """Called on job completion to stop the job + @param msg: (Message) to pass to the job + """ self.fcpTime = time.time() - self.fcpTime self.fcpError = None - self.fcpResult = result + self.fcpResult = msg -class JobNodeHello(JobBase): +class JobClientHello(JobBase): + """Sed a ClientHello message to the node + @note: this must be the first message passed to the node. If everything + goes well, you will get a NodeHello in response. + """ + _fcp_auto_remove_ = True - def __init__(self, fcpClient, expectedVersion='2.0'): + def __init__(self, fcpClient, name=None, expectedVersion='2.0'): + """ + @param name: (str) connection name or None, to use an arbitrary name + @param expectedVersion: (str) node version expected + """ message = Message( Messages.ClientHello, - Name=newIdentifier(), + Name=name if name is not None else newIdentifier(), ExpectedVersion=expectedVersion, ) JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) @@ -469,6 +523,8 @@ class JobListPeers(JobBase): + """Lists all known peers of the node + """ _fcp_auto_remove_ = True @@ -485,13 +541,20 @@ return 'ListPeers' def handlePeer(self,msg): - pass + """Handles the next peer send by the node in form of a 'Peer' message + while the job is running. Overwrite to process. + """ + - class JobGetFileInfo(JobBase): - """""" + """Tries to retieve information about a file. If everything goes well + On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. 
+ Note, that both members may be '' (empty string) + + """ + _fcp_auto_remove_ = False @@ -524,9 +587,11 @@ return 'GetFileInfo' def handleProgress(self, msg): - pass + """Handles the next progress made of a 'SimpleProgress' message + while the job is running. Overwrite to process. + """ + - def error(self, msg): JobBase.error(self, msg) if msg.name == Messages.GetFailed: @@ -554,7 +619,9 @@ #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): - + """Tests a directory for read / write accesss + """ + _fcp_auto_remove_ = False def __init__(self, fcpClient, directory, read=True, write=True): @@ -608,6 +675,7 @@ # fcp client #************************************************************************** class LogMessages: + """Message strings used for log infos""" Connecting = 'Connecting to node...' Connected = 'Connected to node' ConnectionRetry = 'Connecting to node failed... retrying' @@ -628,8 +696,11 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. +#TODO: no idea if to add support for pending jobs and queue management here class FcpClient(object): + """Fcp client implementation""" + def __init__(self, name='', errorHandler=None, @@ -648,12 +719,12 @@ self._isConnected = False self._jobs = { 'all': {}, - 'pending': [], + 'pending': [], # ??? 'running': [], - 'complete': [], + 'complete': [], # ??? 
} - self._errrorHandler = errorHandler - self._log = logging.getLogger('FcpClient20:%s' % name) + self._errorHandler = errorHandler + self._log = logging.getLogger(NameClient + ':' + name) self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None @@ -662,6 +733,9 @@ atexit.register(self.close) def close(self): + """Closes the client + @note: make shure to call close() when done with the client + """ self._log.info(self._logMessages.ClientClose) if self._socket is not None: self._socket.close() @@ -670,7 +744,13 @@ #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): - + """Establishes the connection to a freenet node + @param host: (str) host of th node + @param port: (int) port of the node + @param repeat: (int) how many seconds try to connect before giving up + @param timeout: (int) how much time to wait before another attempt to connect + @return: True if successful, False otherwise + """ self._log.info(self._logMessages.Connecting) # poll untill freenet responds @@ -699,8 +779,55 @@ self._log.info(self._logMessages.ConnectingFailed) return False + + def handleMessage(self, msg): + """Handles the next message from the freenet node + @param msg: Message() to handle + """ + self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + + if msg.name == Messages.NodeHello: + #connectionIdentifier = msg['ConnectionIdentifier'] + self.jobStop(JobIdentifiers.ClientHello, msg) + + elif msg.name == Messages.ProtocolError: + code = msg['Code'] + if code == ProtocolErrors.NoLateClientHellos: + self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + + else: + identifier = msg.get('Identifier', None) + if identifier is None: + pass # raise ??? 
+ else: + self.jobStop(identifier, msg, error=True) + + elif msg.name == Messages.Peer: + self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) + + elif msg.name == Messages.EndListPeers: + self.jobStop(IdentifierListPeers, msg) + + elif msg.name == Messages.GetFailed: + self.jobStop(msg['Identifier'], msg, error=True) + + elif msg.name == Messages.SimpleProgress: + self.jobNotify(msg['Identifier'], 'handleProgress', msg) + + elif msg.name == Messages.TestDDAReply: + self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + + elif msg.name == Messages.TestDDAComplete: + self.jobStop(msg['Directory'], msg) + + elif msg.name == Messages.IdentifierCollision: + pass - def addJob(self, job): + + def jobAdd(self, job): + """Adds a job to the client + @param job: (Job*) job to add + """ self._lock.acquire(True) try: if job.fcpIdentifier in self._jobs['all']: @@ -712,12 +839,33 @@ self._log.info(self._logMessages.JobStart + job.displayName()) job.start() - - def stopJob(self, identifier, msg, error=False): + + def jobNotify(self, identifier, handler, msg): + """Notifies a job about an event while it is running + @param identifier: identifier of the job to notify + @param handler: (str) method of the job to call to handle the notification + @param msg: Message() to pass to the job + """ self._lock.acquire(True) try: job = self._jobs['all'].get(identifier, None) + finally: + self._lock.release() + if job is None: + raise ValueError('No such job: %r' % identifier) + getattr(job, handler)(msg) + + + def jobStop(self, identifier, msg, error=False): + """Stops a job + @param identifier: identifier of the job to stop + @param msg: Message() to pass to the job as result + @param error: set to True to indicate unsuccessful completion of the job, True otherwisse + """ + self._lock.acquire(True) + try: + job = self._jobs['all'].get(identifier, None) if job is not None: self._jobs['running'].remove(job) if job._fcp_auto_remove_: @@ -734,92 +882,22 @@ job.error(msg) else: 
job.stop(msg) - - def notifyJob(self, identifier, handler, msg): - self._lock.acquire(True) - try: - job = self._jobs['all'].get(identifier, None) - finally: - self._lock.release() - if job is None: - raise ValueError('No such job: %r' % identifier) - getattr(job, handler)(msg) - - def run(self): - - # TODO: - # x. push pending jobs - try: - while True: - if not self._lock.acquire(False): - continue - - try: - if not self._jobs['pending'] and not self._jobs['running']: - self._log.info(self._logMessages.JobsCompleted) - break - finally: - self._lock.release() - - self.next() - except KeyboardInterrupt: - self._log(self._logMessages.KeyboardInterrupt) - self.close() - - #TODO: some info when all jobs are completed def next(self): + """Pumps the next message waiting + @note: use this method instead of run() to run the client step by step + """ msg = self.readMessage() self.handleMessage(msg) - - def handleMessage(self, msg): - - self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) - - if msg.name == Messages.NodeHello: - #connectionIdentifier = msg['ConnectionIdentifier'] - self.stopJob(JobIdentifiers.ClientHello, msg) - - elif msg.name == Messages.ProtocolError: - code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.stopJob(JobIdentifiers.ClientHello, msg, error=True) - - else: - identifier = msg.get('Identifier', None) - if identifier is None: - pass # raise ??? 
- else: - self.stopJob(identifier, msg, error=True) - - elif msg.name == Messages.Peer: - self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg) - - elif msg.name == Messages.EndListPeers: - self.stopJob(IdentifierListPeers, msg) - - elif msg.name == Messages.GetFailed: - self.stopJob(msg['Identifier'], msg, error=True) - - elif msg.name == Messages.SimpleProgress: - self.notifyJob(msg['Identifier'], 'handleProgress', msg) - - elif msg.name == Messages.TestDDAReply: - self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg) - - elif msg.name == Messages.TestDDAComplete: - self.stopJob(msg['Directory'], msg) - - elif msg.name == Messages.IdentifierCollision: - pass - - def readMessage(self): """Reads the next message directly from the socket and dispatches it - @return: valid or invalid Message() + @return: (Message) the next message read from the socket + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. """ msg = Message(None) buf = [] @@ -872,20 +950,42 @@ pass return msg + + + def run(self): + """Runs the client untill all jobs passed to it are completed + @note: use KeyboardInterrupt to stop prematurely + """ + # TODO: + # x. 
push pending jobs + try: + while True: + if not self._lock.acquire(False): + continue + + try: + if not self._jobs['pending'] and not self._jobs['running']: + self._log.info(self._logMessages.JobsCompleted) + break + finally: + self._lock.release() + + self.next() + except KeyboardInterrupt: + self._log(self._logMessages.KeyboardInterrupt) + self.close() - def setLogMessages(self, logMessages): - self._logMessages = logMessages - def setVerbosity(self, verbosity): - self._log.setLevel(verbosity) - def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @param data: data to atatch to the message @param params: {para-name: param-calue, ...} of parameters to pass along with the message (see freenet protocol) + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. """ return self.sendMessageEx(Message(name, data=data, **params)) @@ -894,6 +994,9 @@ """Sends a message to freenet @param msg: (Message) message to send @return: Message + @raise FcpSocketError: if the socket connection to the node dies unexpectedly. + If an error handler is passed to the client it is called emidiately before the error + is raised. 
""" rawMsg = msg.toString() self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) @@ -907,14 +1010,24 @@ raise FcpSocketError(d) return msg + + def setLogMessages(self, logMessages): + """""" + self._logMessages = logMessages + + + def setVerbosity(self, verbosity): + """""" + self._log.setLevel(verbosity) + #***************************************************************************** # #***************************************************************************** if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): - job1 = JobNodeHello(c) - c.addJob(job1) + job1 = JobClientHello(c) + c.jobAdd(job1) c.run() print '---------------------------' @@ -924,21 +1037,21 @@ print '---------------------------' - job2 = JobTestDDA(c, os.path.dirname(__file__)) - c.addJob(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - - job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - c.addJob(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' + job2 = JobTestDDA(c, os.path.dirname(__file__)) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' + + job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + c.jobAdd(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |