fclient-commit Mailing List for fclient (Page 39)
Status: Pre-Alpha
Brought to you by:
jurner
You can subscribe to this list here.
2007 | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct (23) | Nov (54) | Dec |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2008 | Jan (17) | Feb (209) | Mar (63) | Apr (31) | May (7) | Jun (39) | Jul (390) | Aug (122) | Sep (6) | Oct | Nov | Dec |
From: <jU...@us...> - 2007-10-25 14:04:41
|
Revision: 12 http://fclient.svn.sourceforge.net/fclient/?rev=12&view=rev Author: jUrner Date: 2007-10-25 05:59:16 -0700 (Thu, 25 Oct 2007) Log Message: ----------- bit more work on protocol Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-25 12:59:16 UTC (rev 12) @@ -1,5 +1,16 @@ +'''Freenet client protocol 2.0 implementation +''' +#NOTE: +# +# downloading data to disk is not supported st the moment. TestDDA code is quite unwritable +# and as far as I can see there are plans to get rid of it. So wait... +# +# + + import atexit +import base64 import logging import os import re @@ -187,7 +198,7 @@ """Returns a new unique identifier @return: (str) uuid """ - return str(uuid.uuid4()) + return 'fclient::' + str(uuid.uuid4()) def pythonBool(fcpBool): @@ -251,7 +262,7 @@ @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') @return: (string) whatever freenet returns """ - #TODO: on windows it may be necessary to hide the comand window + #TODO: on windows it may be necessary to hide the command window p = subprocess.Popen( args=cmdline, shell=True, @@ -368,10 +379,10 @@ ModifyPeerNote = 'ModifyPeerNote' RemovePeer = 'RemovePeer' GetNode = 'GetNode' - GetConfig = 'GetConfig' # (since 1027) + GetConfig = 'GetConfig' # (since 1027) ModifyConfig = 'ModifyConfig' # (since 1027) - TestDDARequest = 'TestDDARequest' # (since 1027) - TestDDAResponse = 'TestDDAResponse' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) GenerateSSK = 'GenerateSSK' ClientPut = 'ClientPut' ClientPutDiskDir = 'ClientPutDiskDir' @@ -433,7 +444,7 @@ self.params = params def toString(self): - """Returns a string with the 
formated message, ready to be send""" + """Returns the message as formated string ready to be send""" # TODO: "Data" not yet implemented out = [self.name, ] for param, value in self.params.items(): @@ -447,7 +458,6 @@ for param, value in self.params.items(): out.append('>> %s=%s' % (param, value)) out.append('>>EndMessage') - out.append('') return '\n'.join(out) def __getitem__(self, name): @@ -508,6 +518,8 @@ self.jobTimeStart = time.time() self.jobClient.sendMessageEx(self.jobMessage) + + # XXX def handleStop(self, flagError, msg): """Called on job completion to stop the job @param flagError: True if an error was encountered, False otherwise @@ -569,13 +581,18 @@ """ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): + """ + @param withMetaData: include meta data for each peer? + @param withVolantile: include volantile data for each peer? + @ivar jobResult: on job completion, will be a list containing all perrs as one 'Peer' message for each peer + """ message = Message( Message.ListPeers, WithMetadata=fcpBool(withMetaData), WithVolatile=fcpBool(withVolantile), ) JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - + def handleMessage(self,msg): if msg.name == Message.EndListPeers: @@ -585,17 +602,72 @@ else: raise ValueError('Unexpected message: %s' % msg.name) + def handlePeer(self, msg): + if self.jobResult is None: + self.jobResult = [msg, ] + else: + self.jobResult.append(msg) return True def handleEndListPeers(self, msg): self.jobTimeStop = time.time() - self.jobResult = msg + if self.jobResult is None: + self.jobResult = [] self.jobClient.jobRemove(self.jobIdentifier) return True + +class JobListPeerNotes(JobBase): + """Lists all notes associated to a peer of the node + """ + + def __init__(self, fcpClient, identifier): + """ + @param identifier: identifier of the peer to list notes for (peer identity) + @ivar jobResult: on job completion will be a list containing all notes associated to the peer + + @note: notes 
are only available for darknet peers (opennet == false) + """ + + message = Message( + Message.ListPeerNotes, + NodeIdentifier=identifier + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def handleMessage(self,msg): + if msg.name == Message.EndListPeerNotes: + return self.handleEndListPeerNotes(msg) + elif msg.name == Message.PeerNote: + return self.handlePeerNote(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handlePeerNote(self, msg): + note = msg.get('NoteText', '') + if note: + note = base64.decodestring(note) + if self.jobResult is None: + self.jobResult = [note, ] + else: + self.jobResult.append(note) + return True + + + def handleEndListPeerNotes(self, msg): + self.jobTimeStop = time.time() + if self.jobResult is None: + self.jobResult = [] + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + + #TODO: identifier collisions are not yet handled class JobGetFileInfo(JobBase): """Tries to retieve information about a file. If everything goes well @@ -735,7 +807,13 @@ """ - def __init__(self, fcpClient, directory, read=True, write=True): + def __init__(self, fcpClient, directory, read=False, write=False): + """ + + @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed) + """ + + if not os.path.isdir(directory): raise ValueError('No such directory: %r' % directory) @@ -747,8 +825,7 @@ ) JobBase.__init__(self, fcpClient, directory, message) self.jobTmpFile = None - - + def handleMessage(self, msg): if msg.name == Message.TestDDAReply: @@ -786,13 +863,50 @@ def handleTestDDAComplete(self, msg): self.jobTimeStop = time.time() - self.jobResult = msg + self.jobResult = ( + pythonBool(msg.get('ReadDirectoryAllowed', 'false')), + pythonBool(msg.get('WriteDirectoryAllowed', 'false')), + ) saveRemoveFile(self.jobTmpFile) self.jobTmpFile = None self.jobClient.jobRemove(self.jobIdentifier) return True + +class JobGenerateSSK(JobBase): + """Job to generate a SSK 
key pair + """ + + + def __init__(self, fcpClient): + """ + @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated + SSK key + """ + + identifier = newIdentifier() + message = Message( + Message.GenerateSSK, + Identifier=identifier, + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def handleMessage(self, msg): + if msg.name == Message.SSKKeypair: + return self.handleSSKKeypair(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + + def handleSSKKeypair(self, msg): + self.jobTimeStop = time.time() + self.jobResult = (msg['InsertURI'], msg['RequestURI']) + self.jobClient.jobRemove(self.jobIdentifier) + return True + + #************************************************************************** # fcp client #************************************************************************** @@ -840,20 +954,19 @@ with two params: SocketError + details. When the handler is called the client is already closed. @param verbosity: verbosity level for debugging - @param logMessages: LogMessages class containing messages + @param logMessages: LogMessages class containing message strings """ self._isConnected = False self._jobs = { - #TODO: check if JobList is still required - 'JobMapping': {}, - 'JobList': [], + 'Jobs': {}, + 'PendingJobs': [], 'RegisteredDirectories': [], } - self._errorHandler = errorHandler + self._errorHandler = errorHandler #TODO: check! 
self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() # lovk when resources are accessed + self._lock = thread.allocate_lock() # lock when resources are accessed self._socket = None self.setVerbosity(verbosity) @@ -933,7 +1046,7 @@ if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif code == ProtocolError.Shutdown: + elif code == ProtocolError.ShuttingDown: if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): # ######################################## @@ -956,6 +1069,11 @@ identifier = msg['Directory'] elif msg.name == Message.TestDDAComplete: identifier = msg['Directory'] + elif msg.name == Message.PeerNote: + identifier = msg['NodeIdentifier'] + elif msg.name == Message.EndListPeerNotes: + identifier = msg['NodeIdentifier'] + else: identifier = msg.get('Identifier', None) @@ -971,6 +1089,13 @@ elif msg.name == Message.Peer: return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) + #elif msg.name == Message.PeerNote: + # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) + + #elif msg.name == Message.EndListPeerNotes: + # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) + + # more here..... 
else: @@ -993,9 +1118,12 @@ """ self._lock.acquire(True) try: - result = bool(self._jobs['JobList']) + result = self._jobs['Jobs'] or self._jobs['PendingJobs'] finally: self._lock.release() + + + return result @@ -1006,20 +1134,16 @@ """ self._lock.acquire(True) try: - if job.jobIdentifier in self._jobs['JobMapping']: - raise ValueError('Job with that identifier already present') - - if job.jobIdentifier in self._jobs['JobMapping']: + if job.jobIdentifier in self._jobs['Jobs']: raise ValueError('Duplicate job: %r' % job.jobIdentifier) - self._jobs['JobMapping'][job.jobIdentifier] = job - self._jobs['JobList'].append(job) + self._jobs['Jobs'][job.jobIdentifier] = job finally: self._lock.release() self._log.info(self._logMessages.JobStart + job.jobMessage.name) job.handleStart() if synchron: - while job.jobResult is None: + while self.jobGet(job.jobIdentifier): self.next() @@ -1042,7 +1166,7 @@ """ self._lock.acquire(True) try: - result = self._jobs['JobMapping'].get(identifier, None) + result = self._jobs['Jobs'].get(identifier, None) finally: self._lock.release() return result @@ -1055,7 +1179,7 @@ """ self._lock.acquire(True) try: - result = identifier in self._jobs['JobMapping'] + result = identifier in self._jobs['Jobs'] finally: self._lock.release() return result @@ -1068,10 +1192,9 @@ """ self._lock.acquire(True) try: - job = self._jobs['JobMapping'].get(identifier, None) + job = self._jobs['Jobs'].get(identifier, None) if job is not None: - self._jobs['JobList'].remove(job) - del self._jobs['JobMapping'][identifier] + del self._jobs['Jobs'][identifier] finally: self._lock.release() if job is None: @@ -1209,17 +1332,86 @@ self._log.setLevel(verbosity) + ######################################################## ## ######################################################## def getFileInfo(self, job): pass - def getFile(self, job): - pass + + + ######################################################### + ## boilerplate code to tackle TestDDA + ## + ## ...but I don't 
trust it ;-) I was not yet alble to wrap my head around + ## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not. + ## + ## Another problem is that there is no way to know when a directory is no longer + ## needed. And I fon't want to write code in a Gui to tackle a problem that will hopefully + ## go away in the near future. + ## + ## see: https://bugs.freenetproject.org/view.php?id=1753 + ## + ######################################################### + def testWriteAccess(self, directory): + canRead, canWrite = False, False + result = self._jobs.get('RegisteredDirectories', None) + if result is not None: + canRead, canWrite = result + if not canWrite: + job = JobTestDDA(directory, read=canRead, write=True) + self.addJob(job, synchron=True) + canWrite = job.jobResult[1] + self._jobs['RegisteredDirectories'] = (canRead, canWrite) + return canWrite + + def testReadAccess(self, directory): + canRead, canWrite = False, False + result = self._jobs.get('RegisteredDirectories', None) + if result is not None: + canRead, canWrite = result + if not canRead: + job = JobTestDDA(directory, read=True, write=canWrite) + self.addJob(job, synchron=True) + canRead = job.jobResult[0] + self._jobs['RegisteredDirectories'] = (canRead, canWrite) + return canRead + + + def downloadFile(self, directory, job): + if not os.path.isdir(directory): + raise IOError('No such directory') + + self._jobs['PendingJobs'].append(job) + try: + result = self.testWriteAccess(directory) + if result: + self.addJob(job) + finally: + self._jobs['PendingJobs'].remove(job) + return result + + def uploadFile(self, directory, job): + if not os.path.isdir(directory): + raise IOError('No such directory') + + self._jobs['PendingJobs'].append(job) + try: + result = self.testReadAccess(directory) + if result: + self.addJob(job) + finally: + self._jobs['PendingJobs'].remove(job) + return result + + + + + #***************************************************************************** # 
#***************************************************************************** @@ -1239,7 +1431,16 @@ #foo() + def foo(): + job = JobGenerateSSK(c) + c.jobAdd(job, synchron=True) + print job.jobResult + foo() + + + + def foo(): d = os.path.dirname(os.path.abspath(__file__)) job2 = JobTestDDA(c, d) c.jobAdd(job2) @@ -1250,12 +1451,21 @@ #foo() def foo(): - job2 = JobListPeers(c) - c.jobAdd(job2) + job = JobListPeers(c) + c.jobAdd(job) c.run() print '---------------------------' - print job2.jobResult + print job.jobResult print '---------------------------' + + + for peer in job.jobResult: + if not pythonBool(peer['opennet']): + job = JobListPeerNotes(c, peer['identity']) + c.jobAdd(job, synchron=True) + print '>>', job.jobResult + #.get('NoteText') + #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-23 09:19:10
|
Revision: 11 http://fclient.svn.sourceforge.net/fclient/?rev=11&view=rev Author: jUrner Date: 2007-10-23 02:19:12 -0700 (Tue, 23 Oct 2007) Log Message: ----------- combed over message dispatch Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11) @@ -14,7 +14,6 @@ #************************************************************** # consts #************************************************************** -NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() try: DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) @@ -28,6 +27,13 @@ """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + GetNode = 'GetNode' + GetConfig = 'GetConfig' + ModifyConfig = 'ModifyConfig' + WatchGlobal = 'WatchGlobal' + Shutdown = 'Shutdown' + class Priorities: @@ -492,6 +498,10 @@ self.jobTimeStop = 0 + def handleMessage(self, msg): + return False + + def handleStart(self): """Starts the job""" self.jobResult = None @@ -514,7 +524,6 @@ goes well, you will get a NodeHello in response. 
""" - def __init__(self, fcpClient, name=None, expectedVersion='2.0'): """ @param name: (str) connection name or None, to use an arbitrary name @@ -529,13 +538,36 @@ def displayName(self): return 'ClientHello' + + + def handleMessage(self, msg): + if msg.name == Message.NodeHello: + return self.handleNodeHello(msg) + elif msg.name == Message.ProtocolError: + return self.handleProtocolError(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handleNodeHello(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + def handleProtocolError(self, msg): + raise ProtocolError(msg) + + + + + class JobListPeers(JobBase): """Lists all known peers of the node """ - - + def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Message.ListPeers, @@ -545,13 +577,26 @@ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - def handlePeer(self,msg): - """Handles the next peer send by the node in form of a 'Peer' message - while the job is running. Overwrite to process. - """ + def handleMessage(self,msg): + if msg.name == Message.EndListPeers: + return self.handleEndListPeers(msg) + elif msg.name == Message.Peer: + return self.handlePeer(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handlePeer(self, msg): + return True + + def handleEndListPeers(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg + self.jobClient.jobRemove(self.jobIdentifier) + return True + - +#TODO: identifier collisions are not yet handled class JobGetFileInfo(JobBase): """Tries to retieve information about a file. If everything goes well @@ -571,6 +616,11 @@ MaxRetries=-1 ...N PriorityClass=Priority* + @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be + retrieved and data will be a GetFailed message containing details. 
If error is False + data will be a tuple(str metadataContentType, str size). Note that both may be empty + string and size may not be accurate. + """ identifier = newIdentifier() message = Message( @@ -612,46 +662,73 @@ ) - def handlePriorityChanged(self): - self.jobMessage['PriorityClass'] = priority + def handleMessage(self, msg): + if msg.name == Message.DataFound: + return self.handleDataFound(msg) + elif msg.name == Message.GetFailed: + return self.handleGetFailed(msg) + elif msg.name == Message.IdentifierCollision: + return self.handleIdentifierCollision(msg) + elif msg.name == Message.PersistentRequestModified: + return self.handlePersistentRequestModified(msg) + elif msg.name == Message.PersistentRequestRemoved: + return self.handlePersistentRequestRemoved(msg) + elif msg.name == Message.SimpleProgress: + return self.handleSimpleProgress(msg) + + else: + raise ValueError('Unexpected message: %s' % msg.name) + - def handleProgress(self, msg): - """Handles the next progress made. Overwrite to process. 
- """ + def handleDataFound(self, msg): + self.jobTimeStop = time.time() + self.jobResult = ( + False, + ( + msg.get('Metadata.ContentType', ''), + msg.get('DataLength', '') + ) + ) + self.jobClient.jobRemove(self.jobIdentifier) + return True - def handleRequestRemoved(self, msg): + def handleGetFailed(self, msg): + self.jobTimeStop = time.time() + if msg['Code'] == FetchError.TooBig: + self.jobResult = (False, msg) + self.jobResult = ( + False, + ( + msg.get('ExpectedMetadata.ContentType', ''), + msg.get('ExpectedDataLength', '') + ) + ) + else: + self.jobResult = (True, msg) + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + def handleIdentifierCollision(self, msg): + raise + + + def handleSimpleProgress(self, msg): + return True + + + def handlePersistentRequestModified(self, msg): + priorityClass = msg.get('PriorityClass', None) + if priorityClass is not None: + self.jobMessage['PriorityClass'] = priorityClass + return True + + def handlePersistentRequestRemoved(self, msg): if self.jobClient.jobIsRunning(self.jobIdentifier): self.jobClient.jobStop(self.jobIdentifier) + return True + - - def handleStop(self, flagError, msg): - """called when a job is finished - @note: jobResult may contain an 'IdentifierCollision' message. 
Make shure - to handle this appropriately - """ - JobBase.handleStop(self,flagError, msg) - if msg.name == Message.DataFound: - self.jobResult = ( - msg.get('Metadata.ContentType', ''), - msg.get('DataLength', '') - ) - elif msg.name == Message.GetFailed: - if msg['Code'] == FetchError.TooBig: - self.jobResult = (False, msg) - self.jobResult = ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - # not shure when removing the request is actually required - # so send it anyway - self.jobClient.sendMessage( - Message.RemovePersistentRequest, - Global=fcpBool(False), - Identifier=self.jobIdentifier, - ) - - - #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): """Tests a directory for read / write accesss @@ -672,6 +749,16 @@ self.jobTmpFile = None + + def handleMessage(self, msg): + if msg.name == Message.TestDDAReply: + return self.handleTestDDAReply(msg) + elif msg.name == Message.TestDDAComplete: + return self.handleTestDDAComplete(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -693,13 +780,18 @@ Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, - ) + ) + return True + - - def handleStop(self, flagError, msg): - JobBase.handleStop(self, flagError, msg) + def handleTestDDAComplete(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg saveRemoveFile(self.jobTmpFile) self.jobTmpFile = None + self.jobClient.jobRemove(self.jobIdentifier) + return True + #************************************************************************** # fcp client @@ -728,6 +820,10 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. 
#TODO: no idea if to add support for pending jobs and queue management here + +#TODO: do not mix directories as identifiers with identifiers (might lead to collisions) +#TODO: how to handle (ProtocolError code 18: Shutting down)? + class FcpClient(object): """Fcp client implementation""" @@ -757,7 +853,7 @@ self._errorHandler = errorHandler self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() + self._lock = thread.allocate_lock() # lovk when resources are accessed self._socket = None self.setVerbosity(verbosity) @@ -806,9 +902,8 @@ # as expected the socket simply breaks. So take it over. job = JobClientHello(self) self.jobAdd(job, synchron=True) - error, msg = job.jobResult - assert not error, 'Error should have been caught by handleMessage()' - return msg + assert job.jobResult is not None, 'ClientHello is not working as expected' + return job.jobResult self._log.info(self._logMessages.ConnectionRetry) @@ -827,59 +922,66 @@ """ - #if msg.name != 'USocketTimeOut': + if msg.name == 'USocketTimeOut': + return True + self._log.debug(self._logMessages.MessageReceived + msg.pprint()) - if msg.name == Message.NodeHello: - #connectionIdentifier = msg['ConnectionIdentifier'] - self.jobStop(FixedJobIdentifiers.ClientHello, msg) - - elif msg.name == Message.ProtocolError: + + if msg.name == Message.ProtocolError: code = msg['Code'] - #if code == ProtocolError.NoLateClientHellos: - # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) - #elif code == ProtocolError.ClientHelloMustBeFirst: - # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) - #else: - identifier = msg.get('Identifier', None) - if identifier is None: - #TODO: inform caller - raise ProtocolError(msg) + if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: + return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) + + elif code == ProtocolError.Shutdown: + if not 
self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): + + # ######################################## + #TODO: ??? + + return True + else: - return self.jobStop(identifier, msg, flagError=True) + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: inform caller + raise ProtocolError(msg) + else: + return self.jobDispatchMessage(identifier, msg) + + else: + + # check if the is something like an identifier in the message + if msg.name == Message.TestDDAReply: + identifier = msg['Directory'] + elif msg.name == Message.TestDDAComplete: + identifier = msg['Directory'] + else: + identifier = msg.get('Identifier', None) - if msg.name == Message.DataFound: - return self.jobStop(msg['Identifier'], msg, flagError=True) - - elif msg.name == Message.EndListPeers: - return self.jobStop(FixedJobIdentifiers.ListPeers, msg) - - elif msg.name == Message.GetFailed: - return self.jobStop(msg['Identifier'], msg, flagError=True) + # dispatch to jobs with fixed identifiers + if identifier is None: + + if msg.name == Message.NodeHello: + return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Message.PersistentRequestRemoved: - return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg) - - elif msg.name == Message.SimpleProgress: - return self.jobNotify(msg['Identifier'], 'handleProgress', msg) + elif msg.name == Message.EndListPeers: + return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.TestDDAComplete: - return self.jobStop(msg['Directory'], msg) + elif msg.name == Message.Peer: + return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.TestDDAReply: - return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + # more here..... 
+ + else: + raise ValueError('Unhandled message: ' + msg.name) + + else: + return self.jobDispatchMessage(identifier, msg) + + raise RuntimeError('Should not have endet here: %s' % msg.name) - elif msg.name == Message.IdentifierCollision: - # identifier is unique to us, but for some reason not unique to the node - #TODO: maybe for future implementations find a way to automise - # reinsertion and a notification via a handleJobChanged() for the - # caller to overwrite - return self.jobStop(msg['Identifier'], msg, flagError=True) - - elif msg.name == Message.Peer: - return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) - - return False + ######################################################### @@ -905,6 +1007,9 @@ self._lock.acquire(True) try: if job.jobIdentifier in self._jobs['JobMapping']: + raise ValueError('Job with that identifier already present') + + if job.jobIdentifier in self._jobs['JobMapping']: raise ValueError('Duplicate job: %r' % job.jobIdentifier) self._jobs['JobMapping'][job.jobIdentifier] = job self._jobs['JobList'].append(job) @@ -916,7 +1021,19 @@ if synchron: while job.jobResult is None: self.next() + + def jobDispatchMessage(self, identifier, msg): + """Dispatches a message to a job + @param identifier: identifier of the job + @param msg: (Message) message to dispatch + @return: True if the message was handled, False otherwise + """ + job = self.jobGet(identifier) + if job is not None: + return job.handleMessage(msg) + return False + def jobGet(self, identifier): """Returns a job given its identifier @@ -943,36 +1060,15 @@ self._lock.release() return result - - def jobNotify(self, identifier, handler, msg): - """Notifies a job about an event while it is running - @param identifier: identifier of the job to notify - @param handler: (str) method of the job to call to handle the notification - @param msg: Message() to pass to the job - @return: True if a job was found to be notified, False otherwise + + def jobRemove(self, 
identifier): + """Removes a job unconditionally + @param identifier: identifier of the job to remove + @return: True if the job was found, False otherwise """ self._lock.acquire(True) try: job = self._jobs['JobMapping'].get(identifier, None) - finally: - self._lock.release() - if job is None: - return False - getattr(job, handler)(msg) - return True - - - #TODO: quite unclear when to remove a job - def jobStop(self, identifier, msg, flagError=False): - """Stops a job - @param identifier: identifier of the job to stop - @param msg: Message() to pass to the job as result - @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse - @return: True if a job was found to be stopped, False otherwise - """ - self._lock.acquire(True) - try: - job = self._jobs['JobMapping'].get(identifier, None) if job is not None: self._jobs['JobList'].remove(job) del self._jobs['JobMapping'][identifier] @@ -981,10 +1077,9 @@ if job is None: return False self._log.info(self._logMessages.JobStop + job.jobMessage.name) - job.handleStop(flagError, msg) return True - + #TODO: some info when all jobs are completed def next(self): """Pumps the next message waiting @@ -1059,8 +1154,13 @@ @note: use KeyboardInterrupt to stop prematurely """ try: + #n = 0 + while self.hasJobsRunning(): + #n += 1 + #if n > 40: break self.next() + except KeyboardInterrupt: self._log(self._logMessages.KeyboardInterrupt) self.close() @@ -1117,7 +1217,9 @@ def getFile(self, job): pass + + #***************************************************************************** # #***************************************************************************** @@ -1158,10 +1260,22 @@ def foo(): - job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - c.jobAdd(job2) + job = JobGetFileInfo(c, 
'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') + + + #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + #job.jobIdentifier = job.jobMessage['Identifier'] = 1 + #job.jobMessage['Identifier'] = 1 + #job.jobIdentifier = 1 + c.jobAdd(job) + + #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg') + #job.jobMessage['Identifier'] = 1 + #job.jobIdentifier = 1 + #c.jobAdd(job) + c.run() print '---------------------------' - print job2.jobResult + print job.jobResult print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-22 09:02:02
|
Revision: 10 http://fclient.svn.sourceforge.net/fclient/?rev=10&view=rev Author: jUrner Date: 2007-10-22 02:02:04 -0700 (Mon, 22 Oct 2007) Log Message: ----------- continued working on protocol implementation Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10) @@ -16,10 +16,10 @@ #************************************************************** NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() -DefaultFcpPort = 9481 try: DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) -except: pass +except ValueError: + DefaultFcpPort = 9481 SocketTimeout = 0.1 class FixedJobIdentifiers: @@ -33,13 +33,13 @@ class Priorities: """All priorities supported by the client""" - Maximum = 0 - Interactive = 1 - SemiInteractive = 2 - Updatable = 3 - Bulk = 4 - Prefetch = 5 - Minimum = 6 + Maximum = '0' + Interactive = '1' + SemiInteractive = '2' + Updatable = '3' + Bulk = '4' + Prefetch = '5' + Minimum = '6' PriorityMin = Minimum PriorityDefault = Bulk @@ -471,42 +471,40 @@ class JobBase(object): """Base class for jobs""" - _fcp_auto_remove_ = True - + def __init__(self, fcpClient, identifier, message): """ @param fcpClient: FcpClient() instance @param identifier: (str) identifier of the job @param message: (Message) to send to the node whne the job ist started - @ivar fcpClient: FcpClient() instance - @ivar fcpError: holding the error message if an error was encountered while running - the job, None otherwise - @ivar fcpIdentifier: identifier of the job - @ivar fcpMessage: initial message send to the node - @ivar fcpResult: if no error was encountered, holding the result of the job when complete - @ivar fcpTimeStart: time the job was started - @ivar 
fcpTimeStop: time the job was stopped + @ivar jobClient: FcpClient() instance of the job + @ivar jobIdentifier: identifier of the job + @ivar jobMessage: message to be send to the node + @ivar jobResult: if no error was encountered, holding the result of the job when complete + @ivar jobTimeStart: time the job was started + @ivar jobTimeStop: time the job was stopped """ - self.fcpClient = fcpClient # FcpClient() instance the job belongs to - self.fcpIdentifier = identifier # job identifier - self.fcpMessage = message # message send to node - self.fcpResult = None # job result (bool error, Message msg) - self.fcpTimeStart = 0 # time the job was started - self.fcpTimeStop = 0 # time the job was stopped + self.jobClient = fcpClient + self.jobIdentifier = identifier + self.jobMessage = message + self.jobResult = None + self.jobTimeStart = 0 + self.jobTimeStop = 0 + def handleStart(self): """Starts the job""" - self.fcpResult = None - self.fcpTimeStart = time.time() - self.fcpClient.sendMessageEx(self.fcpMessage) + self.jobResult = None + self.jobTimeStart = time.time() + self.jobClient.sendMessageEx(self.jobMessage) def handleStop(self, flagError, msg): """Called on job completion to stop the job @param flagError: True if an error was encountered, False otherwise @param msg: (Message) to pass to the job """ - self.fcpTimeStop = time.time() - self.fcpResult = (flagError, msg) + self.jobTimeStop = time.time() + self.jobResult = (flagError, msg) class JobClientHello(JobBase): @@ -516,8 +514,7 @@ goes well, you will get a NodeHello in response. 
""" - _fcp_auto_remove_ = True - + def __init__(self, fcpClient, name=None, expectedVersion='2.0'): """ @param name: (str) connection name or None, to use an arbitrary name @@ -538,8 +535,7 @@ """Lists all known peers of the node """ - _fcp_auto_remove_ = True - + def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Message.ListPeers, @@ -561,13 +557,10 @@ On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. Note, that both members may be '' (empty string) - """ - _fcp_auto_remove_ = False - - - # idea is to provoke a GetFailed message and take mimetype and size from it + + # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed' def __init__(self, fcpClient, uri, **params): """ @param fcpClient: FcpClient() instance @@ -584,7 +577,9 @@ Message.ClientGet, Identifier=identifier, URI=uri, - MaxSize='0', + # suggested by Mathew Toseland to use about 32k for mimeType requests + # basic sizes of keys are: 1k for SSks and 32k for CHKs + MaxSize='32000', ReturnType='none', Verbosity='1', **params @@ -592,34 +587,77 @@ JobBase.__init__(self, fcpClient, identifier, message) + def getPrority(self): + return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault) + + + def setPriority(self, priority): + if not priority in Priorities: + raise ValueError('Invalid priority: %r' % priority) + self.jobClient.sendMessage( + Message.ModifyPersistentRequest, + Identifier=self.jobIdentifier, + Global=fcpBool(False), + PriorityClass=priority, + ) + # not shure if the response arrives in any case, so set it here + self.jobMessage['PriorityClass'] = priority + + + def stopRequest(self): + self.jobClient.sendMessage( + Message.RemovePersistentRequest, + Global=fcpBool(False), + Identifier=self.jobIdentifier, + ) + + + def handlePriorityChanged(self): + self.jobMessage['PriorityClass'] = priority + def handleProgress(self, msg): """Handles the next progress made. 
Overwrite to process. """ + + def handleRequestRemoved(self, msg): + if self.jobClient.jobIsRunning(self.jobIdentifier): + self.jobClient.jobStop(self.jobIdentifier) + def handleStop(self, flagError, msg): + """called when a job is finished + @note: jobResult may contain an 'IdentifierCollision' message. Make shure + to handle this appropriately + """ JobBase.handleStop(self,flagError, msg) if msg.name == Message.DataFound: - self.fcpResult = ( + self.jobResult = ( msg.get('Metadata.ContentType', ''), msg.get('DataLength', '') ) elif msg.name == Message.GetFailed: if msg['Code'] == FetchError.TooBig: - self.fcpResult = (False, msg) - self.fcpResult = ( + self.jobResult = (False, msg) + self.jobResult = ( msg.get('ExpectedMetadata.ContentType', ''), msg.get('ExpectedDataLength', '') ) - + # not shure when removing the request is actually required + # so send it anyway + self.jobClient.sendMessage( + Message.RemovePersistentRequest, + Global=fcpBool(False), + Identifier=self.jobIdentifier, + ) + - + #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): """Tests a directory for read / write accesss """ - _fcp_auto_remove_ = False - + def __init__(self, fcpClient, directory, read=True, write=True): if not os.path.isdir(directory): raise ValueError('No such directory: %r' % directory) @@ -631,7 +669,7 @@ WantWriteDirectory=fcpBool(write), ) JobBase.__init__(self, fcpClient, directory, message) - self.fcpTmpFile = None + self.jobTmpFile = None def handleTestDDAReply(self, msg): @@ -644,14 +682,14 @@ if os.path.isfile(fpathWrite): os.remove(fpathWrite) else: - self.fcpTmpFile = fpathWrite + self.jobTmpFile = fpathWrite if fpathRead is not None: readContent = saveReadFile(fpathRead) if readContent is None: readContent = '' - self.fcpClient.sendMessage( + self.jobClient.sendMessage( Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, @@ -660,8 +698,8 @@ def handleStop(self, flagError, msg): 
JobBase.handleStop(self, flagError, msg) - saveRemoveFile(self.fcpTmpFile) - self.fcpTmpFile = None + saveRemoveFile(self.jobTmpFile) + self.jobTmpFile = None #************************************************************************** # fcp client @@ -682,6 +720,7 @@ JobStop = 'Stopping job: ' JobsCompleted = 'All jobs completed' + KeyboardInterrupt = 'Keyboard interrupt' SocketDead = 'Socket is dead' @@ -710,10 +749,10 @@ self._isConnected = False self._jobs = { - 'all': {}, - 'pending': [], # ??? - 'running': [], - 'complete': [], # ??? + #TODO: check if JobList is still required + 'JobMapping': {}, + 'JobList': [], + 'RegisteredDirectories': [], } self._errorHandler = errorHandler self._log = logging.getLogger(name) @@ -734,7 +773,7 @@ self._socket = None - #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call + #TODO: an iterator would be nice to enshure Guis stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): """Establishes the connection to a freenet node @param host: (str) host of th node @@ -767,7 +806,7 @@ # as expected the socket simply breaks. So take it over. 
job = JobClientHello(self) self.jobAdd(job, synchron=True) - error, msg = job.fcpResult + error, msg = job.jobResult assert not error, 'Error should have been caught by handleMessage()' return msg @@ -782,9 +821,13 @@ def handleMessage(self, msg): - """Handles the next message from the freenet node - @param msg: Message() to handle + """Handles a message from the freenet node + @param msg: (Message) to handle + @return: True if the message was handled, False otherwise """ + + + #if msg.name != 'USocketTimeOut': self._log.debug(self._logMessages.MessageReceived + msg.pprint()) if msg.name == Message.NodeHello: @@ -803,30 +846,57 @@ #TODO: inform caller raise ProtocolError(msg) else: - self.jobStop(identifier, msg, flagError=True) + return self.jobStop(identifier, msg, flagError=True) - elif msg.name == Message.Peer: - self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) + if msg.name == Message.DataFound: + return self.jobStop(msg['Identifier'], msg, flagError=True) elif msg.name == Message.EndListPeers: - self.jobStop(FixedJobIdentifiers.ListPeers, msg) + return self.jobStop(FixedJobIdentifiers.ListPeers, msg) elif msg.name == Message.GetFailed: - self.jobStop(msg['Identifier'], msg, flagError=True) + return self.jobStop(msg['Identifier'], msg, flagError=True) + elif msg.name == Message.PersistentRequestRemoved: + return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg) + elif msg.name == Message.SimpleProgress: - self.jobNotify(msg['Identifier'], 'handleProgress', msg) + return self.jobNotify(msg['Identifier'], 'handleProgress', msg) + elif msg.name == Message.TestDDAComplete: + return self.jobStop(msg['Directory'], msg) + elif msg.name == Message.TestDDAReply: - self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) - elif msg.name == Message.TestDDAComplete: - self.jobStop(msg['Directory'], msg) + elif msg.name == Message.IdentifierCollision: + # identifier 
is unique to us, but for some reason not unique to the node + #TODO: maybe for future implementations find a way to automise + # reinsertion and a notification via a handleJobChanged() for the + # caller to overwrite + return self.jobStop(msg['Identifier'], msg, flagError=True) - elif msg.name == Message.IdentifierCollision: - pass + elif msg.name == Message.Peer: + return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) + + return False + ######################################################### + ## jobs + ######################################################### + def hasJobsRunning(self): + """Checks if the client has running jobs + @return: (bool) True if so, False otherwise + """ + self._lock.acquire(True) + try: + result = bool(self._jobs['JobList']) + finally: + self._lock.release() + return result + + def jobAdd(self, job, synchron=False): """Adds a job to the client @param job: (Job*) job to add @@ -834,35 +904,62 @@ """ self._lock.acquire(True) try: - if job.fcpIdentifier in self._jobs['all']: - raise ValueError('Duplicate job: %r' % job.identifier) - self._jobs['all'][job.fcpIdentifier] = job - self._jobs['running'].append(job) + if job.jobIdentifier in self._jobs['JobMapping']: + raise ValueError('Duplicate job: %r' % job.jobIdentifier) + self._jobs['JobMapping'][job.jobIdentifier] = job + self._jobs['JobList'].append(job) finally: self._lock.release() - self._log.info(self._logMessages.JobStart + job.fcpMessage.name) + self._log.info(self._logMessages.JobStart + job.jobMessage.name) job.handleStart() if synchron: - while job.fcpResult is None: + while job.jobResult is None: self.next() + def jobGet(self, identifier): + """Returns a job given its identifier + @param identifier: identifier of the job + @return: (Job*) instance or None, if no corrosponding job was found + """ + self._lock.acquire(True) + try: + result = self._jobs['JobMapping'].get(identifier, None) + finally: + self._lock.release() + return result + + def 
jobIsRunning(self, identifier): + """Checks if a job is running + @param identifier: identifier of the job + @return: True if so, False otherwise + """ + self._lock.acquire(True) + try: + result = identifier in self._jobs['JobMapping'] + finally: + self._lock.release() + return result + + def jobNotify(self, identifier, handler, msg): """Notifies a job about an event while it is running @param identifier: identifier of the job to notify @param handler: (str) method of the job to call to handle the notification @param msg: Message() to pass to the job + @return: True if a job was found to be notified, False otherwise """ self._lock.acquire(True) try: - job = self._jobs['all'].get(identifier, None) + job = self._jobs['JobMapping'].get(identifier, None) finally: self._lock.release() if job is None: - raise ValueError('No such job: %r' % identifier) + return False getattr(job, handler)(msg) + return True #TODO: quite unclear when to remove a job @@ -871,25 +968,23 @@ @param identifier: identifier of the job to stop @param msg: Message() to pass to the job as result @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse + @return: True if a job was found to be stopped, False otherwise """ self._lock.acquire(True) try: - job = self._jobs['all'].get(identifier, None) + job = self._jobs['JobMapping'].get(identifier, None) if job is not None: - self._jobs['running'].remove(job) - if job._fcp_auto_remove_: - del self._jobs['all'][identifier] - else: - self._jobs['complete'].append(job) + self._jobs['JobList'].remove(job) + del self._jobs['JobMapping'][identifier] finally: self._lock.release() - if job is None: - raise ValueError('No such job: %r' % identifier) - self._log.info(self._logMessages.JobStop + job.fcpMessage.name) + return False + self._log.info(self._logMessages.JobStop + job.jobMessage.name) job.handleStop(flagError, msg) + return True + - #TODO: some info when all jobs are completed def next(self): """Pumps the next 
message waiting @@ -963,21 +1058,8 @@ """Runs the client untill all jobs passed to it are completed @note: use KeyboardInterrupt to stop prematurely """ - - # TODO: - # x. push pending jobs try: - while True: - if not self._lock.acquire(False): - continue - - try: - if not self._jobs['pending'] and not self._jobs['running']: - self._log.info(self._logMessages.JobsCompleted) - break - finally: - self._lock.release() - + while self.hasJobsRunning(): self.next() except KeyboardInterrupt: self._log(self._logMessages.KeyboardInterrupt) @@ -1026,6 +1108,16 @@ """""" self._log.setLevel(verbosity) + + ######################################################## + ## + ######################################################## + def getFileInfo(self, job): + pass + + def getFile(self, job): + pass + #***************************************************************************** # #***************************************************************************** @@ -1039,8 +1131,7 @@ c.run() print '---------------------------' - print job1.fcpError - print job1.fcpResult + print job1.jobResult print '---------------------------' # should raise #foo() @@ -1052,7 +1143,7 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpResult + print job2.jobResult print '---------------------------' #foo() @@ -1061,7 +1152,7 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpResult + print job2.jobResult print '---------------------------' #foo() @@ -1071,6 +1162,6 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpResult + print job2.jobResult print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-20 13:37:47
|
Revision: 9 http://fclient.svn.sourceforge.net/fclient/?rev=9&view=rev Author: jUrner Date: 2007-10-20 06:37:51 -0700 (Sat, 20 Oct 2007) Log Message: ----------- bit of refactoring Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 10:02:28 UTC (rev 8) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9) @@ -22,79 +22,14 @@ except: pass SocketTimeout = 0.1 -class JobIdentifiers: +class FixedJobIdentifiers: """Fixed job identifiers @note: he client can only handle one job of these at a time """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' -class Messages: - """All messages supported by the client""" - - # client messages - ClientHello = 'ClientHello' - ListPeer = 'ListPeer' # (since 1045) - ListPeers = 'ListPeers' - ListPeerNotes = 'ListPeerNotes' - AddPeer = 'AddPeer' - ModifyPeer = 'ModifyPeer' - ModifyPeerNote = 'ModifyPeerNote' - RemovePeer = 'RemovePeer' - GetNode = 'GetNode' - GetConfig = 'GetConfig' # (since 1027) - ModifyConfig = 'ModifyConfig' # (since 1027) - TestDDARequest = 'TestDDARequest' # (since 1027) - TestDDAResponse = 'TestDDAResponse' # (since 1027) - GenerateSSK = 'GenerateSSK' - ClientPut = 'ClientPut' - ClientPutDiskDir = 'ClientPutDiskDir' - ClientPutComplexDir = 'ClientPutComplexDir' - ClientGet = 'ClientGet' - SubscribeUSK = 'SubscribeUSK' - WatchGlobal = 'WatchGlobal' - GetRequestStatus = 'GetRequestStatus' - ListPersistentRequests = 'ListPersistentRequests' - RemovePersistentRequest = 'RemovePersistentRequest' - ModifyPersistentRequest = 'ModifyPersistentRequest' - Shutdown = 'Shutdown' - - # node messages - NodeHello = 'NodeHello' - CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' - Peer = 'Peer' - PeerNote = 'PeerNote' - EndListPeers = 'EndListPeers' - EndListPeerNotes = 'EndListPeerNotes' - 
PeerRemoved = 'PeerRemoved' - NodeData = 'NodeData' - ConfigData = 'ConfigData' # (since 1027) - TestDDAReply = 'TestDDAReply' # (since 1027) - TestDDAComplete = 'TestDDAComplete' # (since 1027) - SSKKeypair = 'SSKKeypair' - PersistentGet = 'PersistentGet' - PersistentPut = 'PersistentPut' - PersistentPutDir = 'PersistentPutDir' - URIGenerated = 'URIGenerated' - PutSuccessful = 'PutSuccessful' - PutFetchable = 'PutFetchable' - DataFound = 'DataFound' - AllData = 'AllData' - StartedCompression = 'StartedCompression' - FinishedCompression = 'FinishedCompression' - SimpleProgress = 'SimpleProgress' - EndListPersistentRequests = 'EndListPersistentRequests' - PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) - PersistentRequestModified = 'PersistentRequestModified' # (since 1016) - PutFailed = 'PutFailed' - GetFailed = 'GetFailed' - ProtocolError = 'ProtocolError' - IdentifierCollision = 'IdentifierCollision' - UnknownNodeIdentifier = 'UnknownNodeIdentifier' - UnknownPeerNoteType = 'UnknownPeerNoteType' - SubscribedUSKUpdate = 'SubscribedUSKUpdate' - class Priorities: """All priorities supported by the client""" @@ -109,12 +44,23 @@ PriorityMin = Minimum PriorityDefault = Bulk - -# errors - -class FetchErrors: +#************************************************************************************ +# exceptions +#************************************************************************************ +class FetchError(Exception): """All fetch errors supported by the client""" + def __init__(self, msg): + """ + @param msg: (Message) GetFailed message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + MaxArchiveRecursionExceeded = '1' UnknownSplitfileMetadata = '2' UnknownMetadata = '3' @@ -145,9 +91,20 @@ NotAllDataFound = '28' -class InsertErrors: +class InsertError(Exception): """All insert errors supported by the client""" 
+ def __init__(self, msg): + """ + @param msg: (Message) PutFailed message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + InvalidUri = '1' BucketError = '2' InternalError = '3' @@ -160,9 +117,20 @@ Canceled = '10' -class ProtocolErrors: +class ProtocolError(Exception): """All protocol errors supported by the client""" + def __init__(self, msg): + """ + @param msg: (Message) ProtocolError message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + ClientHelloMustBeFirst = '1' NoLateClientHellos = '2' MessageParseError = '3' @@ -195,15 +163,35 @@ OpennetDisabled = '30' DarknetOnly = '31' +class SocketError(Exception): pass #********************************************************************** # functions #********************************************************************** +def fcpBool(pythonBool): + """Converts a python bool to a fcp bool + @param pythonBool: (bool) + @return: (str) 'true' or 'false' + """ + if pythonBool: + return 'true' + return 'false' + + def newIdentifier(): """Returns a new unique identifier @return: (str) uuid """ return str(uuid.uuid4()) + +def pythonBool(fcpBool): + """Converts a fcp bool to a python bool + @param pythonBool: 'true' or 'false' + @return: (bool) True or False + """ + return fcpBool == 'true' + + def saveReadFile(fpath): """Reads contents of a file in the savest manner possible @param fpath: file to write @@ -267,42 +255,9 @@ stdout, stderr = p.communicate() return stdout - -def fcpBool(pythonBool): - """Converts a python bool to a fcp bool - @param pythonBool: (bool) - @return: (str) 'true' or 'false' - """ - if pythonBool: - return 'true' - return 'false' - -def pythonBool(fcpBool): - """Converts a fcp bool to a python bool - @param pythonBool: 'true' or 
'false' - @return: (bool) True or False - """ - return fcpBool == 'true' - #********************************************************************** # classes #********************************************************************** -class FcpSocketError(Exception): pass -class FcpProtocolError(Exception): - def __init__(self, msg): - """ - @param msg: (Message) ProtocolError message - """ - self.value = '%s (%s, %s)' % ( - msg.get('CodeDescription', 'Unknown error') , - msg['Code'], - msg.get('ExtraDescription', '...'), - ) - - def __str__(self): return self.value - - - class FcpUri(object): """Wrapper class for freenet uris""" @@ -397,8 +352,68 @@ class Message(object): """Class wrapping a freenet message""" - Name = 'UMessage' + # client messages + ClientHello = 'ClientHello' + ListPeer = 'ListPeer' # (since 1045) + ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + AddPeer = 'AddPeer' + ModifyPeer = 'ModifyPeer' + ModifyPeerNote = 'ModifyPeerNote' + RemovePeer = 'RemovePeer' + GetNode = 'GetNode' + GetConfig = 'GetConfig' # (since 1027) + ModifyConfig = 'ModifyConfig' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) + GenerateSSK = 'GenerateSSK' + ClientPut = 'ClientPut' + ClientPutDiskDir = 'ClientPutDiskDir' + ClientPutComplexDir = 'ClientPutComplexDir' + ClientGet = 'ClientGet' + SubscribeUSK = 'SubscribeUSK' + WatchGlobal = 'WatchGlobal' + GetRequestStatus = 'GetRequestStatus' + ListPersistentRequests = 'ListPersistentRequests' + RemovePersistentRequest = 'RemovePersistentRequest' + ModifyPersistentRequest = 'ModifyPersistentRequest' + Shutdown = 'Shutdown' + # node messages + NodeHello = 'NodeHello' + CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' + Peer = 'Peer' + PeerNote = 'PeerNote' + EndListPeers = 'EndListPeers' + EndListPeerNotes = 'EndListPeerNotes' + PeerRemoved = 'PeerRemoved' + NodeData = 'NodeData' + ConfigData = 'ConfigData' # (since 1027) + 
TestDDAReply = 'TestDDAReply' # (since 1027) + TestDDAComplete = 'TestDDAComplete' # (since 1027) + SSKKeypair = 'SSKKeypair' + PersistentGet = 'PersistentGet' + PersistentPut = 'PersistentPut' + PersistentPutDir = 'PersistentPutDir' + URIGenerated = 'URIGenerated' + PutSuccessful = 'PutSuccessful' + PutFetchable = 'PutFetchable' + DataFound = 'DataFound' + AllData = 'AllData' + StartedCompression = 'StartedCompression' + FinishedCompression = 'FinishedCompression' + SimpleProgress = 'SimpleProgress' + EndListPersistentRequests = 'EndListPersistentRequests' + PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) + PersistentRequestModified = 'PersistentRequestModified' # (since 1016) + PutFailed = 'PutFailed' + GetFailed = 'GetFailed' + ProtocolError = 'ProtocolError' + IdentifierCollision = 'IdentifierCollision' + UnknownNodeIdentifier = 'UnknownNodeIdentifier' + UnknownPeerNoteType = 'UnknownPeerNoteType' + SubscribedUSKUpdate = 'SubscribedUSKUpdate' + def __init__(self, name, data=None, **params): """ @@ -410,29 +425,25 @@ self.data = data self.name = name self.params = params - - + def toString(self): """Returns a string with the formated message, ready to be send""" - # TODO: "Data" not yet implemented out = [self.name, ] for param, value in self.params.items(): out.append('%s=%s' % (param, value)) out.append('EndMessage\n') return '\n'.join(out) - - + def pprint(self): """Returns the message as nicely formated human readable string""" - - out = [self.name, ] + out = ['', '>>' + self.name, ] for param, value in self.params.items(): - out.append(' %s=%s' % (param, value)) - out.append('EndMessage') + out.append('>> %s=%s' % (param, value)) + out.append('>>EndMessage') out.append('') return '\n'.join(out) - + def __getitem__(self, name): """Returns the message parameter 'name' """ return self.params[name] @@ -444,14 +455,13 @@ def __setitem__(self, name, value): """Sets the message parameter 'name' to 'value' """ self.params[name] = value - + + 
class MessageSocketTimeout(Message): - Name = 'USocketTimeout' - def __init__(self): - Message.__init__(self, self.Name) + Message.__init__(self, 'USocketTimeOut') @@ -474,47 +484,29 @@ @ivar fcpIdentifier: identifier of the job @ivar fcpMessage: initial message send to the node @ivar fcpResult: if no error was encountered, holding the result of the job when complete - @ivar fcpTime: when the job is complete, holding the time the job took to complete + @ivar fcpTimeStart: time the job was started + @ivar fcpTimeStop: time the job was stopped """ - self.fcpClient = fcpClient # FcpClient() instance the job belongs to - self.fcpError = None # last error (either this is set or dcpResult) - self.fcpIdentifier = identifier # + self.fcpIdentifier = identifier # job identifier self.fcpMessage = message # message send to node - self.fcpResult = None # job result + self.fcpResult = None # job result (bool error, Message msg) self.fcpTimeStart = 0 # time the job was started self.fcpTimeStop = 0 # time the job was stopped - self.fcpStopped = False - - def displayName(self): - """Returns the display name of the job - @return: (str) display name - """ - return 'JobBase' - - def start(self): + + def handleStart(self): """Starts the job""" - self.fcpStopped = False + self.fcpResult = None self.fcpTimeStart = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) - - def error(self, msg): - """Called on job completion if an error was encounterd while runnng the job - @param msg: (Message) to pass to the job - """ - self.fcpStopped = True - self.fcpTimeStop = time.time() - self.fcpError = msg - self.fcpResult = None - - def stop(self, msg): + + def handleStop(self, flagError, msg): """Called on job completion to stop the job + @param flagError: True if an error was encountered, False otherwise @param msg: (Message) to pass to the job """ - self.fcpStopped = True self.fcpTimeStop = time.time() - self.fcpError = None - self.fcpResult = msg + self.fcpResult = (flagError, msg) class 
JobClientHello(JobBase): @@ -532,11 +524,11 @@ @param expectedVersion: (str) node version expected """ message = Message( - Messages.ClientHello, + Message.ClientHello, Name=name if name is not None else newIdentifier(), ExpectedVersion=expectedVersion, ) - JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) + JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message) def displayName(self): return 'ClientHello' @@ -550,16 +542,13 @@ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( - Messages.ListPeers, + Message.ListPeers, WithMetadata=fcpBool(withMetaData), WithVolatile=fcpBool(withVolantile), ) - JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) + JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - def displayName(self): - return 'ListPeers' - def handlePeer(self,msg): """Handles the next peer send by the node in form of a 'Peer' message while the job is running. Overwrite to process. @@ -592,7 +581,7 @@ """ identifier = newIdentifier() message = Message( - Messages.ClientGet, + Message.ClientGet, Identifier=identifier, URI=uri, MaxSize='0', @@ -602,37 +591,25 @@ ) JobBase.__init__(self, fcpClient, identifier, message) - - def displayName(self): - return 'GetFileInfo' - + def handleProgress(self, msg): """Handles the next progress made. Overwrite to process. 
""" - - - def error(self, msg): - JobBase.error(self, msg) - if msg.name == Messages.GetFailed: - if msg['Code'] == FetchErrors.TooBig: - self.fcpError = None - self.fcpResult = ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - #else: - # raise ValueError('Unhandled message: %s' % msg.name) - - def stop(self, msg): - JobBase.stop(self, msg) - if msg.name == Messages.DataFound: + def handleStop(self, flagError, msg): + JobBase.handleStop(self,flagError, msg) + if msg.name == Message.DataFound: self.fcpResult = ( msg.get('Metadata.ContentType', ''), msg.get('DataLength', '') ) - else: - raise ValueError('Unhandled message: %s' % msg.name) + elif msg.name == Message.GetFailed: + if msg['Code'] == FetchError.TooBig: + self.fcpResult = (False, msg) + self.fcpResult = ( + msg.get('ExpectedMetadata.ContentType', ''), + msg.get('ExpectedDataLength', '') + ) @@ -648,7 +625,7 @@ raise ValueError('No such directory: %r' % directory) message = Message( - Messages.TestDDARequest, + Message.TestDDARequest, Directory=directory, WantReadDirectory=fcpBool(read), WantWriteDirectory=fcpBool(write), @@ -656,9 +633,7 @@ JobBase.__init__(self, fcpClient, directory, message) self.fcpTmpFile = None - def displayName(self): - return 'TestDDA' - + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -677,22 +652,16 @@ readContent = '' self.fcpClient.sendMessage( - Messages.TestDDAResponse, + Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, ) - def error(self, msg): - JobBase.error(self, msg) + def handleStop(self, flagError, msg): + JobBase.handleStop(self, flagError, msg) saveRemoveFile(self.fcpTmpFile) self.fcpTmpFile = None - - - def stop(self, msg): - JobBase.stop(self, msg) - saveRemoveFile(self.fcpTmpFile) - self.fcpTmpFile = None #************************************************************************** # fcp client @@ -706,8 
+675,8 @@ ClientClose = 'Closing client' - MessageSend = 'Message send' - MessageReceived = 'Message received' + MessageSend = 'SendMessage' + MessageReceived = 'ReceivedMessage' JobStart = 'Starting job: ' JobStop = 'Stopping job: ' @@ -733,7 +702,7 @@ """ @param name: name of the client instance or '' (for debugging) @param errorHandler: will be called if the socket conncetion to the node is dead - with two params: FcpSocketError + details. When the handler is called the client + with two params: SocketError + details. When the handler is called the client is already closed. @param verbosity: verbosity level for debugging @param logMessages: LogMessages class containing messages @@ -747,7 +716,7 @@ 'complete': [], # ??? } self._errorHandler = errorHandler - self._log = logging.getLogger(NameClient + ':' + name) + self._log = logging.getLogger(name) self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None @@ -792,10 +761,15 @@ pass else: self._log.info(self._logMessages.Connected) + + #NOTE: thought I could leave ClientHelloing up to the caller + # but instad of responding with ClientHelloMustBeFirst + # as expected the socket simply breaks. So take it over. 
job = JobClientHello(self) self.jobAdd(job, synchron=True) - assert job.fcpError is None, 'Error should have been caught by handleMessage()' - return job.fcpResult + error, msg = job.fcpResult + assert not error, 'Error should have been caught by handleMessage()' + return msg self._log.info(self._logMessages.ConnectionRetry) @@ -811,45 +785,45 @@ """Handles the next message from the freenet node @param msg: Message() to handle """ - self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + self._log.debug(self._logMessages.MessageReceived + msg.pprint()) - if msg.name == Messages.NodeHello: + if msg.name == Message.NodeHello: #connectionIdentifier = msg['ConnectionIdentifier'] - self.jobStop(JobIdentifiers.ClientHello, msg) + self.jobStop(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Messages.ProtocolError: + elif msg.name == Message.ProtocolError: code = msg['Code'] - #if code == ProtocolErrors.NoLateClientHellos: - # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) - #elif code == ProtocolErrors.ClientHelloMustBeFirst: - # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #if code == ProtocolError.NoLateClientHellos: + # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) + #elif code == ProtocolError.ClientHelloMustBeFirst: + # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) #else: identifier = msg.get('Identifier', None) if identifier is None: #TODO: inform caller - raise FcpProtocolError(msg) + raise ProtocolError(msg) else: - self.jobStop(identifier, msg, error=True) + self.jobStop(identifier, msg, flagError=True) - elif msg.name == Messages.Peer: - self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) + elif msg.name == Message.Peer: + self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) - elif msg.name == Messages.EndListPeers: - self.jobStop(IdentifierListPeers, msg) + elif msg.name == Message.EndListPeers: + self.jobStop(FixedJobIdentifiers.ListPeers, msg) - elif 
msg.name == Messages.GetFailed: - self.jobStop(msg['Identifier'], msg, error=True) + elif msg.name == Message.GetFailed: + self.jobStop(msg['Identifier'], msg, flagError=True) - elif msg.name == Messages.SimpleProgress: + elif msg.name == Message.SimpleProgress: self.jobNotify(msg['Identifier'], 'handleProgress', msg) - elif msg.name == Messages.TestDDAReply: + elif msg.name == Message.TestDDAReply: self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) - elif msg.name == Messages.TestDDAComplete: + elif msg.name == Message.TestDDAComplete: self.jobStop(msg['Directory'], msg) - elif msg.name == Messages.IdentifierCollision: + elif msg.name == Message.IdentifierCollision: pass @@ -867,10 +841,10 @@ finally: self._lock.release() - self._log.info(self._logMessages.JobStart + job.displayName()) - job.start() + self._log.info(self._logMessages.JobStart + job.fcpMessage.name) + job.handleStart() if synchron: - while not job.fcpStopped: + while job.fcpResult is None: self.next() @@ -892,11 +866,11 @@ #TODO: quite unclear when to remove a job - def jobStop(self, identifier, msg, error=False): + def jobStop(self, identifier, msg, flagError=False): """Stops a job @param identifier: identifier of the job to stop @param msg: Message() to pass to the job as result - @param error: set to True to indicate unsuccessful completion of the job, True otherwisse + @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse """ self._lock.acquire(True) try: @@ -912,11 +886,8 @@ if job is None: raise ValueError('No such job: %r' % identifier) - self._log.info(self._logMessages.JobStop + job.displayName()) - if error: - job.error(msg) - else: - job.stop(msg) + self._log.info(self._logMessages.JobStop + job.fcpMessage.name) + job.handleStop(flagError, msg) #TODO: some info when all jobs are completed @@ -931,7 +902,7 @@ def readMessage(self): """Reads the next message directly from the socket and dispatches it @return: (Message) the next message read 
from the socket - @raise FcpSocketError: if the socket connection to the node dies unexpectedly + @raise SocketError: if the socket connection to the node dies unexpectedly If an error handler is passed to the client it is called emidiately before the error is raised. """ @@ -949,8 +920,8 @@ self._log.info(self._logMessages.SocketDead) self.close() if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) #!! + self._errorHandler(SocketError, d) + raise SocketError(d) #!! if p == '\r': # ignore continue @@ -975,8 +946,8 @@ self._log.info(self._logMessages.SocketDead) self.close() if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) #!! + self._errorHandler(SocketError, d) + raise SocketError(d) #!! else: head, sep, tail = line.partition('=') @@ -1019,7 +990,7 @@ @param data: data to atatch to the message @param params: {para-name: param-calue, ...} of parameters to pass along with the message (see freenet protocol) - @raise FcpSocketError: if the socket connection to the node dies unexpectedly + @raise SocketError: if the socket connection to the node dies unexpectedly If an error handler is passed to the client it is called emidiately before the error is raised. """ @@ -1030,20 +1001,19 @@ """Sends a message to freenet @param msg: (Message) message to send @return: Message - @raise FcpSocketError: if the socket connection to the node dies unexpectedly. + @raise SocketError: if the socket connection to the node dies unexpectedly. If an error handler is passed to the client it is called emidiately before the error is raised. 
""" - rawMsg = msg.toString() - self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) + self._log.debug(self._logMessages.MessageSend + msg.pprint()) try: - self._socket.sendall(rawMsg) - except Exception, d: + self._socket.sendall(msg.toString()) + except socket.error, d: self._log.info(self._logMessages.SocketDead) self.close() if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) + self._errorHandler(SocketError, d) + raise SocketError(d) return msg @@ -1061,7 +1031,8 @@ #***************************************************************************** if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) - if c.connect(): + nodeHello = c.connect() + if nodeHello is not None: def foo(): job1 = JobClientHello(c) c.jobAdd(job1) @@ -1070,8 +1041,8 @@ print '---------------------------' print job1.fcpError print job1.fcpResult - print job1.fcpTime print '---------------------------' + # should raise #foo() @@ -1081,18 +1052,25 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpError print job2.fcpResult - print job2.fcpTime print '---------------------------' - + #foo() + def foo(): + job2 = JobListPeers(c) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job2.fcpResult + print '---------------------------' + #foo() + + + def foo(): job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpError print job2.fcpResult - print job2.fcpTime print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-20 10:02:24
|
Revision: 8 http://fclient.svn.sourceforge.net/fclient/?rev=8&view=rev Author: jUrner Date: 2007-10-20 03:02:28 -0700 (Sat, 20 Oct 2007) Log Message: ----------- renamed to be able to map to fcp version numbers to modules Added Paths: ----------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Removed Paths: ------------- trunk/fclient/fclient_lib/fcp/fcp20.py Deleted: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:02:28 UTC (rev 8) @@ -1,1098 +0,0 @@ - -import atexit -import logging -import os -import re -import socket -import subprocess -import sys -import time -import thread -import uuid - -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) -#************************************************************** -# consts -#************************************************************** -NameClient = 'Fcp20Client' -DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() -DefaultFcpPort = 9481 -try: - DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) -except: pass -SocketTimeout = 0.1 - -class JobIdentifiers: - """Fixed job identifiers - @note: he client can only handle one job of these at a time - """ - ClientHello = 'ClientHello' - ListPeers = 'ListPeers' - -class Messages: - """All messages supported by the client""" - - # client messages - ClientHello = 'ClientHello' - ListPeer = 'ListPeer' # (since 1045) - ListPeers = 'ListPeers' - ListPeerNotes = 'ListPeerNotes' - AddPeer = 'AddPeer' - ModifyPeer = 'ModifyPeer' - ModifyPeerNote = 'ModifyPeerNote' - RemovePeer = 'RemovePeer' - GetNode = 'GetNode' - GetConfig = 'GetConfig' # (since 1027) - ModifyConfig = 'ModifyConfig' # (since 1027) - TestDDARequest = 'TestDDARequest' # (since 1027) - TestDDAResponse = 'TestDDAResponse' # (since 1027) - GenerateSSK = 'GenerateSSK' - ClientPut = 'ClientPut' - ClientPutDiskDir 
= 'ClientPutDiskDir' - ClientPutComplexDir = 'ClientPutComplexDir' - ClientGet = 'ClientGet' - SubscribeUSK = 'SubscribeUSK' - WatchGlobal = 'WatchGlobal' - GetRequestStatus = 'GetRequestStatus' - ListPersistentRequests = 'ListPersistentRequests' - RemovePersistentRequest = 'RemovePersistentRequest' - ModifyPersistentRequest = 'ModifyPersistentRequest' - Shutdown = 'Shutdown' - - # node messages - NodeHello = 'NodeHello' - CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' - Peer = 'Peer' - PeerNote = 'PeerNote' - EndListPeers = 'EndListPeers' - EndListPeerNotes = 'EndListPeerNotes' - PeerRemoved = 'PeerRemoved' - NodeData = 'NodeData' - ConfigData = 'ConfigData' # (since 1027) - TestDDAReply = 'TestDDAReply' # (since 1027) - TestDDAComplete = 'TestDDAComplete' # (since 1027) - SSKKeypair = 'SSKKeypair' - PersistentGet = 'PersistentGet' - PersistentPut = 'PersistentPut' - PersistentPutDir = 'PersistentPutDir' - URIGenerated = 'URIGenerated' - PutSuccessful = 'PutSuccessful' - PutFetchable = 'PutFetchable' - DataFound = 'DataFound' - AllData = 'AllData' - StartedCompression = 'StartedCompression' - FinishedCompression = 'FinishedCompression' - SimpleProgress = 'SimpleProgress' - EndListPersistentRequests = 'EndListPersistentRequests' - PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) - PersistentRequestModified = 'PersistentRequestModified' # (since 1016) - PutFailed = 'PutFailed' - GetFailed = 'GetFailed' - ProtocolError = 'ProtocolError' - IdentifierCollision = 'IdentifierCollision' - UnknownNodeIdentifier = 'UnknownNodeIdentifier' - UnknownPeerNoteType = 'UnknownPeerNoteType' - SubscribedUSKUpdate = 'SubscribedUSKUpdate' - - -class Priorities: - """All priorities supported by the client""" - - Maximum = 0 - Interactive = 1 - SemiInteractive = 2 - Updatable = 3 - Bulk = 4 - Prefetch = 5 - Minimum = 6 - - PriorityMin = Minimum - PriorityDefault = Bulk - - -# errors - -class FetchErrors: - """All fetch errors supported by 
the client""" - - MaxArchiveRecursionExceeded = '1' - UnknownSplitfileMetadata = '2' - UnknownMetadata = '3' - InvalidMetadata = '4' - ArchiveFailure = '5' - BlockDecodeError = '6' - MaxMetadataLevelsExceeded = '7' - MaxArchiveRestartsExceeded = '8' - MaxRecursionLevelExceeded = '9' - NotAnArchve = '10' - TooManyMetastrings = '11' - BucketError = '12' - DataNotFound = '13' - RouteNotFound = '14' - RejectedOverload = '15' - TooManyRedirects = '16' - InternalError = '17' - TransferFailed = '18' - SplitfileError = '19' - InvalidUri = '20' - TooBig = '21' - MetadataTooBig = '22' - TooManyBlocks = '23' - NotEnoughMetastrings = '24' - Canceled = '25' - ArchiveRestart = '26' - PermanentRedirect = '27' - NotAllDataFound = '28' - - -class InsertErrors: - """All insert errors supported by the client""" - - InvalidUri = '1' - BucketError = '2' - InternalError = '3' - RejectedOverload = '4' - RouteNotFound = '5' - FatalErrorInBlocks = '6' - TooManyRetriesInBlock = '7' - RouteReallyNotFound = '8' - Collision = '9' - Canceled = '10' - - -class ProtocolErrors: - """All protocol errors supported by the client""" - - ClientHelloMustBeFirst = '1' - NoLateClientHellos = '2' - MessageParseError = '3' - UriParseError = '4' - MissingField = '5' - ErrorParsingNumber = '6' - InvalidMessage = '7' - InvalidField = '8' - FileNotFound = '9' - DiskTargetExists = '10' - SameDirectoryExpected = '11' - CouldNotCreateFile = '12' - CouldNotWriteFile = '13' - CouldNotRenameFile = '14' - NoSuchIdentifier = '15' - NotSupported = '16' - InternalError = '17' - ShuttingDown = '18' - NoSuchNodeIdentifier = '19' # Unused since 995 - UrlParseError = '20' - ReferenceParseError = '21' - FileParseError = '22' - NotAFile = '23' - AccessDenied = '24' - DDADenied = '25' - CouldNotReadFile = '26' - ReferenceSignature = '27' - CanNotPeerWithSelf = '28' - PeerExists = '29' - OpennetDisabled = '30' - DarknetOnly = '31' - -#********************************************************************** -# functions 
-#********************************************************************** -def newIdentifier(): - """Returns a new unique identifier - @return: (str) uuid - """ - return str(uuid.uuid4()) - -def saveReadFile(fpath): - """Reads contents of a file in the savest manner possible - @param fpath: file to write - @return: contents of the file if successful, None otherwise - """ - read = None - try: - fp = open(fpath, 'rb') - except: pass - else: - try: - read = fp.read() - except: pass - fp.close() - return read - -def saveRemoveFile(fpath): - """Savely removes a file - @param fpath: filepath of the file to remove or None - @return: True if the file was removed, False otherwise - """ - if fpath is not None: - if os.path.isfile(fpath): - os.remove(fpath) - return True - return False - - -def saveWriteFile(fpath, data): - """Writes data to a file i the savest manner possible - @param fpath: file to write - @param data: data to write to file - @return: True if successful, False otherwise - """ - written = False - try: - fp = open(fpath, 'wb') - except: pass - else: - try: - fp.write(data) - written = True - except: - fp.Close() - else: - fp.close() - return written - -def startFreenet(cmdline): - """Starts freenet - @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') - @return: (string) whatever freenet returns - """ - #TODO: on windows it may be necessary to hide the comand window - p = subprocess.Popen( - args=cmdline, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - stdout, stderr = p.communicate() - return stdout - - -def fcpBool(pythonBool): - """Converts a python bool to a fcp bool - @param pythonBool: (bool) - @return: (str) 'true' or 'false' - """ - if pythonBool: - return 'true' - return 'false' - -def pythonBool(fcpBool): - """Converts a fcp bool to a python bool - @param pythonBool: 'true' or 'false' - @return: (bool) True or False - """ - return fcpBool == 'true' - 
-#********************************************************************** -# classes -#********************************************************************** -class FcpSocketError(Exception): pass -class FcpProtocolError(Exception): - def __init__(self, msg): - """ - @param msg: (Message) ProtocolError message - """ - self.value = '%s (%s, %s)' % ( - msg.get('CodeDescription', 'Unknown error') , - msg['Code'], - msg.get('ExtraDescription', '...'), - ) - - def __str__(self): return self.value - - - -class FcpUri(object): - """Wrapper class for freenet uris""" - - - KeySSK = 'SSK@' - KeyKSK = 'KSK@' - KeyCHK = 'CHK@' - KeyUSK = 'USK@' - KeySVK = 'SVK@' - KeyUnknown = '' - KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK) - - ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I) - ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I) - - - def __init__(self, uri): - """ - @param uri: uri to wrap - @param cvar ReUriPattern: pattern matching a freenet uri - @param cvar ReKeyPattern: pattern matching the key type of a freenet uri - - @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible - - - >>> uri = FcpUri('freenet:SSK@foo/bar') - >>> str(uri) - 'SSK@foo/bar' - >>> uri.keyType() == FcpUri.KeySSK - True - >>> uri.split() - ('SSK@foo', 'bar') - >>> uri.fileName() - 'bar' - - >>> uri = FcpUri('http://SSK@foo/bar') - >>> str(uri) - 'SSK@foo/bar' - - # uris not containing freenet keys are left unchanged - >>> uri = FcpUri('http://foo/bar') - >>> str(uri) - 'http://foo/bar' - >>> uri.keyType() == FcpUri.KeyUnknown - True - >>> uri.split() - ('http://foo/bar', '') - >>> uri.fileName() - 'http://foo/bar' - - """ - self._uri = uri - - result = self.ReUriPattern.search(uri) - if result is not None: - self._uri = result.group(0) - - def __str__(self): - return str(self._uri) - - def __unicode__(self): - return unicode(self._uri) - - def keyType(self): - """Retuns the key type of the uri - @return: one of the Key* 
consts - """ - result = self.ReKeyPattern.search(self._uri) - if result is not None: - return result.group(0).upper() - return self.KeyUnknown - - def split(self): - """Splits the uri - @return: tuple(freenet-key, file-name) - """ - if self.keyType() != self.KeyUnknown: - head, sep, tail = self._uri.partition('/') - return head, tail - return self._uri, '' - - def fileName(self): - """Returns the filename part of the uri - @return: str - """ - head, tail = self.split() - if tail: - return tail - return self._uri - - -class Message(object): - """Class wrapping a freenet message""" - - Name = 'UMessage' - - - def __init__(self, name, data=None, **params): - """ - @param name: messge name - @param data: data associated to the messge (not yet implemented) - @param params: {field-name: value, ...} of parameters of the message - @note: all params can be accessed as attributes of the class - """ - self.data = data - self.name = name - self.params = params - - - def toString(self): - """Returns a string with the formated message, ready to be send""" - - # TODO: "Data" not yet implemented - out = [self.name, ] - for param, value in self.params.items(): - out.append('%s=%s' % (param, value)) - out.append('EndMessage\n') - return '\n'.join(out) - - - def pprint(self): - """Returns the message as nicely formated human readable string""" - - out = [self.name, ] - for param, value in self.params.items(): - out.append(' %s=%s' % (param, value)) - out.append('EndMessage') - out.append('') - return '\n'.join(out) - - def __getitem__(self, name): - """Returns the message parameter 'name' """ - return self.params[name] - - def get(self, name, default=None): - """Returns the message parameter 'name' or 'default' """ - return self.params.get(name, default) - - def __setitem__(self, name, value): - """Sets the message parameter 'name' to 'value' """ - self.params[name] = value - - -class MessageSocketTimeout(Message): - - Name = 'USocketTimeout' - - def __init__(self): - 
Message.__init__(self, self.Name) - - - -#************************************************************************** -# jobs -#************************************************************************** -class JobBase(object): - """Base class for jobs""" - - _fcp_auto_remove_ = True - - def __init__(self, fcpClient, identifier, message): - """ - @param fcpClient: FcpClient() instance - @param identifier: (str) identifier of the job - @param message: (Message) to send to the node whne the job ist started - @ivar fcpClient: FcpClient() instance - @ivar fcpError: holding the error message if an error was encountered while running - the job, None otherwise - @ivar fcpIdentifier: identifier of the job - @ivar fcpMessage: initial message send to the node - @ivar fcpResult: if no error was encountered, holding the result of the job when complete - @ivar fcpTime: when the job is complete, holding the time the job took to complete - """ - - self.fcpClient = fcpClient # FcpClient() instance the job belongs to - self.fcpError = None # last error (either this is set or dcpResult) - self.fcpIdentifier = identifier # - self.fcpMessage = message # message send to node - self.fcpResult = None # job result - self.fcpTimeStart = 0 # time the job was started - self.fcpTimeStop = 0 # time the job was stopped - self.fcpStopped = False - - def displayName(self): - """Returns the display name of the job - @return: (str) display name - """ - return 'JobBase' - - def start(self): - """Starts the job""" - self.fcpStopped = False - self.fcpTimeStart = time.time() - self.fcpClient.sendMessageEx(self.fcpMessage) - - def error(self, msg): - """Called on job completion if an error was encounterd while runnng the job - @param msg: (Message) to pass to the job - """ - self.fcpStopped = True - self.fcpTimeStop = time.time() - self.fcpError = msg - self.fcpResult = None - - def stop(self, msg): - """Called on job completion to stop the job - @param msg: (Message) to pass to the job - """ - 
self.fcpStopped = True - self.fcpTimeStop = time.time() - self.fcpError = None - self.fcpResult = msg - - -class JobClientHello(JobBase): - """Sed a ClientHello message to the node - - @note: this must be the first message passed to the node. If everything - goes well, you will get a NodeHello in response. - """ - - _fcp_auto_remove_ = True - - def __init__(self, fcpClient, name=None, expectedVersion='2.0'): - """ - @param name: (str) connection name or None, to use an arbitrary name - @param expectedVersion: (str) node version expected - """ - message = Message( - Messages.ClientHello, - Name=name if name is not None else newIdentifier(), - ExpectedVersion=expectedVersion, - ) - JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) - - def displayName(self): - return 'ClientHello' - - -class JobListPeers(JobBase): - """Lists all known peers of the node - """ - - _fcp_auto_remove_ = True - - def __init__(self, fcpClient, withMetaData=False, withVolantile=False): - message = Message( - Messages.ListPeers, - WithMetadata=fcpBool(withMetaData), - WithVolatile=fcpBool(withVolantile), - ) - JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) - - - def displayName(self): - return 'ListPeers' - - def handlePeer(self,msg): - """Handles the next peer send by the node in form of a 'Peer' message - while the job is running. Overwrite to process. - """ - - - -class JobGetFileInfo(JobBase): - """Tries to retieve information about a file. If everything goes well - - On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. 
- Note, that both members may be '' (empty string) - - """ - - _fcp_auto_remove_ = False - - - # idea is to provoke a GetFailed message and take mimetype and size from it - def __init__(self, fcpClient, uri, **params): - """ - @param fcpClient: FcpClient() instance - @param uri: uri of the file to retrieve info for - @param params: additional parameters: - IgnoreDS='true' / 'false' - DSOnly='true' / 'false' - MaxRetries=-1 ...N - PriorityClass=Priority* - - """ - identifier = newIdentifier() - message = Message( - Messages.ClientGet, - Identifier=identifier, - URI=uri, - MaxSize='0', - ReturnType='none', - Verbosity='1', - **params - ) - JobBase.__init__(self, fcpClient, identifier, message) - - - def displayName(self): - return 'GetFileInfo' - - def handleProgress(self, msg): - """Handles the next progress made. Overwrite to process. - """ - - - def error(self, msg): - JobBase.error(self, msg) - if msg.name == Messages.GetFailed: - if msg['Code'] == FetchErrors.TooBig: - self.fcpError = None - self.fcpResult = ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - #else: - # raise ValueError('Unhandled message: %s' % msg.name) - - - def stop(self, msg): - JobBase.stop(self, msg) - if msg.name == Messages.DataFound: - self.fcpResult = ( - msg.get('Metadata.ContentType', ''), - msg.get('DataLength', '') - ) - else: - raise ValueError('Unhandled message: %s' % msg.name) - - - -#TODO: handle case where directories are registered multiple times -class JobTestDDA(JobBase): - """Tests a directory for read / write accesss - """ - - _fcp_auto_remove_ = False - - def __init__(self, fcpClient, directory, read=True, write=True): - if not os.path.isdir(directory): - raise ValueError('No such directory: %r' % directory) - - message = Message( - Messages.TestDDARequest, - Directory=directory, - WantReadDirectory=fcpBool(read), - WantWriteDirectory=fcpBool(write), - ) - JobBase.__init__(self, fcpClient, directory, message) - self.fcpTmpFile = 
None - - def displayName(self): - return 'TestDDA' - - def handleTestDDAReply(self, msg): - fpathWrite = msg.params.get('WriteFilename', None) - fpathRead = msg.params.get('ReadFilename', None) - readContent = '' - if fpathWrite is not None: - written = saveWriteFile(fpathWrite, msg['ContentToWrite']) - if not written: - if os.path.isfile(fpathWrite): - os.remove(fpathWrite) - else: - self.fcpTmpFile = fpathWrite - - if fpathRead is not None: - readContent = saveReadFile(fpathRead) - if readContent is None: - readContent = '' - - self.fcpClient.sendMessage( - Messages.TestDDAResponse, - Directory=msg['Directory'], - ReadContent=readContent, - ) - - - def error(self, msg): - JobBase.error(self, msg) - saveRemoveFile(self.fcpTmpFile) - self.fcpTmpFile = None - - - def stop(self, msg): - JobBase.stop(self, msg) - saveRemoveFile(self.fcpTmpFile) - self.fcpTmpFile = None - -#************************************************************************** -# fcp client -#************************************************************************** -class LogMessages: - """Message strings used for log infos""" - Connecting = 'Connecting to node...' - Connected = 'Connected to node' - ConnectionRetry = 'Connecting to node failed... retrying' - ConnectingFailed = 'Connecting to node failed' - - ClientClose = 'Closing client' - - MessageSend = 'Message send' - MessageReceived = 'Message received' - - JobStart = 'Starting job: ' - JobStop = 'Stopping job: ' - JobsCompleted = 'All jobs completed' - - KeyboardInterrupt = 'Keyboard interrupt' - SocketDead = 'Socket is dead' - - -#TODO: no idea what happens on reconnect if socket died. What about running jobs? -#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. 
-#TODO: no idea if to add support for pending jobs and queue management here -class FcpClient(object): - """Fcp client implementation""" - - - def __init__(self, - name='', - errorHandler=None, - verbosity=logging.WARNING, - logMessages=LogMessages - ): - """ - @param name: name of the client instance or '' (for debugging) - @param errorHandler: will be called if the socket conncetion to the node is dead - with two params: FcpSocketError + details. When the handler is called the client - is already closed. - @param verbosity: verbosity level for debugging - @param logMessages: LogMessages class containing messages - """ - - self._isConnected = False - self._jobs = { - 'all': {}, - 'pending': [], # ??? - 'running': [], - 'complete': [], # ??? - } - self._errorHandler = errorHandler - self._log = logging.getLogger(NameClient + ':' + name) - self._logMessages = logMessages - self._lock = thread.allocate_lock() - self._socket = None - - self.setVerbosity(verbosity) - atexit.register(self.close) - - def close(self): - """Closes the client - @note: make shure to call close() when done with the client - """ - self._log.info(self._logMessages.ClientClose) - if self._socket is not None: - self._socket.close() - self._socket = None - - - #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call - def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): - """Establishes the connection to a freenet node - @param host: (str) host of th node - @param port: (int) port of the node - @param repeat: (int) how many seconds try to connect before giving up - @param timeout: (int) how much time to wait before another attempt to connect - @return: (Message) NodeHello if successful,None otherwise - """ - self._clientHello = None - self._log.info(self._logMessages.Connecting) - - # poll untill freenet responds - time_elapsed = 0 - while time_elapsed <= repeat: - - # try to Connect socket - if self._socket is not None: - 
self.close() - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.settimeout(SocketTimeout) - try: - self._socket.connect((host, port)) - except Exception, d: - pass - else: - self._log.info(self._logMessages.Connected) - job = JobClientHello(self) - self.jobAdd(job, synchron=True) - assert job.fcpError is None, 'Error should have been caught by handleMessage()' - return job.fcpResult - - self._log.info(self._logMessages.ConnectionRetry) - - # continue polling - time_elapsed += timeout - time.sleep(timeout) - - self._log.info(self._logMessages.ConnectingFailed) - return None - - - def handleMessage(self, msg): - """Handles the next message from the freenet node - @param msg: Message() to handle - """ - self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) - - if msg.name == Messages.NodeHello: - #connectionIdentifier = msg['ConnectionIdentifier'] - self.jobStop(JobIdentifiers.ClientHello, msg) - - elif msg.name == Messages.ProtocolError: - code = msg['Code'] - #if code == ProtocolErrors.NoLateClientHellos: - # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) - #elif code == ProtocolErrors.ClientHelloMustBeFirst: - # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) - #else: - identifier = msg.get('Identifier', None) - if identifier is None: - #TODO: inform caller - raise FcpProtocolError(msg) - else: - self.jobStop(identifier, msg, error=True) - - elif msg.name == Messages.Peer: - self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) - - elif msg.name == Messages.EndListPeers: - self.jobStop(IdentifierListPeers, msg) - - elif msg.name == Messages.GetFailed: - self.jobStop(msg['Identifier'], msg, error=True) - - elif msg.name == Messages.SimpleProgress: - self.jobNotify(msg['Identifier'], 'handleProgress', msg) - - elif msg.name == Messages.TestDDAReply: - self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) - - elif msg.name == Messages.TestDDAComplete: - self.jobStop(msg['Directory'], 
msg) - - elif msg.name == Messages.IdentifierCollision: - pass - - - def jobAdd(self, job, synchron=False): - """Adds a job to the client - @param job: (Job*) job to add - @param synchron: if True, wait untill the job is completed, if False return emidiately - """ - self._lock.acquire(True) - try: - if job.fcpIdentifier in self._jobs['all']: - raise ValueError('Duplicate job: %r' % job.identifier) - self._jobs['all'][job.fcpIdentifier] = job - self._jobs['running'].append(job) - finally: - self._lock.release() - - self._log.info(self._logMessages.JobStart + job.displayName()) - job.start() - if synchron: - while not job.fcpStopped: - self.next() - - - - def jobNotify(self, identifier, handler, msg): - """Notifies a job about an event while it is running - @param identifier: identifier of the job to notify - @param handler: (str) method of the job to call to handle the notification - @param msg: Message() to pass to the job - """ - self._lock.acquire(True) - try: - job = self._jobs['all'].get(identifier, None) - finally: - self._lock.release() - if job is None: - raise ValueError('No such job: %r' % identifier) - getattr(job, handler)(msg) - - - #TODO: quite unclear when to remove a job - def jobStop(self, identifier, msg, error=False): - """Stops a job - @param identifier: identifier of the job to stop - @param msg: Message() to pass to the job as result - @param error: set to True to indicate unsuccessful completion of the job, True otherwisse - """ - self._lock.acquire(True) - try: - job = self._jobs['all'].get(identifier, None) - if job is not None: - self._jobs['running'].remove(job) - if job._fcp_auto_remove_: - del self._jobs['all'][identifier] - else: - self._jobs['complete'].append(job) - finally: - self._lock.release() - - if job is None: - raise ValueError('No such job: %r' % identifier) - self._log.info(self._logMessages.JobStop + job.displayName()) - if error: - job.error(msg) - else: - job.stop(msg) - - - #TODO: some info when all jobs are completed - 
def next(self): - """Pumps the next message waiting - @note: use this method instead of run() to run the client step by step - """ - msg = self.readMessage() - self.handleMessage(msg) - return msg - - def readMessage(self): - """Reads the next message directly from the socket and dispatches it - @return: (Message) the next message read from the socket - @raise FcpSocketError: if the socket connection to the node dies unexpectedly - If an error handler is passed to the client it is called emidiately before the error - is raised. - """ - msg = Message(None) - buf = [] - while True: - - try: - p = self._socket.recv(1) - if not p: raise ValueError('Socket is dead') - except socket.timeout, d: # no new messages in queue - msg = MessageSocketTimeout() - break - except Exception, d: - self._log.info(self._logMessages.SocketDead) - self.close() - if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) #!! - - if p == '\r': # ignore - continue - - if p != '\n': - buf.append(p) - continue - - line = ''.join(buf) - if line in ('End', "EndMessage"): - break - buf = [] - - if msg.name is None: - msg.name = line - elif line == 'Data': - n = int(msg.params['DataLength']) - try: - msg.data = self._socket.recv(n) - if not msg.data: raise ValueError('Socket is dead') - except Exception, d: - self._log.info(self._logMessages.SocketDead) - self.close() - if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) #!! - - else: - head, sep, tail = line.partition('=') - msg.params[head] = tail - if not sep: - # TODO: chek for invalid messages or not - pass - - return msg - - - def run(self): - """Runs the client untill all jobs passed to it are completed - @note: use KeyboardInterrupt to stop prematurely - """ - - # TODO: - # x. 
push pending jobs - try: - while True: - if not self._lock.acquire(False): - continue - - try: - if not self._jobs['pending'] and not self._jobs['running']: - self._log.info(self._logMessages.JobsCompleted) - break - finally: - self._lock.release() - - self.next() - except KeyboardInterrupt: - self._log(self._logMessages.KeyboardInterrupt) - self.close() - - - def sendMessage(self, name, data=None, **params): - """Sends a message to freenet - @param name: name of the message to send - @param data: data to atatch to the message - @param params: {para-name: param-calue, ...} of parameters to pass along - with the message (see freenet protocol) - @raise FcpSocketError: if the socket connection to the node dies unexpectedly - If an error handler is passed to the client it is called emidiately before the error - is raised. - """ - return self.sendMessageEx(Message(name, data=data, **params)) - - - def sendMessageEx(self, msg): - """Sends a message to freenet - @param msg: (Message) message to send - @return: Message - @raise FcpSocketError: if the socket connection to the node dies unexpectedly. - If an error handler is passed to the client it is called emidiately before the error - is raised. 
- """ - rawMsg = msg.toString() - self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) - try: - self._socket.sendall(rawMsg) - except Exception, d: - self._log.info(self._logMessages.SocketDead) - self.close() - if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) - return msg - - - def setLogMessages(self, logMessages): - """""" - self._logMessages = logMessages - - - def setVerbosity(self, verbosity): - """""" - self._log.setLevel(verbosity) - -#***************************************************************************** -# -#***************************************************************************** -if __name__ == '__main__': - c = FcpClient(name='test', verbosity=logging.DEBUG) - if c.connect(): - def foo(): - job1 = JobClientHello(c) - c.jobAdd(job1) - - c.run() - print '---------------------------' - print job1.fcpError - print job1.fcpResult - print job1.fcpTime - print '---------------------------' - #foo() - - - def foo(): - d = os.path.dirname(os.path.abspath(__file__)) - job2 = JobTestDDA(c, d) - c.jobAdd(job2) - c.run() - print '---------------------------' - print job2.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - - def foo(): - job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - c.jobAdd(job2) - c.run() - print '---------------------------' - print job2.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - #foo() Copied: trunk/fclient/fclient_lib/fcp/fcp2_0.py (from rev 7, trunk/fclient/fclient_lib/fcp/fcp20.py) =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py (rev 0) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 10:02:28 UTC (rev 8) @@ -0,0 +1,1098 @@ + +import atexit +import logging +import os +import re +import socket +import 
subprocess +import sys +import time +import thread +import uuid + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) +#************************************************************** +# consts +#************************************************************** +NameClient = 'Fcp20Client' +DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() +DefaultFcpPort = 9481 +try: + DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) +except: pass +SocketTimeout = 0.1 + +class JobIdentifiers: + """Fixed job identifiers + @note: he client can only handle one job of these at a time + """ + ClientHello = 'ClientHello' + ListPeers = 'ListPeers' + +class Messages: + """All messages supported by the client""" + + # client messages + ClientHello = 'ClientHello' + ListPeer = 'ListPeer' # (since 1045) + ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + AddPeer = 'AddPeer' + ModifyPeer = 'ModifyPeer' + ModifyPeerNote = 'ModifyPeerNote' + RemovePeer = 'RemovePeer' + GetNode = 'GetNode' + GetConfig = 'GetConfig' # (since 1027) + ModifyConfig = 'ModifyConfig' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) + GenerateSSK = 'GenerateSSK' + ClientPut = 'ClientPut' + ClientPutDiskDir = 'ClientPutDiskDir' + ClientPutComplexDir = 'ClientPutComplexDir' + ClientGet = 'ClientGet' + SubscribeUSK = 'SubscribeUSK' + WatchGlobal = 'WatchGlobal' + GetRequestStatus = 'GetRequestStatus' + ListPersistentRequests = 'ListPersistentRequests' + RemovePersistentRequest = 'RemovePersistentRequest' + ModifyPersistentRequest = 'ModifyPersistentRequest' + Shutdown = 'Shutdown' + + # node messages + NodeHello = 'NodeHello' + CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' + Peer = 'Peer' + PeerNote = 'PeerNote' + EndListPeers = 'EndListPeers' + EndListPeerNotes = 'EndListPeerNotes' + PeerRemoved = 'PeerRemoved' + NodeData = 'NodeData' + ConfigData = 'ConfigData' # (since 1027) + 
TestDDAReply = 'TestDDAReply' # (since 1027) + TestDDAComplete = 'TestDDAComplete' # (since 1027) + SSKKeypair = 'SSKKeypair' + PersistentGet = 'PersistentGet' + PersistentPut = 'PersistentPut' + PersistentPutDir = 'PersistentPutDir' + URIGenerated = 'URIGenerated' + PutSuccessful = 'PutSuccessful' + PutFetchable = 'PutFetchable' + DataFound = 'DataFound' + AllData = 'AllData' + StartedCompression = 'StartedCompression' + FinishedCompression = 'FinishedCompression' + SimpleProgress = 'SimpleProgress' + EndListPersistentRequests = 'EndListPersistentRequests' + PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) + PersistentRequestModified = 'PersistentRequestModified' # (since 1016) + PutFailed = 'PutFailed' + GetFailed = 'GetFailed' + ProtocolError = 'ProtocolError' + IdentifierCollision = 'IdentifierCollision' + UnknownNodeIdentifier = 'UnknownNodeIdentifier' + UnknownPeerNoteType = 'UnknownPeerNoteType' + SubscribedUSKUpdate = 'SubscribedUSKUpdate' + + +class Priorities: + """All priorities supported by the client""" + + Maximum = 0 + Interactive = 1 + SemiInteractive = 2 + Updatable = 3 + Bulk = 4 + Prefetch = 5 + Minimum = 6 + + PriorityMin = Minimum + PriorityDefault = Bulk + + +# errors + +class FetchErrors: + """All fetch errors supported by the client""" + + MaxArchiveRecursionExceeded = '1' + UnknownSplitfileMetadata = '2' + UnknownMetadata = '3' + InvalidMetadata = '4' + ArchiveFailure = '5' + BlockDecodeError = '6' + MaxMetadataLevelsExceeded = '7' + MaxArchiveRestartsExceeded = '8' + MaxRecursionLevelExceeded = '9' + NotAnArchve = '10' + TooManyMetastrings = '11' + BucketError = '12' + DataNotFound = '13' + RouteNotFound = '14' + RejectedOverload = '15' + TooManyRedirects = '16' + InternalError = '17' + TransferFailed = '18' + SplitfileError = '19' + InvalidUri = '20' + TooBig = '21' + MetadataTooBig = '22' + TooManyBlocks = '23' + NotEnoughMetastrings = '24' + Canceled = '25' + ArchiveRestart = '26' + PermanentRedirect = '27' + 
NotAllDataFound = '28' + + +class InsertErrors: + """All insert errors supported by the client""" + + InvalidUri = '1' + BucketError = '2' + InternalError = '3' + RejectedOverload = '4' + RouteNotFound = '5' + FatalErrorInBlocks = '6' + TooManyRetriesInBlock = '7' + RouteReallyNotFound = '8' + Collision = '9' + Canceled = '10' + + +class ProtocolErrors: + """All protocol errors supported by the client""" + + ClientHelloMustBeFirst = '1' + NoLateClientHellos = '2' + MessageParseError = '3' + UriParseError = '4' + MissingField = '5' + ErrorParsingNumber = '6' + InvalidMessage = '7' + InvalidField = '8' + FileNotFound = '9' + DiskTargetExists = '10' + SameDirectoryExpected = '11' + CouldNotCreateFile = '12' + CouldNotWriteFile = '13' + CouldNotRenameFile = '14' + NoSuchIdentifier = '15' + NotSupported = '16' + InternalError = '17' + ShuttingDown = '18' + NoSuchNodeIdentifier = '19' # Unused since 995 + UrlParseError = '20' + ReferenceParseError = '21' + FileParseError = '22' + NotAFile = '23' + AccessDenied = '24' + DDADenied = '25' + CouldNotReadFile = '26' + ReferenceSignature = '27' + CanNotPeerWithSelf = '28' + PeerExists = '29' + OpennetDisabled = '30' + DarknetOnly = '31' + +#********************************************************************** +# functions +#********************************************************************** +def newIdentifier(): + """Returns a new unique identifier + @return: (str) uuid + """ + return str(uuid.uuid4()) + +def saveReadFile(fpath): + """Reads contents of a file in the savest manner possible + @param fpath: file to write + @return: contents of the file if successful, None otherwise + """ + read = None + try: + fp = open(fpath, 'rb') + except: pass + else: + try: + read = fp.read() + except: pass + fp.close() + return read + +def saveRemoveFile(fpath): + """Savely removes a file + @param fpath: filepath of the file to remove or None + @return: True if the file was removed, False otherwise + """ + if fpath is not None: + if 
os.path.isfile(fpath): + os.remove(fpath) + return True + return False + + +def saveWriteFile(fpath, data): + """Writes data to a file i the savest manner possible + @param fpath: file to write + @param data: data to write to file + @return: True if successful, False otherwise + """ + written = False + try: + fp = open(fpath, 'wb') + except: pass + else: + try: + fp.write(data) + written = True + except: + fp.Close() + else: + fp.close() + return written + +def startFreenet(cmdline): + """Starts freenet + @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') + @return: (string) whatever freenet returns + """ + #TODO: on windows it may be necessary to hide the comand window + p = subprocess.Popen( + args=cmdline, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p.communicate() + return stdout + + +def fcpBool(pythonBool): + """Converts a python bool to a fcp bool + @param pythonBool: (bool) + @return: (str) 'true' or 'false' + """ + if pythonBool: + return 'true' + return 'false' + +def pythonBool(fcpBool): + """Converts a fcp bool to a python bool + @param pythonBool: 'true' or 'false' + @return: (bool) True or False + """ + return fcpBool == 'true' + +#********************************************************************** +# classes +#********************************************************************** +class FcpSocketError(Exception): pass +class FcpProtocolError(Exception): + def __init__(self, msg): + """ + @param msg: (Message) ProtocolError message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + + def __str__(self): return self.value + + + +class FcpUri(object): + """Wrapper class for freenet uris""" + + + KeySSK = 'SSK@' + KeyKSK = 'KSK@' + KeyCHK = 'CHK@' + KeyUSK = 'USK@' + KeySVK = 'SVK@' + KeyUnknown = '' + KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK) + + 
ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I) + ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I) + + + def __init__(self, uri): + """ + @param uri: uri to wrap + @param cvar ReUriPattern: pattern matching a freenet uri + @param cvar ReKeyPattern: pattern matching the key type of a freenet uri + + @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible + + + >>> uri = FcpUri('freenet:SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + >>> uri.keyType() == FcpUri.KeySSK + True + >>> uri.split() + ('SSK@foo', 'bar') + >>> uri.fileName() + 'bar' + + >>> uri = FcpUri('http://SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + + # uris not containing freenet keys are left unchanged + >>> uri = FcpUri('http://foo/bar') + >>> str(uri) + 'http://foo/bar' + >>> uri.keyType() == FcpUri.KeyUnknown + True + >>> uri.split() + ('http://foo/bar', '') + >>> uri.fileName() + 'http://foo/bar' + + """ + self._uri = uri + + result = self.ReUriPattern.search(uri) + if result is not None: + self._uri = result.group(0) + + def __str__(self): + return str(self._uri) + + def __unicode__(self): + return unicode(self._uri) + + def keyType(self): + """Retuns the key type of the uri + @return: one of the Key* consts + """ + result = self.ReKeyPattern.search(self._uri) + if result is not None: + return result.group(0).upper() + return self.KeyUnknown + + def split(self): + """Splits the uri + @return: tuple(freenet-key, file-name) + """ + if self.keyType() != self.KeyUnknown: + head, sep, tail = self._uri.partition('/') + return head, tail + return self._uri, '' + + def fileName(self): + """Returns the filename part of the uri + @return: str + """ + head, tail = self.split() + if tail: + return tail + return self._uri + + +class Message(object): + """Class wrapping a freenet message""" + + Name = 'UMessage' + + + def __init__(self, name, data=None, **params): + """ + @param name: messge name + @param data: data associated to the 
messge (not yet implemented) + @param params: {field-name: value, ...} of parameters of the message + @note: all params can be accessed as attributes of the class + """ + self.data = data + self.name = name + self.params = params + + + def toString(self): + """Returns a string with the formated message, ready to be send""" + + # TODO: "Data" not yet implemented + out = [self.name, ] + for param, value in self.params.items(): + out.append('%s=%s' % (param, value)) + out.append('EndMessage\n') + return '\n'.join(out) + + + def pprint(self): + """Returns the message as nicely formated human readable string""" + + out = [self.name, ] + for param, value in self.params.items(): + out.append(' %s=%s' % (param, value)) + out.append('EndMessage') + out.append('') + return '\n'.join(out) + + def __getitem__(self, name): + """Returns the message parameter 'name' """ + return self.params[name] + + def get(self, name, default=None): + """Returns the message parameter 'name' or 'default' """ + return self.params.get(name, default) + + def __setitem__(self, name, value): + """Sets the message parameter 'name' to 'value' """ + self.params[name] = value + + +class MessageSocketTimeout(Message): + + Name = 'USocketTimeout' + + def __init__(self): + Message.__init__(self, self.Name) + + + +#************************************************************************** +# jobs +#************************************************************************** +class JobBase(object): + """Base class for jobs""" + + _fcp_auto_remove_ = True + + def __init__(self, fcpClient, identifier, message): + """ + @param fcpClient: FcpClient() instance + @param identifier: (str) identifier of the job + @param message: (Message) to send to the node whne the job ist started + @ivar fcpClient: FcpClient() instance + @ivar fcpError: holding the error message if an error was encountered while running + the job, None otherwise + @ivar fcpIdentifier: identifier of the job + @ivar fcpMessage: initial message send to 
the node + @ivar fcpResult: if no error was encountered, holding the result of the job when complete + @ivar fcpTime: when the job is complete, holding the time the job took to complete + """ + + self.fcpClient = fcpClient # FcpClient() instance the job belongs to + self.fcpError = None # last error (either this is set or dcpResult) + self.fcpIdentifier = identifier # + self.fcpMessage = message # message send to node + self.fcpResult = None # job result + self.fcpTimeStart = 0 # time the job was started + self.fcpTimeStop = 0 # time the job was stopped + self.fcpStopped = False + + def displayName(self): + """Returns the display name of the job + @return: (str) display name + """ + return 'JobBase' + + def start(self): + """Starts the job""" + self.fcpStopped = False + self.fcpTimeStart = time.time() + self.fcpClient.sendMessageEx(self.fcpMessage) + + def error(self, msg): + """Called on job completion if an error was encounterd while runnng the job + @param msg: (Message) to pass to the job + """ + self.fcpStopped = True + self.fcpTimeStop = time.time() + self.fcpError = msg + self.fcpResult = None + + def stop(self, msg): + """Called on job completion to stop the job + @param msg: (Message) to pass to the job + """ + self.fcpStopped = True + self.fcpTimeStop = time.time() + self.fcpError = None + self.fcpResult = msg + + +class JobClientHello(JobBase): + """Sed a ClientHello message to the node + + @note: this must be the first message passed to the node. If everything + goes well, you will get a NodeHello in response. 
+ """ + + _fcp_auto_remove_ = True + + def __init__(self, fcpClient, name=None, expectedVersion='2.0'): + """ + @param name: (str) connection name or None, to use an arbitrary name + @param expectedVersion: (str) node version expected + """ + message = Message( + Messages.ClientHello, + Name=name if name is not None else newIdentifier(), + ExpectedVersion=expectedVersion, + ) + JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) + + def displayName(self): + return 'ClientHello' + + +class JobListPeers(JobBase): + """Lists all known peers of the node + """ + + _fcp_auto_remove_ = True + + def __init__(self, fcpClient, withMetaData=False, withVolantile=False): + message = Message( + Messages.ListPeers, + WithMetadata=fcpBool(withMetaData), + WithVolatile=fcpBool(withVolantile), + ) + JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) + + + def displayName(self): + return 'ListPeers' + + def handlePeer(self,msg): + """Handles the next peer send by the node in form of a 'Peer' message + while the job is running. Overwrite to process. + """ + + + +class JobGetFileInfo(JobBase): + """Tries to retieve information about a file. If everything goes well + + On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. 
+ Note, that both members may be '' (empty string) + + """ + + _fcp_auto_remove_ = False + + + # idea is to provoke a GetFailed message and take mimetype and size from it + def __init__(self, fcpClient, uri, **params): + """ + @param fcpClient: FcpClient() instance + @param uri: uri of the file to retrieve info for + @param params: additional parameters: + IgnoreDS='true' / 'false' + DSOnly='true' / 'false' + MaxRetries=-1 ...N + PriorityClass=Priority* + + """ + identifier = newIdentifier() + message = Message( + Messages.ClientGet, + Identifier=identifier, + URI=uri, + MaxSize='0', + ReturnType='none', + Verbosity='1', + **params + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def displayName(self): + return 'GetFileInfo' + + def handleProgress(self, msg): + """Handles the next progress made. Overwrite to process. + """ + + + def error(self, msg): + JobBase.error(self, msg) + if msg.name == Messages.GetFailed: + if msg['Code'] == FetchErrors.TooBig: + self.fcpError = None + self.fcpResult = ( + msg.get('ExpectedMetadata.ContentType', ''), + msg.get('ExpectedDataLength', '') + ) + #else: + # raise ValueError('Unhandled message: %s' % msg.name) + + + def stop(self, msg): + JobBase.stop(self, msg) + if msg.name == Messages.DataFound: + self.fcpResult = ( + msg.get('Metadata.ContentType', ''), + msg.get('DataLength', '') + ) + else: + raise ValueError('Unhandled message: %s' % msg.name) + + + +#TODO: handle case where directories are registered multiple times +class JobTestDDA(JobBase): + """Tests a directory for read / write accesss + """ + + _fcp_auto_remove_ = False + + def __init__(self, fcpClient, directory, read=True, write=True): + if not os.path.isdir(directory): + raise ValueError('No such directory: %r' % directory) + + message = Message( + Messages.TestDDARequest, + Directory=directory, + WantReadDirectory=fcpBool(read), + WantWriteDirectory=fcpBool(write), + ) + JobBase.__init__(self, fcpClient, directory, message) + self.fcpTmpFile = 
None + + def displayName(self): + return 'TestDDA' + + def handleTestDDAReply(self, msg): + fpathWrite = msg.params.get('WriteFilename', None) + fpathRead = msg.params.get('ReadFilename', None) + readContent = '' + if fpathWrite is not None: + written = saveWriteFile(fpathWrite, msg['ContentToWrite']) + if not written: + if os.path.isfile(fpathWrite): + os.remove(fpathWrite) + else: + self.fcpTmpFile = fpathWrite + + if fpathRead is not None: + readContent = saveReadFile(fpathRead) + if readContent is None: + readContent = '' + + self.fcpClient.sendMessage( + Messages.TestDDAResponse, + Directory=msg['Directory'], + ReadContent=readContent, + ) + + + def error(self, msg): + JobBase.error(self, msg) + saveRemoveFile(self.fcpTmpFile) + self.fcpTmpFile = None + + + def stop(self, msg): + JobBase.stop(self, msg) + saveRemoveFile(self.fcpTmpFile) + self.fcpTmpFile = None + +#************************************************************************** +# fcp client +#************************************************************************** +class LogMessages: + """Message strings used for log infos""" + Connecting = 'Connecting to node...' + Connected = 'Connected to node' + ConnectionRetry = 'Connecting to node failed... retrying' + ConnectingFailed = 'Connecting to node failed' + + ClientClose = 'Closing client' + + MessageSend = 'Message send' + MessageReceived = 'Message received' + + JobStart = 'Starting job: ' + JobStop = 'Stopping job: ' + JobsCompleted = 'All jobs completed' + + KeyboardInterrupt = 'Keyboard interrupt' + SocketDead = 'Socket is dead' + + +#TODO: no idea what happens on reconnect if socket died. What about running jobs? +#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. 
+#TODO: no idea if to add support for pending jobs and queue management here +class FcpClient(object): + """Fcp client implementation""" + + + def __init__(self, + name='', + errorHandler=None, + verbosity=logging.WARNING, + logMessages=LogMessages + ): + """ + @param name: name of the client instance or '' (for debugging) + @param errorHandler: will be called if the socket conncetion to the node is dead + with two params: FcpSocketError + details. When the handler is called the client + is already closed. + @param verbosity: verbosity level for debugging + @param logMessages: LogMessages class containing messages + """ + + self._isConnected = False + self._jobs = { + 'all': {}, + 'pending': [], # ??? + 'running': [], + 'complete': [], # ??? + } + self._errorHandler = errorHandler + self._log = logging.getLogger(NameClient + ':' + name) + self._logMessages = logMessages + self._lock = thread.allocate_lock() + self._socket = None + + self.setVerbosity(verbosity) + atexit.register(self.close) + + def close(self): + """Closes the client + @note: make shure to call close() when done with the client + """ + self._log.info(self._logMessages.ClientClose) + if self._socket is not None: + self._socket.close() + self._socket = None + + + #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call + def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): + """Establishes the connection to a freenet node + @param host: (str) host of th node + @param port: (int) port of the node + @param repeat: (int) how many seconds try to connect before giving up + @param timeout: (int) how much time to wait before another attempt to connect + @return: (Message) NodeHello if successful,None otherwise + """ + self._clientHello = None + self._log.info(self._logMessages.Connecting) + + # poll untill freenet responds + time_elapsed = 0 + while time_elapsed <= repeat: + + # try to Connect socket + if self._socket is not None: + 
self.close() + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(SocketTimeout) + try: + self._socket.connect((host, port)) + except Exception, d: + pass + else: + self._log.info(self._logMessages.Connected) + job = JobClientHello(self) + self.jobAdd(job, synchron=True) + assert job.fcpError is None, 'Error should have been caught by handleMessage()' + return job.fcpResult + + self._log.info(self._logMessages.ConnectionRetry) + + # continue polling + time_elapsed += timeout + time.sleep(timeout) + + self._log.info(self._logMessages.ConnectingFailed) + return None + + + def handleMessage(self, msg): + """Handles the next message from the freenet node + @param msg: Message() to handle + """ + self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + + if msg.name == Messages.NodeHello: + #connectionIdentifier = msg['ConnectionIdentifier'] + self.jobStop(JobIdentifiers.ClientHello, msg) + + elif msg.name == Messages.ProtocolError: + code = msg['Code'] + #if code == ProtocolErrors.NoLateClientHellos: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #elif code == ProtocolErrors.ClientHelloMustBeFirst: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #else: + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: inform caller + raise FcpProtocolError(msg) + else: + self.jobStop(identifier, msg, error=True) + + elif msg.name == Messages.Peer: + self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) + + elif msg.name == Messages.EndListPeers: + self.jobStop(IdentifierListPeers, msg) + + elif msg.name == Messages.GetFailed: + self.jobStop(msg['Identifier'], msg, error=True) + + elif msg.name == Messages.SimpleProgress: + self.jobNotify(msg['Identifier'], 'handleProgress', msg) + + elif msg.name == Messages.TestDDAReply: + self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + + elif msg.name == Messages.TestDDAComplete: + self.jobStop(msg['Directory'], 
msg) + + elif msg.name == Messages.IdentifierCollision: + pass + + + def jobAdd(self, job, synchron=False): + """Adds a job to the client + @param job: (Job*) job to add + @param synchron: if True, wait untill the job is completed, if False return emidiately + """ + self._lock.acquire(True) + try: + if job.fcpIdentifier in self._jobs['all']: + raise ValueError('Duplicate job: %r' % job.identifier) + self._jobs['all'][job.fcpIdentifier] = job + self._jobs['running'].append(job) + finally: + self._lock.release() + + self._log.info(self._logMessages.JobStart + job.displayName()) + job.start() + if synchron: + while not job.fcpStopped: + self.next() + + + + def jobNotify(self, identifier, handler, msg): + """Notifies a job about an event while it is running + @param identifier: identifier of the job to notify + @param handler: (str) method of the job to call to handle the notification + @param msg: Message() to pass to the job + """ + self._lock.acquire(True) + try: + job = self._jobs['all'].get(identifier, None) + finally: + self._lock.release() + if job is None: + raise ValueError('No such job: %r' % identifier) + getattr(job, handler)(msg) + + + #TODO: quite unclear when to remove a job + def jobStop(self, identifier, msg, error=False): + """Stops a job + @param identifier: identifier of the job to stop + @param msg: Message() to pass to the job as result + @param error: set to True to indicate unsuccessful completion of the job, True otherwisse + """ + self._lock.acquire(True) + try: + job = self._jobs['all'].get(identifier, None) + if job is not None: + self._jobs['running'].remove(job) + if job._fcp_auto_remove_: + del self._jobs['all'][identifier] + else: + self._jobs['complete'].append(job) + finally: + self._lock.release() + + if job is None: + raise ValueError('No such job: %r' % identifier) + self._log.info(self._logMessages.JobStop + job.displayName()) + if error: + job.error(msg) + else: + job.stop(msg) + + + #TODO: some info when all jobs are completed + 
def next(self): + """Pumps the next message waiting + @note: use this method instead of run() to run the client step by step + """ + msg = self.readMessage() + self.handleMessage(msg) + return msg + + def readMessage(self): + """Reads the next message directly from the socket and dispatches it + @return: (Message) the next message read from the socket + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. + """ + msg = Message(None) + buf = [] + while True: + + try: + p = self._socket.recv(1) + if not p: raise ValueError('Socket is dead') + except socket.timeout, d: # no new messages in queue + msg = MessageSocketTimeout() + break + except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) + raise FcpSocketError(d) #!! + + if p == '\r': # ignore + continue + + if p != '\n': + buf.append(p) + continue + + line = ''.join(buf) + if line in ('End', "EndMessage"): + break + buf = [] + + if msg.name is None: + msg.name = line + elif line == 'Data': + n = int(msg.params['DataLength']) + try: + msg.data = self._socket.recv(n) + if not msg.data: raise ValueError('Socket is dead') + except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) + raise FcpSocketError(d) #!! + + else: + head, sep, tail = line.partition('=') + msg.params[head] = tail + if not sep: + # TODO: chek for invalid messages or not + pass + + return msg + + + def run(self): + """Runs the client untill all jobs passed to it are completed + @note: use KeyboardInterrupt to stop prematurely + """ + + # TODO: + # x. 
push pending jobs + try: + while True: + if not self._lock.acquire(False): + continue + + try: + if not self._jobs['pending'] and not self._jobs['running']: + self._log.info(self._logMessages.JobsCompleted) + break + finally: + self._lock.release() + + self.next() + except KeyboardInterrupt: + self._log(self._logMessages.KeyboardInterrupt) + self.close() + + + def sendMessage(self, name, data=None, **params): + """Sends a message to freenet + @param name: name of the message to send + @param data: data to atatch to the message + @param params: {para-name: param-calue, ...} of parameters to pass along + with the message (see freenet protocol) + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. + """ + return self.sendMessageEx(Message(name, data=data, **params)) + + + def sendMessageEx(self, msg): + """Sends a message to freenet + @param msg: (Message) message to send + @return: Message + @raise FcpSocketError: if the socket connection to the node dies unexpectedly. + If an error handler is passed to the client it is called emidiately before the error + is raised. + """ + rawMsg = msg.toString() + self._log.debug(self._logMessages.Mes... [truncated message content] |
From: <jU...@us...> - 2007-10-20 10:00:17
|
Revision: 7 http://fclient.svn.sourceforge.net/fclient/?rev=7&view=rev Author: jUrner Date: 2007-10-20 03:00:20 -0700 (Sat, 20 Oct 2007) Log Message: ----------- continued working on fcp protocol implementation Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7) @@ -288,6 +288,21 @@ # classes #********************************************************************** class FcpSocketError(Exception): pass +class FcpProtocolError(Exception): + def __init__(self, msg): + """ + @param msg: (Message) ProtocolError message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + + def __str__(self): return self.value + + + class FcpUri(object): """Wrapper class for freenet uris""" @@ -467,8 +482,10 @@ self.fcpIdentifier = identifier # self.fcpMessage = message # message send to node self.fcpResult = None # job result - self.fcpTime = 0 # start time (will hld duration whern the job is complte) - + self.fcpTimeStart = 0 # time the job was started + self.fcpTimeStop = 0 # time the job was stopped + self.fcpStopped = False + def displayName(self): """Returns the display name of the job @return: (str) display name @@ -477,14 +494,16 @@ def start(self): """Starts the job""" - self.fcpTime = time.time() + self.fcpStopped = False + self.fcpTimeStart = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) def error(self, msg): """Called on job completion if an error was encounterd while runnng the job @param msg: (Message) to pass to the job """ - self.fcpTime = time.time() - self.fcpTime + self.fcpStopped = True + self.fcpTimeStop = time.time() self.fcpError = msg self.fcpResult = None @@ -492,7 +511,8 @@ 
"""Called on job completion to stop the job @param msg: (Message) to pass to the job """ - self.fcpTime = time.time() - self.fcpTime + self.fcpStopped = True + self.fcpTimeStop = time.time() self.fcpError = None self.fcpResult = msg @@ -519,7 +539,7 @@ JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) def displayName(self): - return 'NodeHello' + return 'ClientHello' class JobListPeers(JobBase): @@ -531,8 +551,8 @@ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Messages.ListPeers, - WithMetadata='true' if withMetaData else 'false', - WithVolatile='true' if withVolantile else 'false', + WithMetadata=fcpBool(withMetaData), + WithVolatile=fcpBool(withVolantile), ) JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) @@ -587,8 +607,7 @@ return 'GetFileInfo' def handleProgress(self, msg): - """Handles the next progress made of a 'SimpleProgress' message - while the job is running. Overwrite to process. + """Handles the next progress made. Overwrite to process. 
""" @@ -625,6 +644,9 @@ _fcp_auto_remove_ = False def __init__(self, fcpClient, directory, read=True, write=True): + if not os.path.isdir(directory): + raise ValueError('No such directory: %r' % directory) + message = Message( Messages.TestDDARequest, Directory=directory, @@ -660,6 +682,7 @@ ReadContent=readContent, ) + def error(self, msg): JobBase.error(self, msg) saveRemoveFile(self.fcpTmpFile) @@ -728,7 +751,7 @@ self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None - + self.setVerbosity(verbosity) atexit.register(self.close) @@ -749,8 +772,9 @@ @param port: (int) port of the node @param repeat: (int) how many seconds try to connect before giving up @param timeout: (int) how much time to wait before another attempt to connect - @return: True if successful, False otherwise + @return: (Message) NodeHello if successful,None otherwise """ + self._clientHello = None self._log.info(self._logMessages.Connecting) # poll untill freenet responds @@ -768,7 +792,10 @@ pass else: self._log.info(self._logMessages.Connected) - return True + job = JobClientHello(self) + self.jobAdd(job, synchron=True) + assert job.fcpError is None, 'Error should have been caught by handleMessage()' + return job.fcpResult self._log.info(self._logMessages.ConnectionRetry) @@ -777,7 +804,7 @@ time.sleep(timeout) self._log.info(self._logMessages.ConnectingFailed) - return False + return None def handleMessage(self, msg): @@ -792,15 +819,17 @@ elif msg.name == Messages.ProtocolError: code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.jobStop(JobIdentifiers.ClientHello, msg, error=True) - + #if code == ProtocolErrors.NoLateClientHellos: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #elif code == ProtocolErrors.ClientHelloMustBeFirst: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #else: + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: inform caller + raise FcpProtocolError(msg) 
else: - identifier = msg.get('Identifier', None) - if identifier is None: - pass # raise ??? - else: - self.jobStop(identifier, msg, error=True) + self.jobStop(identifier, msg, error=True) elif msg.name == Messages.Peer: self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) @@ -824,9 +853,10 @@ pass - def jobAdd(self, job): + def jobAdd(self, job, synchron=False): """Adds a job to the client @param job: (Job*) job to add + @param synchron: if True, wait untill the job is completed, if False return emidiately """ self._lock.acquire(True) try: @@ -839,6 +869,10 @@ self._log.info(self._logMessages.JobStart + job.displayName()) job.start() + if synchron: + while not job.fcpStopped: + self.next() + def jobNotify(self, identifier, handler, msg): @@ -857,6 +891,7 @@ getattr(job, handler)(msg) + #TODO: quite unclear when to remove a job def jobStop(self, identifier, msg, error=False): """Stops a job @param identifier: identifier of the job to stop @@ -891,6 +926,7 @@ """ msg = self.readMessage() self.handleMessage(msg) + return msg def readMessage(self): """Reads the next message directly from the socket and dispatches it @@ -1026,32 +1062,37 @@ if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): - job1 = JobClientHello(c) - c.jobAdd(job1) + def foo(): + job1 = JobClientHello(c) + c.jobAdd(job1) - c.run() - print '---------------------------' - print job1.fcpError - print job1.fcpResult - print job1.fcpTime - print '---------------------------' + c.run() + print '---------------------------' + print job1.fcpError + print job1.fcpResult + print job1.fcpTime + print '---------------------------' + #foo() + + def foo(): + d = os.path.dirname(os.path.abspath(__file__)) + job2 = JobTestDDA(c, d) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job2.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' - job2 = JobTestDDA(c, os.path.dirname(__file__)) - c.jobAdd(job2) - 
c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - + def foo(): job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') c.jobAdd(job2) c.run() print '---------------------------' - print job1.fcpError + print job2.fcpError print job2.fcpResult print job2.fcpTime print '---------------------------' - + #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ju...@us...> - 2007-10-16 17:54:04
|
Revision: 6 http://fclient.svn.sourceforge.net/fclient/?rev=6&view=rev Author: jurner Date: 2007-10-16 10:54:01 -0700 (Tue, 16 Oct 2007) Log Message: ----------- added a bit of documentation and a class to deal with freenet uris Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6) @@ -2,6 +2,7 @@ import atexit import logging import os +import re import socket import subprocess import sys @@ -13,6 +14,7 @@ #************************************************************** # consts #************************************************************** +NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() DefaultFcpPort = 9481 try: @@ -21,12 +23,14 @@ SocketTimeout = 0.1 class JobIdentifiers: - # fixed job identifiers - # note that the client can only handle one job of these at a time + """Fixed job identifiers + @note: he client can only handle one job of these at a time + """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' class Messages: + """All messages supported by the client""" # client messages ClientHello = 'ClientHello' @@ -92,6 +96,8 @@ class Priorities: + """All priorities supported by the client""" + Maximum = 0 Interactive = 1 SemiInteractive = 2 @@ -107,6 +113,8 @@ # errors class FetchErrors: + """All fetch errors supported by the client""" + MaxArchiveRecursionExceeded = '1' UnknownSplitfileMetadata = '2' UnknownMetadata = '3' @@ -138,6 +146,8 @@ class InsertErrors: + """All insert errors supported by the client""" + InvalidUri = '1' BucketError = '2' InternalError = '3' @@ -151,6 +161,8 @@ class ProtocolErrors: + """All protocol errors supported by the client""" + ClientHelloMustBeFirst = '1' NoLateClientHellos = '2' MessageParseError = '3' 
@@ -187,6 +199,9 @@ # functions #********************************************************************** def newIdentifier(): + """Returns a new unique identifier + @return: (str) uuid + """ return str(uuid.uuid4()) def saveReadFile(fpath): @@ -206,6 +221,10 @@ return read def saveRemoveFile(fpath): + """Savely removes a file + @param fpath: filepath of the file to remove or None + @return: True if the file was removed, False otherwise + """ if fpath is not None: if os.path.isfile(fpath): os.remove(fpath) @@ -265,99 +284,101 @@ """ return fcpBool == 'true' - -def uriToList(uri): - """Splits a freenet uri into its components - @param uri: (str) uri to split - @return: (list) or components - @note: additional dexoration like 'freenet:' or 'http://blah' are ignored +#********************************************************************** +# classes +#********************************************************************** +class FcpSocketError(Exception): pass +class FcpUri(object): + """Wrapper class for freenet uris""" - >>> uriToList('SSK@foo') - ['SSK@foo'] - >>> uriToList('SSK@foo/bar/baz') - ['SSK@foo', 'bar', 'baz'] + KeySSK = 'SSK@' + KeyKSK = 'KSK@' + KeyCHK = 'CHK@' + KeyUSK = 'USK@' + KeySVK = 'SVK@' + KeyUnknown = '' + KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK) + + ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I) + ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I) - >>> uriToList('freenet:SSK@foo') - ['SSK@foo'] - >>> uriToList('http://foo/SSK@foo') - ['SSK@foo'] + def __init__(self, uri): + """ + @param uri: uri to wrap + @param cvar ReUriPattern: pattern matching a freenet uri + @param cvar ReKeyPattern: pattern matching the key type of a freenet uri + + @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible + + + >>> uri = FcpUri('freenet:SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + >>> uri.keyType() == FcpUri.KeySSK + True + >>> uri.split() + ('SSK@foo', 'bar') + >>> 
uri.fileName() + 'bar' + + >>> uri = FcpUri('http://SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + + # uris not containing freenet keys are left unchanged + >>> uri = FcpUri('http://foo/bar') + >>> str(uri) + 'http://foo/bar' + >>> uri.keyType() == FcpUri.KeyUnknown + True + >>> uri.split() + ('http://foo/bar', '') + >>> uri.fileName() + 'http://foo/bar' + + """ + self._uri = uri + + result = self.ReUriPattern.search(uri) + if result is not None: + self._uri = result.group(0) + + def __str__(self): + return str(self._uri) - >>> uriToList('http://foo') - [] + def __unicode__(self): + return unicode(self._uri) - """ - if uri.startswith('freenet:'): - uri = uri[len('freenet:'): ] - - components = [] - head = uri - while head: - head, tail = posixpath.split(head) - components.append(tail) + def keyType(self): + """Retuns the key type of the uri + @return: one of the Key* consts + """ + result = self.ReKeyPattern.search(self._uri) + if result is not None: + return result.group(0).upper() + return self.KeyUnknown - components.reverse() + def split(self): + """Splits the uri + @return: tuple(freenet-key, file-name) + """ + if self.keyType() != self.KeyUnknown: + head, sep, tail = self._uri.partition('/') + return head, tail + return self._uri, '' + + def fileName(self): + """Returns the filename part of the uri + @return: str + """ + head, tail = self.split() + if tail: + return tail + return self._uri - while components: - if components[0][:4] in FcpKeys.KeysAll: - break - else: - components.pop(0) - - return components -def uriKeyType(uri): - pass - - -def isValidUri(uri): - """Checks if an uri is a valid freenet uri - @param uri: (str) uri to check - @reuturn: (bool) - """ - return bool(UriToList(uri)) - - -def splitUri(uri): - """Splits an uri into uri and filename - @param uri: uri to split - @return: tuple(uri, filename) - - >>> splitUri('SSK@foo/bar/baz') - ('SSK@foo', 'bar/baz') - - >>> splitUri('SSK@foo') - ('SSK@foo', '') - - >>> splitUri('NoUri') - ('NoUri', 
'') - - """ - L = uriToList(uri) - if not L: - return (uri, '') - names = L[1:] - if not names: - name = '' - else: - name = '/'.join(names) - return (L[0], name) - - -def fileNameFromUri(uri): - """Returns the filename part of an uri - @return: (str) filename. If no filename is found the uri is returned unchanged - """ - tmp_uri, name = splitUri(uri) - if name: - return name - return uri - -#********************************************************************** -# classes -#********************************************************************** -class FcpSocketError(Exception): pass class Message(object): """Class wrapping a freenet message""" @@ -417,6 +438,8 @@ def __init__(self): Message.__init__(self, self.Name) + + #************************************************************************** # jobs #************************************************************************** @@ -426,6 +449,18 @@ _fcp_auto_remove_ = True def __init__(self, fcpClient, identifier, message): + """ + @param fcpClient: FcpClient() instance + @param identifier: (str) identifier of the job + @param message: (Message) to send to the node whne the job ist started + @ivar fcpClient: FcpClient() instance + @ivar fcpError: holding the error message if an error was encountered while running + the job, None otherwise + @ivar fcpIdentifier: identifier of the job + @ivar fcpMessage: initial message send to the node + @ivar fcpResult: if no error was encountered, holding the result of the job when complete + @ivar fcpTime: when the job is complete, holding the time the job took to complete + """ self.fcpClient = fcpClient # FcpClient() instance the job belongs to self.fcpError = None # last error (either this is set or dcpResult) @@ -435,31 +470,50 @@ self.fcpTime = 0 # start time (will hld duration whern the job is complte) def displayName(self): + """Returns the display name of the job + @return: (str) display name + """ return 'JobBase' def start(self): + """Starts the job""" self.fcpTime = 
time.time() self.fcpClient.sendMessageEx(self.fcpMessage) def error(self, msg): + """Called on job completion if an error was encounterd while runnng the job + @param msg: (Message) to pass to the job + """ self.fcpTime = time.time() - self.fcpTime self.fcpError = msg self.fcpResult = None - def stop(self, result): + def stop(self, msg): + """Called on job completion to stop the job + @param msg: (Message) to pass to the job + """ self.fcpTime = time.time() - self.fcpTime self.fcpError = None - self.fcpResult = result + self.fcpResult = msg -class JobNodeHello(JobBase): +class JobClientHello(JobBase): + """Sed a ClientHello message to the node + @note: this must be the first message passed to the node. If everything + goes well, you will get a NodeHello in response. + """ + _fcp_auto_remove_ = True - def __init__(self, fcpClient, expectedVersion='2.0'): + def __init__(self, fcpClient, name=None, expectedVersion='2.0'): + """ + @param name: (str) connection name or None, to use an arbitrary name + @param expectedVersion: (str) node version expected + """ message = Message( Messages.ClientHello, - Name=newIdentifier(), + Name=name if name is not None else newIdentifier(), ExpectedVersion=expectedVersion, ) JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) @@ -469,6 +523,8 @@ class JobListPeers(JobBase): + """Lists all known peers of the node + """ _fcp_auto_remove_ = True @@ -485,13 +541,20 @@ return 'ListPeers' def handlePeer(self,msg): - pass + """Handles the next peer send by the node in form of a 'Peer' message + while the job is running. Overwrite to process. + """ + - class JobGetFileInfo(JobBase): - """""" + """Tries to retieve information about a file. If everything goes well + On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. 
+ Note, that both members may be '' (empty string) + + """ + _fcp_auto_remove_ = False @@ -524,9 +587,11 @@ return 'GetFileInfo' def handleProgress(self, msg): - pass + """Handles the next progress made of a 'SimpleProgress' message + while the job is running. Overwrite to process. + """ + - def error(self, msg): JobBase.error(self, msg) if msg.name == Messages.GetFailed: @@ -554,7 +619,9 @@ #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): - + """Tests a directory for read / write accesss + """ + _fcp_auto_remove_ = False def __init__(self, fcpClient, directory, read=True, write=True): @@ -608,6 +675,7 @@ # fcp client #************************************************************************** class LogMessages: + """Message strings used for log infos""" Connecting = 'Connecting to node...' Connected = 'Connected to node' ConnectionRetry = 'Connecting to node failed... retrying' @@ -628,8 +696,11 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. +#TODO: no idea if to add support for pending jobs and queue management here class FcpClient(object): + """Fcp client implementation""" + def __init__(self, name='', errorHandler=None, @@ -648,12 +719,12 @@ self._isConnected = False self._jobs = { 'all': {}, - 'pending': [], + 'pending': [], # ??? 'running': [], - 'complete': [], + 'complete': [], # ??? 
} - self._errrorHandler = errorHandler - self._log = logging.getLogger('FcpClient20:%s' % name) + self._errorHandler = errorHandler + self._log = logging.getLogger(NameClient + ':' + name) self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None @@ -662,6 +733,9 @@ atexit.register(self.close) def close(self): + """Closes the client + @note: make shure to call close() when done with the client + """ self._log.info(self._logMessages.ClientClose) if self._socket is not None: self._socket.close() @@ -670,7 +744,13 @@ #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): - + """Establishes the connection to a freenet node + @param host: (str) host of th node + @param port: (int) port of the node + @param repeat: (int) how many seconds try to connect before giving up + @param timeout: (int) how much time to wait before another attempt to connect + @return: True if successful, False otherwise + """ self._log.info(self._logMessages.Connecting) # poll untill freenet responds @@ -699,8 +779,55 @@ self._log.info(self._logMessages.ConnectingFailed) return False + + def handleMessage(self, msg): + """Handles the next message from the freenet node + @param msg: Message() to handle + """ + self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + + if msg.name == Messages.NodeHello: + #connectionIdentifier = msg['ConnectionIdentifier'] + self.jobStop(JobIdentifiers.ClientHello, msg) + + elif msg.name == Messages.ProtocolError: + code = msg['Code'] + if code == ProtocolErrors.NoLateClientHellos: + self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + + else: + identifier = msg.get('Identifier', None) + if identifier is None: + pass # raise ??? 
+ else: + self.jobStop(identifier, msg, error=True) + + elif msg.name == Messages.Peer: + self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) + + elif msg.name == Messages.EndListPeers: + self.jobStop(IdentifierListPeers, msg) + + elif msg.name == Messages.GetFailed: + self.jobStop(msg['Identifier'], msg, error=True) + + elif msg.name == Messages.SimpleProgress: + self.jobNotify(msg['Identifier'], 'handleProgress', msg) + + elif msg.name == Messages.TestDDAReply: + self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + + elif msg.name == Messages.TestDDAComplete: + self.jobStop(msg['Directory'], msg) + + elif msg.name == Messages.IdentifierCollision: + pass - def addJob(self, job): + + def jobAdd(self, job): + """Adds a job to the client + @param job: (Job*) job to add + """ self._lock.acquire(True) try: if job.fcpIdentifier in self._jobs['all']: @@ -712,12 +839,33 @@ self._log.info(self._logMessages.JobStart + job.displayName()) job.start() - - def stopJob(self, identifier, msg, error=False): + + def jobNotify(self, identifier, handler, msg): + """Notifies a job about an event while it is running + @param identifier: identifier of the job to notify + @param handler: (str) method of the job to call to handle the notification + @param msg: Message() to pass to the job + """ self._lock.acquire(True) try: job = self._jobs['all'].get(identifier, None) + finally: + self._lock.release() + if job is None: + raise ValueError('No such job: %r' % identifier) + getattr(job, handler)(msg) + + + def jobStop(self, identifier, msg, error=False): + """Stops a job + @param identifier: identifier of the job to stop + @param msg: Message() to pass to the job as result + @param error: set to True to indicate unsuccessful completion of the job, True otherwisse + """ + self._lock.acquire(True) + try: + job = self._jobs['all'].get(identifier, None) if job is not None: self._jobs['running'].remove(job) if job._fcp_auto_remove_: @@ -734,92 +882,22 @@ job.error(msg) else: 
job.stop(msg) - - def notifyJob(self, identifier, handler, msg): - self._lock.acquire(True) - try: - job = self._jobs['all'].get(identifier, None) - finally: - self._lock.release() - if job is None: - raise ValueError('No such job: %r' % identifier) - getattr(job, handler)(msg) - - def run(self): - - # TODO: - # x. push pending jobs - try: - while True: - if not self._lock.acquire(False): - continue - - try: - if not self._jobs['pending'] and not self._jobs['running']: - self._log.info(self._logMessages.JobsCompleted) - break - finally: - self._lock.release() - - self.next() - except KeyboardInterrupt: - self._log(self._logMessages.KeyboardInterrupt) - self.close() - - #TODO: some info when all jobs are completed def next(self): + """Pumps the next message waiting + @note: use this method instead of run() to run the client step by step + """ msg = self.readMessage() self.handleMessage(msg) - - def handleMessage(self, msg): - - self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) - - if msg.name == Messages.NodeHello: - #connectionIdentifier = msg['ConnectionIdentifier'] - self.stopJob(JobIdentifiers.ClientHello, msg) - - elif msg.name == Messages.ProtocolError: - code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.stopJob(JobIdentifiers.ClientHello, msg, error=True) - - else: - identifier = msg.get('Identifier', None) - if identifier is None: - pass # raise ??? 
- else: - self.stopJob(identifier, msg, error=True) - - elif msg.name == Messages.Peer: - self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg) - - elif msg.name == Messages.EndListPeers: - self.stopJob(IdentifierListPeers, msg) - - elif msg.name == Messages.GetFailed: - self.stopJob(msg['Identifier'], msg, error=True) - - elif msg.name == Messages.SimpleProgress: - self.notifyJob(msg['Identifier'], 'handleProgress', msg) - - elif msg.name == Messages.TestDDAReply: - self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg) - - elif msg.name == Messages.TestDDAComplete: - self.stopJob(msg['Directory'], msg) - - elif msg.name == Messages.IdentifierCollision: - pass - - def readMessage(self): """Reads the next message directly from the socket and dispatches it - @return: valid or invalid Message() + @return: (Message) the next message read from the socket + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. """ msg = Message(None) buf = [] @@ -872,20 +950,42 @@ pass return msg + + + def run(self): + """Runs the client untill all jobs passed to it are completed + @note: use KeyboardInterrupt to stop prematurely + """ + # TODO: + # x. 
push pending jobs + try: + while True: + if not self._lock.acquire(False): + continue + + try: + if not self._jobs['pending'] and not self._jobs['running']: + self._log.info(self._logMessages.JobsCompleted) + break + finally: + self._lock.release() + + self.next() + except KeyboardInterrupt: + self._log(self._logMessages.KeyboardInterrupt) + self.close() - def setLogMessages(self, logMessages): - self._logMessages = logMessages - def setVerbosity(self, verbosity): - self._log.setLevel(verbosity) - def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @param data: data to atatch to the message @param params: {para-name: param-calue, ...} of parameters to pass along with the message (see freenet protocol) + @raise FcpSocketError: if the socket connection to the node dies unexpectedly + If an error handler is passed to the client it is called emidiately before the error + is raised. """ return self.sendMessageEx(Message(name, data=data, **params)) @@ -894,6 +994,9 @@ """Sends a message to freenet @param msg: (Message) message to send @return: Message + @raise FcpSocketError: if the socket connection to the node dies unexpectedly. + If an error handler is passed to the client it is called emidiately before the error + is raised. 
""" rawMsg = msg.toString() self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) @@ -907,14 +1010,24 @@ raise FcpSocketError(d) return msg + + def setLogMessages(self, logMessages): + """""" + self._logMessages = logMessages + + + def setVerbosity(self, verbosity): + """""" + self._log.setLevel(verbosity) + #***************************************************************************** # #***************************************************************************** if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): - job1 = JobNodeHello(c) - c.addJob(job1) + job1 = JobClientHello(c) + c.jobAdd(job1) c.run() print '---------------------------' @@ -924,21 +1037,21 @@ print '---------------------------' - job2 = JobTestDDA(c, os.path.dirname(__file__)) - c.addJob(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - - job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - c.addJob(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' + job2 = JobTestDDA(c, os.path.dirname(__file__)) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' + + job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + c.jobAdd(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ju...@us...> - 2007-10-16 10:19:40
|
Revision: 5 http://fclient.svn.sourceforge.net/fclient/?rev=5&view=rev Author: jurner Date: 2007-10-16 03:19:24 -0700 (Tue, 16 Oct 2007) Log Message: ----------- continued working on FcpClient Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-15 17:42:24 UTC (rev 4) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5) @@ -1,9 +1,15 @@ - + +import atexit +import logging import os import socket +import subprocess +import sys import time import thread import uuid + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) #************************************************************** # consts #************************************************************** @@ -13,7 +19,6 @@ DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) except: pass SocketTimeout = 0.1 -KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@') class JobIdentifiers: # fixed job identifiers @@ -200,6 +205,13 @@ fp.close() return read +def saveRemoveFile(fpath): + if fpath is not None: + if os.path.isfile(fpath): + os.remove(fpath) + return True + return False + def saveWriteFile(fpath, data): """Writes data to a file i the savest manner possible @@ -221,6 +233,22 @@ fp.close() return written +def startFreenet(cmdline): + """Starts freenet + @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') + @return: (string) whatever freenet returns + """ + #TODO: on windows it may be necessary to hide the comand window + p = subprocess.Popen( + args=cmdline, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p.communicate() + return stdout + + def fcpBool(pythonBool): """Converts a python bool to a fcp bool @param pythonBool: (bool) @@ -237,6 +265,95 @@ """ return fcpBool == 'true' + +def uriToList(uri): + 
"""Splits a freenet uri into its components + @param uri: (str) uri to split + @return: (list) or components + @note: additional dexoration like 'freenet:' or 'http://blah' are ignored + + >>> uriToList('SSK@foo') + ['SSK@foo'] + + >>> uriToList('SSK@foo/bar/baz') + ['SSK@foo', 'bar', 'baz'] + + >>> uriToList('freenet:SSK@foo') + ['SSK@foo'] + + >>> uriToList('http://foo/SSK@foo') + ['SSK@foo'] + + >>> uriToList('http://foo') + [] + + """ + if uri.startswith('freenet:'): + uri = uri[len('freenet:'): ] + + components = [] + head = uri + while head: + head, tail = posixpath.split(head) + components.append(tail) + + components.reverse() + + while components: + if components[0][:4] in FcpKeys.KeysAll: + break + else: + components.pop(0) + + return components + +def uriKeyType(uri): + pass + + +def isValidUri(uri): + """Checks if an uri is a valid freenet uri + @param uri: (str) uri to check + @reuturn: (bool) + """ + return bool(UriToList(uri)) + + +def splitUri(uri): + """Splits an uri into uri and filename + @param uri: uri to split + @return: tuple(uri, filename) + + >>> splitUri('SSK@foo/bar/baz') + ('SSK@foo', 'bar/baz') + + >>> splitUri('SSK@foo') + ('SSK@foo', '') + + >>> splitUri('NoUri') + ('NoUri', '') + + """ + L = uriToList(uri) + if not L: + return (uri, '') + names = L[1:] + if not names: + name = '' + else: + name = '/'.join(names) + return (L[0], name) + + +def fileNameFromUri(uri): + """Returns the filename part of an uri + @return: (str) filename. 
If no filename is found the uri is returned unchanged + """ + tmp_uri, name = splitUri(uri) + if name: + return name + return uri + #********************************************************************** # classes #********************************************************************** @@ -303,7 +420,6 @@ #************************************************************************** # jobs #************************************************************************** -#TODO: do somrthing that this class does not lock the queue class JobBase(object): """Base class for jobs""" @@ -311,18 +427,28 @@ def __init__(self, fcpClient, identifier, message): - self.fcpClient = fcpClient - self.fcpIdentifier = identifier - self.fcpMessage = message - self.fcpResult = None - self.fcpTime = 0 + self.fcpClient = fcpClient # FcpClient() instance the job belongs to + self.fcpError = None # last error (either this is set or dcpResult) + self.fcpIdentifier = identifier # + self.fcpMessage = message # message send to node + self.fcpResult = None # job result + self.fcpTime = 0 # start time (will hld duration whern the job is complte) + def displayName(self): + return 'JobBase' + def start(self): self.fcpTime = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) + def error(self, msg): + self.fcpTime = time.time() - self.fcpTime + self.fcpError = msg + self.fcpResult = None + def stop(self, result): self.fcpTime = time.time() - self.fcpTime + self.fcpError = None self.fcpResult = result @@ -338,8 +464,10 @@ ) JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) + def displayName(self): + return 'NodeHello' + - class JobListPeers(JobBase): _fcp_auto_remove_ = True @@ -353,15 +481,21 @@ JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) + def displayName(self): + return 'ListPeers' + def handlePeer(self,msg): pass -class JobFileInfo(JobBase): +class JobGetFileInfo(JobBase): + """""" _fcp_auto_remove_ = False + + # idea is to provoke a GetFailed message 
and take mimetype and size from it def __init__(self, fcpClient, uri, **params): """ @param fcpClient: FcpClient() instance @@ -378,7 +512,7 @@ Messages.ClientGet, Identifier=identifier, URI=uri, - MaxSize='ase0', + MaxSize='0', ReturnType='none', Verbosity='1', **params @@ -386,36 +520,37 @@ JobBase.__init__(self, fcpClient, identifier, message) + def displayName(self): + return 'GetFileInfo' + def handleProgress(self, msg): pass - def stop(self, msg): - JobBase.stop(self, msg) - error = result = None + def error(self, msg): + JobBase.error(self, msg) if msg.name == Messages.GetFailed: if msg['Code'] == FetchErrors.TooBig: - result = ( + self.fcpError = None + self.fcpResult = ( msg.get('ExpectedMetadata.ContentType', ''), msg.get('ExpectedDataLength', '') ) - else: - error, result = msg['Code'], msg - - elif msg.name == Messages.DataFound: - result = ( + #else: + # raise ValueError('Unhandled message: %s' % msg.name) + + + def stop(self, msg): + JobBase.stop(self, msg) + if msg.name == Messages.DataFound: + self.fcpResult = ( msg.get('Metadata.ContentType', ''), msg.get('DataLength', '') ) - - elif msg.name == Messages.ProtocolError: - error, result = msg['Code'], msg - else: raise ValueError('Unhandled message: %s' % msg.name) - - self.fcpResult = error, result + #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): @@ -432,7 +567,9 @@ JobBase.__init__(self, fcpClient, directory, message) self.fcpTmpFile = None - + def displayName(self): + return 'TestDDA' + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -456,19 +593,58 @@ ReadContent=readContent, ) + def error(self, msg): + JobBase.error(self, msg) + saveRemoveFile(self.fcpTmpFile) + self.fcpTmpFile = None + + def stop(self, msg): JobBase.stop(self, msg) - if self.fcpTmpFile is not None: - if os.path.isfile(self.fcpTmpFile): - os.remove(self.fcpTmpFile) + 
saveRemoveFile(self.fcpTmpFile) + self.fcpTmpFile = None #************************************************************************** # fcp client #************************************************************************** +class LogMessages: + Connecting = 'Connecting to node...' + Connected = 'Connected to node' + ConnectionRetry = 'Connecting to node failed... retrying' + ConnectingFailed = 'Connecting to node failed' + + ClientClose = 'Closing client' + + MessageSend = 'Message send' + MessageReceived = 'Message received' + + JobStart = 'Starting job: ' + JobStop = 'Stopping job: ' + JobsCompleted = 'All jobs completed' + + KeyboardInterrupt = 'Keyboard interrupt' + SocketDead = 'Socket is dead' + + +#TODO: no idea what happens on reconnect if socket died. What about running jobs? +#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. class FcpClient(object): - def __init__(self): - + def __init__(self, + name='', + errorHandler=None, + verbosity=logging.WARNING, + logMessages=LogMessages + ): + """ + @param name: name of the client instance or '' (for debugging) + @param errorHandler: will be called if the socket conncetion to the node is dead + with two params: FcpSocketError + details. When the handler is called the client + is already closed. 
+ @param verbosity: verbosity level for debugging + @param logMessages: LogMessages class containing messages + """ + self._isConnected = False self._jobs = { 'all': {}, @@ -476,24 +652,34 @@ 'running': [], 'complete': [], } + self._errrorHandler = errorHandler + self._log = logging.getLogger('FcpClient20:%s' % name) + self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None - + self.setVerbosity(verbosity) + atexit.register(self.close) def close(self): + self._log.info(self._logMessages.ClientClose) if self._socket is not None: self._socket.close() self._socket = None + #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): + + self._log.info(self._logMessages.Connecting) + # poll untill freenet responds time_elapsed = 0 while time_elapsed <= repeat: # try to Connect socket - self.close() + if self._socket is not None: + self.close() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.settimeout(SocketTimeout) try: @@ -501,19 +687,19 @@ except Exception, d: pass else: - #self._isConnected = True + self._log.info(self._logMessages.Connected) return True + self._log.info(self._logMessages.ConnectionRetry) + # continue polling time_elapsed += timeout time.sleep(timeout) + self._log.info(self._logMessages.ConnectingFailed) return False - #def __nonzero__(self): - # return self._isConnected - def addJob(self, job): self._lock.acquire(True) try: @@ -523,9 +709,12 @@ self._jobs['running'].append(job) finally: self._lock.release() + + self._log.info(self._logMessages.JobStart + job.displayName()) job.start() - def finishJob(self, identifier, msg): + + def stopJob(self, identifier, msg, error=False): self._lock.acquire(True) try: job = self._jobs['all'].get(identifier, None) @@ -540,7 +729,11 @@ if job is None: raise ValueError('No such job: %r' % identifier) - job.stop(msg) + 
self._log.info(self._logMessages.JobStop + job.displayName()) + if error: + job.error(msg) + else: + job.stop(msg) def notifyJob(self, identifier, handler, msg): @@ -558,27 +751,25 @@ # TODO: # x. push pending jobs - # x. on error stop this thingy - - n = 0 - while True: - if not self._lock.acquire(False): - continue + try: + while True: + if not self._lock.acquire(False): + continue + + try: + if not self._jobs['pending'] and not self._jobs['running']: + self._log.info(self._logMessages.JobsCompleted) + break + finally: + self._lock.release() + + self.next() + except KeyboardInterrupt: + self._log(self._logMessages.KeyboardInterrupt) + self.close() - try: - if not self._jobs['pending'] and not self._jobs['running']: - break - finally: - self._lock.release() - - msg = self.readMessage() - self.handleMessage(msg) - - - n += 1 - if n > 50: break - + #TODO: some info when all jobs are completed def next(self): msg = self.readMessage() self.handleMessage(msg) @@ -586,33 +777,32 @@ def handleMessage(self, msg): - print msg.pprint() - + self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + if msg.name == Messages.NodeHello: #connectionIdentifier = msg['ConnectionIdentifier'] - self.finishJob(JobIdentifiers.ClientHello, msg) + self.stopJob(JobIdentifiers.ClientHello, msg) elif msg.name == Messages.ProtocolError: code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.finishJob(JobIdentifiers.ClientHello, msg) + self.stopJob(JobIdentifiers.ClientHello, msg, error=True) else: identifier = msg.get('Identifier', None) if identifier is None: pass # raise ??? 
else: - self.finishJob(identifier, msg) + self.stopJob(identifier, msg, error=True) elif msg.name == Messages.Peer: self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg) elif msg.name == Messages.EndListPeers: - self.finishJob(IdentifierListPeers, msg) + self.stopJob(IdentifierListPeers, msg) elif msg.name == Messages.GetFailed: - self.finishJob(msg['Identifier'], msg) + self.stopJob(msg['Identifier'], msg, error=True) elif msg.name == Messages.SimpleProgress: self.notifyJob(msg['Identifier'], 'handleProgress', msg) @@ -621,7 +811,7 @@ self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg) elif msg.name == Messages.TestDDAComplete: - self.finishJob(msg['Directory'], msg) + self.stopJob(msg['Directory'], msg) elif msg.name == Messages.IdentifierCollision: pass @@ -642,6 +832,10 @@ msg = MessageSocketTimeout() break except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) raise FcpSocketError(d) #!! if p == '\r': # ignore @@ -662,7 +856,12 @@ n = int(msg.params['DataLength']) try: msg.data = self._socket.recv(n) + if not msg.data: raise ValueError('Socket is dead') except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) raise FcpSocketError(d) #!! 
else: @@ -674,8 +873,13 @@ return msg - + def setLogMessages(self, logMessages): + self._logMessages = logMessages + + def setVerbosity(self, verbosity): + self._log.setLevel(verbosity) + def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @@ -691,25 +895,50 @@ @param msg: (Message) message to send @return: Message """ - #self.log.info('SendMessage\n' + msg.pprint()) rawMsg = msg.toString() + self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) try: self._socket.sendall(rawMsg) except Exception, d: + self._log.info(self._logMessages.SocketDead) + self.close() + if self._errorHandler is not None: + self._errorHandler(FcpSocketError, d) raise FcpSocketError(d) - #TODO: allow for an error handler to handle return msg #***************************************************************************** # #***************************************************************************** if __name__ == '__main__': - c = FcpClient() + c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): job1 = JobNodeHello(c) c.addJob(job1) c.run() print '---------------------------' - print job1.fcpResult.pprint() + print job1.fcpError + print job1.fcpResult + print job1.fcpTime + print '---------------------------' + + + job2 = JobTestDDA(c, os.path.dirname(__file__)) + c.addJob(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' + + job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + c.addJob(job2) + c.run() + print '---------------------------' + print job1.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ju...@us...> - 2007-10-15 17:42:21
|
Revision: 4 http://fclient.svn.sourceforge.net/fclient/?rev=4&view=rev Author: jurner Date: 2007-10-15 10:42:24 -0700 (Mon, 15 Oct 2007) Log Message: ----------- started implementing fcp20 client Added Paths: ----------- trunk/fclient/ trunk/fclient/__init__.py trunk/fclient/fclient_lib/ trunk/fclient/fclient_lib/__init__.py trunk/fclient/fclient_lib/fcp/ trunk/fclient/fclient_lib/fcp/__init__.py trunk/fclient/fclient_lib/fcp/fcp20.py Added: trunk/fclient/__init__.py =================================================================== --- trunk/fclient/__init__.py (rev 0) +++ trunk/fclient/__init__.py 2007-10-15 17:42:24 UTC (rev 4) @@ -0,0 +1 @@ + Added: trunk/fclient/fclient_lib/__init__.py =================================================================== --- trunk/fclient/fclient_lib/__init__.py (rev 0) +++ trunk/fclient/fclient_lib/__init__.py 2007-10-15 17:42:24 UTC (rev 4) @@ -0,0 +1 @@ + Added: trunk/fclient/fclient_lib/fcp/__init__.py =================================================================== --- trunk/fclient/fclient_lib/fcp/__init__.py (rev 0) +++ trunk/fclient/fclient_lib/fcp/__init__.py 2007-10-15 17:42:24 UTC (rev 4) @@ -0,0 +1 @@ + Added: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py (rev 0) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-15 17:42:24 UTC (rev 4) @@ -0,0 +1,715 @@ + +import os +import socket +import time +import thread +import uuid +#************************************************************** +# consts +#************************************************************** +DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() +DefaultFcpPort = 9481 +try: + DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) +except: pass +SocketTimeout = 0.1 +KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@') + +class JobIdentifiers: + # fixed job identifiers + # note that the client can only handle one job of these at 
a time + ClientHello = 'ClientHello' + ListPeers = 'ListPeers' + +class Messages: + + # client messages + ClientHello = 'ClientHello' + ListPeer = 'ListPeer' # (since 1045) + ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + AddPeer = 'AddPeer' + ModifyPeer = 'ModifyPeer' + ModifyPeerNote = 'ModifyPeerNote' + RemovePeer = 'RemovePeer' + GetNode = 'GetNode' + GetConfig = 'GetConfig' # (since 1027) + ModifyConfig = 'ModifyConfig' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) + GenerateSSK = 'GenerateSSK' + ClientPut = 'ClientPut' + ClientPutDiskDir = 'ClientPutDiskDir' + ClientPutComplexDir = 'ClientPutComplexDir' + ClientGet = 'ClientGet' + SubscribeUSK = 'SubscribeUSK' + WatchGlobal = 'WatchGlobal' + GetRequestStatus = 'GetRequestStatus' + ListPersistentRequests = 'ListPersistentRequests' + RemovePersistentRequest = 'RemovePersistentRequest' + ModifyPersistentRequest = 'ModifyPersistentRequest' + Shutdown = 'Shutdown' + + # node messages + NodeHello = 'NodeHello' + CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' + Peer = 'Peer' + PeerNote = 'PeerNote' + EndListPeers = 'EndListPeers' + EndListPeerNotes = 'EndListPeerNotes' + PeerRemoved = 'PeerRemoved' + NodeData = 'NodeData' + ConfigData = 'ConfigData' # (since 1027) + TestDDAReply = 'TestDDAReply' # (since 1027) + TestDDAComplete = 'TestDDAComplete' # (since 1027) + SSKKeypair = 'SSKKeypair' + PersistentGet = 'PersistentGet' + PersistentPut = 'PersistentPut' + PersistentPutDir = 'PersistentPutDir' + URIGenerated = 'URIGenerated' + PutSuccessful = 'PutSuccessful' + PutFetchable = 'PutFetchable' + DataFound = 'DataFound' + AllData = 'AllData' + StartedCompression = 'StartedCompression' + FinishedCompression = 'FinishedCompression' + SimpleProgress = 'SimpleProgress' + EndListPersistentRequests = 'EndListPersistentRequests' + PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) + 
PersistentRequestModified = 'PersistentRequestModified' # (since 1016) + PutFailed = 'PutFailed' + GetFailed = 'GetFailed' + ProtocolError = 'ProtocolError' + IdentifierCollision = 'IdentifierCollision' + UnknownNodeIdentifier = 'UnknownNodeIdentifier' + UnknownPeerNoteType = 'UnknownPeerNoteType' + SubscribedUSKUpdate = 'SubscribedUSKUpdate' + + +class Priorities: + Maximum = 0 + Interactive = 1 + SemiInteractive = 2 + Updatable = 3 + Bulk = 4 + Prefetch = 5 + Minimum = 6 + + PriorityMin = Minimum + PriorityDefault = Bulk + + +# errors + +class FetchErrors: + MaxArchiveRecursionExceeded = '1' + UnknownSplitfileMetadata = '2' + UnknownMetadata = '3' + InvalidMetadata = '4' + ArchiveFailure = '5' + BlockDecodeError = '6' + MaxMetadataLevelsExceeded = '7' + MaxArchiveRestartsExceeded = '8' + MaxRecursionLevelExceeded = '9' + NotAnArchve = '10' + TooManyMetastrings = '11' + BucketError = '12' + DataNotFound = '13' + RouteNotFound = '14' + RejectedOverload = '15' + TooManyRedirects = '16' + InternalError = '17' + TransferFailed = '18' + SplitfileError = '19' + InvalidUri = '20' + TooBig = '21' + MetadataTooBig = '22' + TooManyBlocks = '23' + NotEnoughMetastrings = '24' + Canceled = '25' + ArchiveRestart = '26' + PermanentRedirect = '27' + NotAllDataFound = '28' + + +class InsertErrors: + InvalidUri = '1' + BucketError = '2' + InternalError = '3' + RejectedOverload = '4' + RouteNotFound = '5' + FatalErrorInBlocks = '6' + TooManyRetriesInBlock = '7' + RouteReallyNotFound = '8' + Collision = '9' + Canceled = '10' + + +class ProtocolErrors: + ClientHelloMustBeFirst = '1' + NoLateClientHellos = '2' + MessageParseError = '3' + UriParseError = '4' + MissingField = '5' + ErrorParsingNumber = '6' + InvalidMessage = '7' + InvalidField = '8' + FileNotFound = '9' + DiskTargetExists = '10' + SameDirectoryExpected = '11' + CouldNotCreateFile = '12' + CouldNotWriteFile = '13' + CouldNotRenameFile = '14' + NoSuchIdentifier = '15' + NotSupported = '16' + InternalError = '17' + 
ShuttingDown = '18' + NoSuchNodeIdentifier = '19' # Unused since 995 + UrlParseError = '20' + ReferenceParseError = '21' + FileParseError = '22' + NotAFile = '23' + AccessDenied = '24' + DDADenied = '25' + CouldNotReadFile = '26' + ReferenceSignature = '27' + CanNotPeerWithSelf = '28' + PeerExists = '29' + OpennetDisabled = '30' + DarknetOnly = '31' + +#********************************************************************** +# functions +#********************************************************************** +def newIdentifier(): + return str(uuid.uuid4()) + +def saveReadFile(fpath): + """Reads contents of a file in the savest manner possible + @param fpath: file to write + @return: contents of the file if successful, None otherwise + """ + read = None + try: + fp = open(fpath, 'rb') + except: pass + else: + try: + read = fp.read() + except: pass + fp.close() + return read + + +def saveWriteFile(fpath, data): + """Writes data to a file i the savest manner possible + @param fpath: file to write + @param data: data to write to file + @return: True if successful, False otherwise + """ + written = False + try: + fp = open(fpath, 'wb') + except: pass + else: + try: + fp.write(data) + written = True + except: + fp.Close() + else: + fp.close() + return written + +def fcpBool(pythonBool): + """Converts a python bool to a fcp bool + @param pythonBool: (bool) + @return: (str) 'true' or 'false' + """ + if pythonBool: + return 'true' + return 'false' + +def pythonBool(fcpBool): + """Converts a fcp bool to a python bool + @param pythonBool: 'true' or 'false' + @return: (bool) True or False + """ + return fcpBool == 'true' + +#********************************************************************** +# classes +#********************************************************************** +class FcpSocketError(Exception): pass +class Message(object): + """Class wrapping a freenet message""" + + Name = 'UMessage' + + + def __init__(self, name, data=None, **params): + """ + @param name: 
messge name + @param data: data associated to the messge (not yet implemented) + @param params: {field-name: value, ...} of parameters of the message + @note: all params can be accessed as attributes of the class + """ + self.data = data + self.name = name + self.params = params + + + def toString(self): + """Returns a string with the formated message, ready to be send""" + + # TODO: "Data" not yet implemented + out = [self.name, ] + for param, value in self.params.items(): + out.append('%s=%s' % (param, value)) + out.append('EndMessage\n') + return '\n'.join(out) + + + def pprint(self): + """Returns the message as nicely formated human readable string""" + + out = [self.name, ] + for param, value in self.params.items(): + out.append(' %s=%s' % (param, value)) + out.append('EndMessage') + out.append('') + return '\n'.join(out) + + def __getitem__(self, name): + """Returns the message parameter 'name' """ + return self.params[name] + + def get(self, name, default=None): + """Returns the message parameter 'name' or 'default' """ + return self.params.get(name, default) + + def __setitem__(self, name, value): + """Sets the message parameter 'name' to 'value' """ + self.params[name] = value + + +class MessageSocketTimeout(Message): + + Name = 'USocketTimeout' + + def __init__(self): + Message.__init__(self, self.Name) + +#************************************************************************** +# jobs +#************************************************************************** +#TODO: do somrthing that this class does not lock the queue +class JobBase(object): + """Base class for jobs""" + + _fcp_auto_remove_ = True + + def __init__(self, fcpClient, identifier, message): + + self.fcpClient = fcpClient + self.fcpIdentifier = identifier + self.fcpMessage = message + self.fcpResult = None + self.fcpTime = 0 + + def start(self): + self.fcpTime = time.time() + self.fcpClient.sendMessageEx(self.fcpMessage) + + def stop(self, result): + self.fcpTime = time.time() - 
self.fcpTime + self.fcpResult = result + + +class JobNodeHello(JobBase): + + _fcp_auto_remove_ = True + + def __init__(self, fcpClient, expectedVersion='2.0'): + message = Message( + Messages.ClientHello, + Name=newIdentifier(), + ExpectedVersion=expectedVersion, + ) + JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) + + + +class JobListPeers(JobBase): + + _fcp_auto_remove_ = True + + def __init__(self, fcpClient, withMetaData=False, withVolantile=False): + message = Message( + Messages.ListPeers, + WithMetadata='true' if withMetaData else 'false', + WithVolatile='true' if withVolantile else 'false', + ) + JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) + + + def handlePeer(self,msg): + pass + + + +class JobFileInfo(JobBase): + + _fcp_auto_remove_ = False + + def __init__(self, fcpClient, uri, **params): + """ + @param fcpClient: FcpClient() instance + @param uri: uri of the file to retrieve info for + @param params: additional parameters: + IgnoreDS='true' / 'false' + DSOnly='true' / 'false' + MaxRetries=-1 ...N + PriorityClass=Priority* + + """ + identifier = newIdentifier() + message = Message( + Messages.ClientGet, + Identifier=identifier, + URI=uri, + MaxSize='ase0', + ReturnType='none', + Verbosity='1', + **params + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def handleProgress(self, msg): + pass + + + def stop(self, msg): + JobBase.stop(self, msg) + error = result = None + if msg.name == Messages.GetFailed: + if msg['Code'] == FetchErrors.TooBig: + result = ( + msg.get('ExpectedMetadata.ContentType', ''), + msg.get('ExpectedDataLength', '') + ) + else: + error, result = msg['Code'], msg + + elif msg.name == Messages.DataFound: + result = ( + msg.get('Metadata.ContentType', ''), + msg.get('DataLength', '') + ) + + elif msg.name == Messages.ProtocolError: + error, result = msg['Code'], msg + + else: + raise ValueError('Unhandled message: %s' % msg.name) + + self.fcpResult = error, result + + 
+#TODO: handle case where directories are registered multiple times +class JobTestDDA(JobBase): + + _fcp_auto_remove_ = False + + def __init__(self, fcpClient, directory, read=True, write=True): + message = Message( + Messages.TestDDARequest, + Directory=directory, + WantReadDirectory=fcpBool(read), + WantWriteDirectory=fcpBool(write), + ) + JobBase.__init__(self, fcpClient, directory, message) + self.fcpTmpFile = None + + + def handleTestDDAReply(self, msg): + fpathWrite = msg.params.get('WriteFilename', None) + fpathRead = msg.params.get('ReadFilename', None) + readContent = '' + if fpathWrite is not None: + written = saveWriteFile(fpathWrite, msg['ContentToWrite']) + if not written: + if os.path.isfile(fpathWrite): + os.remove(fpathWrite) + else: + self.fcpTmpFile = fpathWrite + + if fpathRead is not None: + readContent = saveReadFile(fpathRead) + if readContent is None: + readContent = '' + + self.fcpClient.sendMessage( + Messages.TestDDAResponse, + Directory=msg['Directory'], + ReadContent=readContent, + ) + + def stop(self, msg): + JobBase.stop(self, msg) + if self.fcpTmpFile is not None: + if os.path.isfile(self.fcpTmpFile): + os.remove(self.fcpTmpFile) + +#************************************************************************** +# fcp client +#************************************************************************** +class FcpClient(object): + + def __init__(self): + + self._isConnected = False + self._jobs = { + 'all': {}, + 'pending': [], + 'running': [], + 'complete': [], + } + self._lock = thread.allocate_lock() + self._socket = None + + + + def close(self): + if self._socket is not None: + self._socket.close() + self._socket = None + + + def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): + # poll untill freenet responds + time_elapsed = 0 + while time_elapsed <= repeat: + + # try to Connect socket + self.close() + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + 
self._socket.settimeout(SocketTimeout) + try: + self._socket.connect((host, port)) + except Exception, d: + pass + else: + #self._isConnected = True + return True + + # continue polling + time_elapsed += timeout + time.sleep(timeout) + + return False + + + #def __nonzero__(self): + # return self._isConnected + + def addJob(self, job): + self._lock.acquire(True) + try: + if job.fcpIdentifier in self._jobs['all']: + raise ValueError('Duplicate job: %r' % job.identifier) + self._jobs['all'][job.fcpIdentifier] = job + self._jobs['running'].append(job) + finally: + self._lock.release() + job.start() + + def finishJob(self, identifier, msg): + self._lock.acquire(True) + try: + job = self._jobs['all'].get(identifier, None) + if job is not None: + self._jobs['running'].remove(job) + if job._fcp_auto_remove_: + del self._jobs['all'][identifier] + else: + self._jobs['complete'].append(job) + finally: + self._lock.release() + + if job is None: + raise ValueError('No such job: %r' % identifier) + job.stop(msg) + + + def notifyJob(self, identifier, handler, msg): + self._lock.acquire(True) + try: + job = self._jobs['all'].get(identifier, None) + finally: + self._lock.release() + if job is None: + raise ValueError('No such job: %r' % identifier) + getattr(job, handler)(msg) + + + def run(self): + + # TODO: + # x. push pending jobs + # x. 
on error stop this thingy + + n = 0 + while True: + if not self._lock.acquire(False): + continue + + try: + if not self._jobs['pending'] and not self._jobs['running']: + break + finally: + self._lock.release() + + msg = self.readMessage() + self.handleMessage(msg) + + + n += 1 + if n > 50: break + + + def next(self): + msg = self.readMessage() + self.handleMessage(msg) + + + def handleMessage(self, msg): + + print msg.pprint() + + if msg.name == Messages.NodeHello: + #connectionIdentifier = msg['ConnectionIdentifier'] + self.finishJob(JobIdentifiers.ClientHello, msg) + + elif msg.name == Messages.ProtocolError: + code = msg['Code'] + + if code == ProtocolErrors.NoLateClientHellos: + self.finishJob(JobIdentifiers.ClientHello, msg) + + else: + identifier = msg.get('Identifier', None) + if identifier is None: + pass # raise ??? + else: + self.finishJob(identifier, msg) + + elif msg.name == Messages.Peer: + self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg) + + elif msg.name == Messages.EndListPeers: + self.finishJob(IdentifierListPeers, msg) + + elif msg.name == Messages.GetFailed: + self.finishJob(msg['Identifier'], msg) + + elif msg.name == Messages.SimpleProgress: + self.notifyJob(msg['Identifier'], 'handleProgress', msg) + + elif msg.name == Messages.TestDDAReply: + self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg) + + elif msg.name == Messages.TestDDAComplete: + self.finishJob(msg['Directory'], msg) + + elif msg.name == Messages.IdentifierCollision: + pass + + + def readMessage(self): + """Reads the next message directly from the socket and dispatches it + @return: valid or invalid Message() + """ + msg = Message(None) + buf = [] + while True: + + try: + p = self._socket.recv(1) + if not p: raise ValueError('Socket is dead') + except socket.timeout, d: # no new messages in queue + msg = MessageSocketTimeout() + break + except Exception, d: + raise FcpSocketError(d) #!! 
+ + if p == '\r': # ignore + continue + + if p != '\n': + buf.append(p) + continue + + line = ''.join(buf) + if line in ('End', "EndMessage"): + break + buf = [] + + if msg.name is None: + msg.name = line + elif line == 'Data': + n = int(msg.params['DataLength']) + try: + msg.data = self._socket.recv(n) + except Exception, d: + raise FcpSocketError(d) #!! + + else: + head, sep, tail = line.partition('=') + msg.params[head] = tail + if not sep: + # TODO: chek for invalid messages or not + pass + + return msg + + + + def sendMessage(self, name, data=None, **params): + """Sends a message to freenet + @param name: name of the message to send + @param data: data to atatch to the message + @param params: {para-name: param-calue, ...} of parameters to pass along + with the message (see freenet protocol) + """ + return self.sendMessageEx(Message(name, data=data, **params)) + + + def sendMessageEx(self, msg): + """Sends a message to freenet + @param msg: (Message) message to send + @return: Message + """ + #self.log.info('SendMessage\n' + msg.pprint()) + rawMsg = msg.toString() + try: + self._socket.sendall(rawMsg) + except Exception, d: + raise FcpSocketError(d) + #TODO: allow for an error handler to handle + return msg + +#***************************************************************************** +# +#***************************************************************************** +if __name__ == '__main__': + c = FcpClient() + if c.connect(): + job1 = JobNodeHello(c) + c.addJob(job1) + + c.run() + print '---------------------------' + print job1.fcpResult.pprint() + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ju...@us...> - 2007-10-15 13:12:59
|
Revision: 3 http://fclient.svn.sourceforge.net/fclient/?rev=3&view=rev Author: jurner Date: 2007-10-15 06:12:58 -0700 (Mon, 15 Oct 2007) Log Message: ----------- init Added Paths: ----------- trunk/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ju...@us...> - 2007-10-15 13:04:34
|
Revision: 2 http://fclient.svn.sourceforge.net/fclient/?rev=2&view=rev Author: jurner Date: 2007-10-15 06:04:38 -0700 (Mon, 15 Oct 2007) Log Message: ----------- init Added Paths: ----------- tags/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |