SF.net SVN: fclient: [6] trunk/fclient/fclient_lib/fcp/fcp20.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <ju...@us...> - 2007-10-16 17:54:04
|
Revision: 6
http://fclient.svn.sourceforge.net/fclient/?rev=6&view=rev
Author: jurner
Date: 2007-10-16 10:54:01 -0700 (Tue, 16 Oct 2007)
Log Message:
-----------
added a bit of documentation and a class to deal with freenet uris
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Modified: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6)
@@ -2,6 +2,7 @@
import atexit
import logging
import os
+import re
import socket
import subprocess
import sys
@@ -13,6 +14,7 @@
#**************************************************************
# consts
#**************************************************************
+NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
DefaultFcpPort = 9481
try:
@@ -21,12 +23,14 @@
SocketTimeout = 0.1
class JobIdentifiers:
- # fixed job identifiers
- # note that the client can only handle one job of these at a time
+ """Fixed job identifiers
+ @note: the client can only handle one of these jobs at a time
+ """
ClientHello = 'ClientHello'
ListPeers = 'ListPeers'
class Messages:
+ """All messages supported by the client"""
# client messages
ClientHello = 'ClientHello'
@@ -92,6 +96,8 @@
class Priorities:
+ """All priorities supported by the client"""
+
Maximum = 0
Interactive = 1
SemiInteractive = 2
@@ -107,6 +113,8 @@
# errors
class FetchErrors:
+ """All fetch errors supported by the client"""
+
MaxArchiveRecursionExceeded = '1'
UnknownSplitfileMetadata = '2'
UnknownMetadata = '3'
@@ -138,6 +146,8 @@
class InsertErrors:
+ """All insert errors supported by the client"""
+
InvalidUri = '1'
BucketError = '2'
InternalError = '3'
@@ -151,6 +161,8 @@
class ProtocolErrors:
+ """All protocol errors supported by the client"""
+
ClientHelloMustBeFirst = '1'
NoLateClientHellos = '2'
MessageParseError = '3'
@@ -187,6 +199,9 @@
# functions
#**********************************************************************
def newIdentifier():
+ """Returns a new unique identifier
+ @return: (str) uuid
+ """
return str(uuid.uuid4())
def saveReadFile(fpath):
@@ -206,6 +221,10 @@
return read
def saveRemoveFile(fpath):
+ """Safely removes a file
+ @param fpath: filepath of the file to remove or None
+ @return: True if the file was removed, False otherwise
+ """
if fpath is not None:
if os.path.isfile(fpath):
os.remove(fpath)
@@ -265,99 +284,101 @@
"""
return fcpBool == 'true'
-
-def uriToList(uri):
- """Splits a freenet uri into its components
- @param uri: (str) uri to split
- @return: (list) or components
- @note: additional dexoration like 'freenet:' or 'http://blah' are ignored
+#**********************************************************************
+# classes
+#**********************************************************************
+class FcpSocketError(Exception): pass
+class FcpUri(object):
+ """Wrapper class for freenet uris"""
- >>> uriToList('SSK@foo')
- ['SSK@foo']
- >>> uriToList('SSK@foo/bar/baz')
- ['SSK@foo', 'bar', 'baz']
+ KeySSK = 'SSK@'
+ KeyKSK = 'KSK@'
+ KeyCHK = 'CHK@'
+ KeyUSK = 'USK@'
+ KeySVK = 'SVK@'
+ KeyUnknown = ''
+ KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
+
+ ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
+ ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
- >>> uriToList('freenet:SSK@foo')
- ['SSK@foo']
- >>> uriToList('http://foo/SSK@foo')
- ['SSK@foo']
+ def __init__(self, uri):
+ """
+ @param uri: uri to wrap
+ @param cvar ReUriPattern: pattern matching a freenet uri
+ @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
+
+ @note: any decorations prefixing the freenet part of the uri are stripped if possible
+
+
+ >>> uri = FcpUri('freenet:SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+ >>> uri.keyType() == FcpUri.KeySSK
+ True
+ >>> uri.split()
+ ('SSK@foo', 'bar')
+ >>> uri.fileName()
+ 'bar'
+
+ >>> uri = FcpUri('http://SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+
+ # uris not containing freenet keys are left unchanged
+ >>> uri = FcpUri('http://foo/bar')
+ >>> str(uri)
+ 'http://foo/bar'
+ >>> uri.keyType() == FcpUri.KeyUnknown
+ True
+ >>> uri.split()
+ ('http://foo/bar', '')
+ >>> uri.fileName()
+ 'http://foo/bar'
+
+ """
+ self._uri = uri
+
+ result = self.ReUriPattern.search(uri)
+ if result is not None:
+ self._uri = result.group(0)
+
+ def __str__(self):
+ return str(self._uri)
- >>> uriToList('http://foo')
- []
+ def __unicode__(self):
+ return unicode(self._uri)
- """
- if uri.startswith('freenet:'):
- uri = uri[len('freenet:'): ]
-
- components = []
- head = uri
- while head:
- head, tail = posixpath.split(head)
- components.append(tail)
+ def keyType(self):
+ """Returns the key type of the uri
+ @return: one of the Key* consts
+ """
+ result = self.ReKeyPattern.search(self._uri)
+ if result is not None:
+ return result.group(0).upper()
+ return self.KeyUnknown
- components.reverse()
+ def split(self):
+ """Splits the uri
+ @return: tuple(freenet-key, file-name)
+ """
+ if self.keyType() != self.KeyUnknown:
+ head, sep, tail = self._uri.partition('/')
+ return head, tail
+ return self._uri, ''
+
+ def fileName(self):
+ """Returns the filename part of the uri
+ @return: str
+ """
+ head, tail = self.split()
+ if tail:
+ return tail
+ return self._uri
- while components:
- if components[0][:4] in FcpKeys.KeysAll:
- break
- else:
- components.pop(0)
-
- return components
-def uriKeyType(uri):
- pass
-
-
-def isValidUri(uri):
- """Checks if an uri is a valid freenet uri
- @param uri: (str) uri to check
- @reuturn: (bool)
- """
- return bool(UriToList(uri))
-
-
-def splitUri(uri):
- """Splits an uri into uri and filename
- @param uri: uri to split
- @return: tuple(uri, filename)
-
- >>> splitUri('SSK@foo/bar/baz')
- ('SSK@foo', 'bar/baz')
-
- >>> splitUri('SSK@foo')
- ('SSK@foo', '')
-
- >>> splitUri('NoUri')
- ('NoUri', '')
-
- """
- L = uriToList(uri)
- if not L:
- return (uri, '')
- names = L[1:]
- if not names:
- name = ''
- else:
- name = '/'.join(names)
- return (L[0], name)
-
-
-def fileNameFromUri(uri):
- """Returns the filename part of an uri
- @return: (str) filename. If no filename is found the uri is returned unchanged
- """
- tmp_uri, name = splitUri(uri)
- if name:
- return name
- return uri
-
-#**********************************************************************
-# classes
-#**********************************************************************
-class FcpSocketError(Exception): pass
class Message(object):
"""Class wrapping a freenet message"""
@@ -417,6 +438,8 @@
def __init__(self):
Message.__init__(self, self.Name)
+
+
#**************************************************************************
# jobs
#**************************************************************************
@@ -426,6 +449,18 @@
_fcp_auto_remove_ = True
def __init__(self, fcpClient, identifier, message):
+ """
+ @param fcpClient: FcpClient() instance
+ @param identifier: (str) identifier of the job
+ @param message: (Message) to send to the node when the job is started
+ @ivar fcpClient: FcpClient() instance
+ @ivar fcpError: holding the error message if an error was encountered while running
+ the job, None otherwise
+ @ivar fcpIdentifier: identifier of the job
+ @ivar fcpMessage: initial message send to the node
+ @ivar fcpResult: if no error was encountered, holding the result of the job when complete
+ @ivar fcpTime: when the job is complete, holding the time the job took to complete
+ """
self.fcpClient = fcpClient # FcpClient() instance the job belongs to
self.fcpError = None # last error (either this is set or dcpResult)
@@ -435,31 +470,50 @@
self.fcpTime = 0 # start time (will hld duration whern the job is complte)
def displayName(self):
+ """Returns the display name of the job
+ @return: (str) display name
+ """
return 'JobBase'
def start(self):
+ """Starts the job"""
self.fcpTime = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
def error(self, msg):
+ """Called on job completion if an error was encountered while running the job
+ @param msg: (Message) to pass to the job
+ """
self.fcpTime = time.time() - self.fcpTime
self.fcpError = msg
self.fcpResult = None
- def stop(self, result):
+ def stop(self, msg):
+ """Called on job completion to stop the job
+ @param msg: (Message) to pass to the job
+ """
self.fcpTime = time.time() - self.fcpTime
self.fcpError = None
- self.fcpResult = result
+ self.fcpResult = msg
-class JobNodeHello(JobBase):
+class JobClientHello(JobBase):
+ """Sends a ClientHello message to the node
+ @note: this must be the first message passed to the node. If everything
+ goes well, you will get a NodeHello in response.
+ """
+
_fcp_auto_remove_ = True
- def __init__(self, fcpClient, expectedVersion='2.0'):
+ def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
+ """
+ @param name: (str) connection name or None, to use an arbitrary name
+ @param expectedVersion: (str) node version expected
+ """
message = Message(
Messages.ClientHello,
- Name=newIdentifier(),
+ Name=name if name is not None else newIdentifier(),
ExpectedVersion=expectedVersion,
)
JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
@@ -469,6 +523,8 @@
class JobListPeers(JobBase):
+ """Lists all known peers of the node
+ """
_fcp_auto_remove_ = True
@@ -485,13 +541,20 @@
return 'ListPeers'
def handlePeer(self,msg):
- pass
+ """Handles the next peer sent by the node in the form of a 'Peer' message
+ while the job is running. Override to process.
+ """
+
-
class JobGetFileInfo(JobBase):
- """"""
+ """Tries to retrieve information about a file. If everything goes well,
+ on completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
+ Note that both members may be '' (empty string)
+
+ """
+
_fcp_auto_remove_ = False
@@ -524,9 +587,11 @@
return 'GetFileInfo'
def handleProgress(self, msg):
- pass
+ """Handles the next progress made of a 'SimpleProgress' message
+ while the job is running. Overwrite to process.
+ """
+
-
def error(self, msg):
JobBase.error(self, msg)
if msg.name == Messages.GetFailed:
@@ -554,7 +619,9 @@
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
-
+ """Tests a directory for read / write access
+ """
+
_fcp_auto_remove_ = False
def __init__(self, fcpClient, directory, read=True, write=True):
@@ -608,6 +675,7 @@
# fcp client
#**************************************************************************
class LogMessages:
+ """Message strings used for log infos"""
Connecting = 'Connecting to node...'
Connected = 'Connected to node'
ConnectionRetry = 'Connecting to node failed... retrying'
@@ -628,8 +696,11 @@
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
+#TODO: no idea if to add support for pending jobs and queue management here
class FcpClient(object):
+ """Fcp client implementation"""
+
def __init__(self,
name='',
errorHandler=None,
@@ -648,12 +719,12 @@
self._isConnected = False
self._jobs = {
'all': {},
- 'pending': [],
+ 'pending': [], # ???
'running': [],
- 'complete': [],
+ 'complete': [], # ???
}
- self._errrorHandler = errorHandler
- self._log = logging.getLogger('FcpClient20:%s' % name)
+ self._errorHandler = errorHandler
+ self._log = logging.getLogger(NameClient + ':' + name)
self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
@@ -662,6 +733,9 @@
atexit.register(self.close)
def close(self):
+ """Closes the client
+ @note: make sure to call close() when done with the client
+ """
self._log.info(self._logMessages.ClientClose)
if self._socket is not None:
self._socket.close()
@@ -670,7 +744,13 @@
#TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
-
+ """Establishes the connection to a freenet node
+ @param host: (str) host of the node
+ @param port: (int) port of the node
+ @param repeat: (int) how many seconds to keep trying to connect before giving up
+ @param timeout: (int) how much time to wait before another attempt to connect
+ @return: True if successful, False otherwise
+ """
self._log.info(self._logMessages.Connecting)
# poll untill freenet responds
@@ -699,8 +779,55 @@
self._log.info(self._logMessages.ConnectingFailed)
return False
+
+ def handleMessage(self, msg):
+ """Handles the next message from the freenet node
+ @param msg: Message() to handle
+ """
+ self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+
+ if msg.name == Messages.NodeHello:
+ #connectionIdentifier = msg['ConnectionIdentifier']
+ self.jobStop(JobIdentifiers.ClientHello, msg)
+
+ elif msg.name == Messages.ProtocolError:
+ code = msg['Code']
+ if code == ProtocolErrors.NoLateClientHellos:
+ self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+
+ else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ pass # raise ???
+ else:
+ self.jobStop(identifier, msg, error=True)
+
+ elif msg.name == Messages.Peer:
+ self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ elif msg.name == Messages.EndListPeers:
+ self.jobStop(IdentifierListPeers, msg)
+
+ elif msg.name == Messages.GetFailed:
+ self.jobStop(msg['Identifier'], msg, error=True)
+
+ elif msg.name == Messages.SimpleProgress:
+ self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+
+ elif msg.name == Messages.TestDDAReply:
+ self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+
+ elif msg.name == Messages.TestDDAComplete:
+ self.jobStop(msg['Directory'], msg)
+
+ elif msg.name == Messages.IdentifierCollision:
+ pass
- def addJob(self, job):
+
+ def jobAdd(self, job):
+ """Adds a job to the client
+ @param job: (Job*) job to add
+ """
self._lock.acquire(True)
try:
if job.fcpIdentifier in self._jobs['all']:
@@ -712,12 +839,33 @@
self._log.info(self._logMessages.JobStart + job.displayName())
job.start()
-
- def stopJob(self, identifier, msg, error=False):
+
+ def jobNotify(self, identifier, handler, msg):
+ """Notifies a job about an event while it is running
+ @param identifier: identifier of the job to notify
+ @param handler: (str) method of the job to call to handle the notification
+ @param msg: Message() to pass to the job
+ """
self._lock.acquire(True)
try:
job = self._jobs['all'].get(identifier, None)
+ finally:
+ self._lock.release()
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ getattr(job, handler)(msg)
+
+
+ def jobStop(self, identifier, msg, error=False):
+ """Stops a job
+ @param identifier: identifier of the job to stop
+ @param msg: Message() to pass to the job as result
+ @param error: set to True to indicate unsuccessful completion of the job, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
if job is not None:
self._jobs['running'].remove(job)
if job._fcp_auto_remove_:
@@ -734,92 +882,22 @@
job.error(msg)
else:
job.stop(msg)
-
- def notifyJob(self, identifier, handler, msg):
- self._lock.acquire(True)
- try:
- job = self._jobs['all'].get(identifier, None)
- finally:
- self._lock.release()
- if job is None:
- raise ValueError('No such job: %r' % identifier)
- getattr(job, handler)(msg)
-
- def run(self):
-
- # TODO:
- # x. push pending jobs
- try:
- while True:
- if not self._lock.acquire(False):
- continue
-
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- self._log.info(self._logMessages.JobsCompleted)
- break
- finally:
- self._lock.release()
-
- self.next()
- except KeyboardInterrupt:
- self._log(self._logMessages.KeyboardInterrupt)
- self.close()
-
-
#TODO: some info when all jobs are completed
def next(self):
+ """Pumps the next message waiting
+ @note: use this method instead of run() to run the client step by step
+ """
msg = self.readMessage()
self.handleMessage(msg)
-
- def handleMessage(self, msg):
-
- self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
-
- if msg.name == Messages.NodeHello:
- #connectionIdentifier = msg['ConnectionIdentifier']
- self.stopJob(JobIdentifiers.ClientHello, msg)
-
- elif msg.name == Messages.ProtocolError:
- code = msg['Code']
- if code == ProtocolErrors.NoLateClientHellos:
- self.stopJob(JobIdentifiers.ClientHello, msg, error=True)
-
- else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- pass # raise ???
- else:
- self.stopJob(identifier, msg, error=True)
-
- elif msg.name == Messages.Peer:
- self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg)
-
- elif msg.name == Messages.EndListPeers:
- self.stopJob(IdentifierListPeers, msg)
-
- elif msg.name == Messages.GetFailed:
- self.stopJob(msg['Identifier'], msg, error=True)
-
- elif msg.name == Messages.SimpleProgress:
- self.notifyJob(msg['Identifier'], 'handleProgress', msg)
-
- elif msg.name == Messages.TestDDAReply:
- self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg)
-
- elif msg.name == Messages.TestDDAComplete:
- self.stopJob(msg['Directory'], msg)
-
- elif msg.name == Messages.IdentifierCollision:
- pass
-
-
def readMessage(self):
"""Reads the next message directly from the socket and dispatches it
- @return: valid or invalid Message()
+ @return: (Message) the next message read from the socket
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ If an error handler is passed to the client it is called immediately before the error
+ is raised.
"""
msg = Message(None)
buf = []
@@ -872,20 +950,42 @@
pass
return msg
+
+
+ def run(self):
+ """Runs the client until all jobs passed to it are completed
+ @note: use KeyboardInterrupt to stop prematurely
+ """
+ # TODO:
+ # x. push pending jobs
+ try:
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ self._log.info(self._logMessages.JobsCompleted)
+ break
+ finally:
+ self._lock.release()
+
+ self.next()
+ except KeyboardInterrupt:
+ self._log(self._logMessages.KeyboardInterrupt)
+ self.close()
- def setLogMessages(self, logMessages):
- self._logMessages = logMessages
- def setVerbosity(self, verbosity):
- self._log.setLevel(verbosity)
-
def sendMessage(self, name, data=None, **params):
"""Sends a message to freenet
@param name: name of the message to send
@param data: data to atatch to the message
@param params: {para-name: param-calue, ...} of parameters to pass along
with the message (see freenet protocol)
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ If an error handler is passed to the client it is called immediately before the error
+ is raised.
"""
return self.sendMessageEx(Message(name, data=data, **params))
@@ -894,6 +994,9 @@
"""Sends a message to freenet
@param msg: (Message) message to send
@return: Message
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ If an error handler is passed to the client it is called immediately before the error
+ is raised.
"""
rawMsg = msg.toString()
self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
@@ -907,14 +1010,24 @@
raise FcpSocketError(d)
return msg
+
+ def setLogMessages(self, logMessages):
+ """"""
+ self._logMessages = logMessages
+
+
+ def setVerbosity(self, verbosity):
+ """"""
+ self._log.setLevel(verbosity)
+
#*****************************************************************************
#
#*****************************************************************************
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
if c.connect():
- job1 = JobNodeHello(c)
- c.addJob(job1)
+ job1 = JobClientHello(c)
+ c.jobAdd(job1)
c.run()
print '---------------------------'
@@ -924,21 +1037,21 @@
print '---------------------------'
- job2 = JobTestDDA(c, os.path.dirname(__file__))
- c.addJob(job2)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
-
- job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- c.addJob(job2)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
+ job2 = JobTestDDA(c, os.path.dirname(__file__))
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
+
+ job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|