SF.net SVN: fclient: [11] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <jU...@us...> - 2007-10-23 09:19:10
|
Revision: 11 http://fclient.svn.sourceforge.net/fclient/?rev=11&view=rev Author: jUrner Date: 2007-10-23 02:19:12 -0700 (Tue, 23 Oct 2007) Log Message: ----------- combed over message dispatch Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11) @@ -14,7 +14,6 @@ #************************************************************** # consts #************************************************************** -NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() try: DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) @@ -28,6 +27,13 @@ """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + GetNode = 'GetNode' + GetConfig = 'GetConfig' + ModifyConfig = 'ModifyConfig' + WatchGlobal = 'WatchGlobal' + Shutdown = 'Shutdown' + class Priorities: @@ -492,6 +498,10 @@ self.jobTimeStop = 0 + def handleMessage(self, msg): + return False + + def handleStart(self): """Starts the job""" self.jobResult = None @@ -514,7 +524,6 @@ goes well, you will get a NodeHello in response. """ - def __init__(self, fcpClient, name=None, expectedVersion='2.0'): """ @param name: (str) connection name or None, to use an arbitrary name @@ -529,13 +538,36 @@ def displayName(self): return 'ClientHello' + + + def handleMessage(self, msg): + if msg.name == Message.NodeHello: + return self.handleNodeHello(msg) + elif msg.name == Message.ProtocolError: + return self.handleProtocolError(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handleNodeHello(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + def handleProtocolError(self, msg): + raise ProtocolError(msg) + + + + + class JobListPeers(JobBase): """Lists all known peers of the node """ - - + def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Message.ListPeers, @@ -545,13 +577,26 @@ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - def handlePeer(self,msg): - """Handles the next peer send by the node in form of a 'Peer' message - while the job is running. Overwrite to process. - """ + def handleMessage(self,msg): + if msg.name == Message.EndListPeers: + return self.handleEndListPeers(msg) + elif msg.name == Message.Peer: + return self.handlePeer(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handlePeer(self, msg): + return True + + def handleEndListPeers(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg + self.jobClient.jobRemove(self.jobIdentifier) + return True + - +#TODO: identifier collisions are not yet handled class JobGetFileInfo(JobBase): """Tries to retieve information about a file. If everything goes well @@ -571,6 +616,11 @@ MaxRetries=-1 ...N PriorityClass=Priority* + @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be + retrieved and data will be a GetFailed message containing details. If error is False + data will be a tuple(str metadataContentType, str size). Note that both may be empty + string and size may not be accurate. + """ identifier = newIdentifier() message = Message( @@ -612,46 +662,73 @@ ) - def handlePriorityChanged(self): - self.jobMessage['PriorityClass'] = priority + def handleMessage(self, msg): + if msg.name == Message.DataFound: + return self.handleDataFound(msg) + elif msg.name == Message.GetFailed: + return self.handleGetFailed(msg) + elif msg.name == Message.IdentifierCollision: + return self.handleIdentifierCollision(msg) + elif msg.name == Message.PersistentRequestModified: + return self.handlePersistentRequestModified(msg) + elif msg.name == Message.PersistentRequestRemoved: + return self.handlePersistentRequestRemoved(msg) + elif msg.name == Message.SimpleProgress: + return self.handleSimpleProgress(msg) + + else: + raise ValueError('Unexpected message: %s' % msg.name) + - def handleProgress(self, msg): - """Handles the next progress made. Overwrite to process. - """ + def handleDataFound(self, msg): + self.jobTimeStop = time.time() + self.jobResult = ( + False, + ( + msg.get('Metadata.ContentType', ''), + msg.get('DataLength', '') + ) + ) + self.jobClient.jobRemove(self.jobIdentifier) + return True - def handleRequestRemoved(self, msg): + def handleGetFailed(self, msg): + self.jobTimeStop = time.time() + if msg['Code'] == FetchError.TooBig: + self.jobResult = (False, msg) + self.jobResult = ( + False, + ( + msg.get('ExpectedMetadata.ContentType', ''), + msg.get('ExpectedDataLength', '') + ) + ) + else: + self.jobResult = (True, msg) + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + def handleIdentifierCollision(self, msg): + raise + + + def handleSimpleProgress(self, msg): + return True + + + def handlePersistentRequestModified(self, msg): + priorityClass = msg.get('PriorityClass', None) + if priorityClass is not None: + self.jobMessage['PriorityClass'] = priorityClass + return True + + def handlePersistentRequestRemoved(self, msg): if self.jobClient.jobIsRunning(self.jobIdentifier): self.jobClient.jobStop(self.jobIdentifier) + return True + - - def handleStop(self, flagError, msg): - """called when a job is finished - @note: jobResult may contain an 'IdentifierCollision' message. Make shure - to handle this appropriately - """ - JobBase.handleStop(self,flagError, msg) - if msg.name == Message.DataFound: - self.jobResult = ( - msg.get('Metadata.ContentType', ''), - msg.get('DataLength', '') - ) - elif msg.name == Message.GetFailed: - if msg['Code'] == FetchError.TooBig: - self.jobResult = (False, msg) - self.jobResult = ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - # not shure when removing the request is actually required - # so send it anyway - self.jobClient.sendMessage( - Message.RemovePersistentRequest, - Global=fcpBool(False), - Identifier=self.jobIdentifier, - ) - - - #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): """Tests a directory for read / write accesss @@ -672,6 +749,16 @@ self.jobTmpFile = None + + def handleMessage(self, msg): + if msg.name == Message.TestDDAReply: + return self.handleTestDDAReply(msg) + elif msg.name == Message.TestDDAComplete: + return self.handleTestDDAComplete(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -693,13 +780,18 @@ Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, - ) + ) + return True + - - def handleStop(self, flagError, msg): - JobBase.handleStop(self, flagError, msg) + def handleTestDDAComplete(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg saveRemoveFile(self.jobTmpFile) self.jobTmpFile = None + self.jobClient.jobRemove(self.jobIdentifier) + return True + #************************************************************************** # fcp client @@ -728,6 +820,10 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. #TODO: no idea if to add support for pending jobs and queue management here + +#TODO: do not mix directories as identifiers with identifiers (might lead to collisions) +#TODO: how to handle (ProtocolError code 18: Shutting down)? + class FcpClient(object): """Fcp client implementation""" @@ -757,7 +853,7 @@ self._errorHandler = errorHandler self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() + self._lock = thread.allocate_lock() # lovk when resources are accessed self._socket = None self.setVerbosity(verbosity) @@ -806,9 +902,8 @@ # as expected the socket simply breaks. So take it over. job = JobClientHello(self) self.jobAdd(job, synchron=True) - error, msg = job.jobResult - assert not error, 'Error should have been caught by handleMessage()' - return msg + assert job.jobResult is not None, 'ClientHello is not working as expected' + return job.jobResult self._log.info(self._logMessages.ConnectionRetry) @@ -827,59 +922,66 @@ """ - #if msg.name != 'USocketTimeOut': + if msg.name == 'USocketTimeOut': + return True + self._log.debug(self._logMessages.MessageReceived + msg.pprint()) - if msg.name == Message.NodeHello: - #connectionIdentifier = msg['ConnectionIdentifier'] - self.jobStop(FixedJobIdentifiers.ClientHello, msg) - - elif msg.name == Message.ProtocolError: + + if msg.name == Message.ProtocolError: code = msg['Code'] - #if code == ProtocolError.NoLateClientHellos: - # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) - #elif code == ProtocolError.ClientHelloMustBeFirst: - # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) - #else: - identifier = msg.get('Identifier', None) - if identifier is None: - #TODO: inform caller - raise ProtocolError(msg) + if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: + return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) + + elif code == ProtocolError.Shutdown: + if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): + + # ######################################## + #TODO: ??? + + return True + else: - return self.jobStop(identifier, msg, flagError=True) + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: inform caller + raise ProtocolError(msg) + else: + return self.jobDispatchMessage(identifier, msg) + + else: + + # check if the is something like an identifier in the message + if msg.name == Message.TestDDAReply: + identifier = msg['Directory'] + elif msg.name == Message.TestDDAComplete: + identifier = msg['Directory'] + else: + identifier = msg.get('Identifier', None) - if msg.name == Message.DataFound: - return self.jobStop(msg['Identifier'], msg, flagError=True) - - elif msg.name == Message.EndListPeers: - return self.jobStop(FixedJobIdentifiers.ListPeers, msg) - - elif msg.name == Message.GetFailed: - return self.jobStop(msg['Identifier'], msg, flagError=True) + # dispatch to jobs with fixed identifiers + if identifier is None: + + if msg.name == Message.NodeHello: + return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Message.PersistentRequestRemoved: - return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg) - - elif msg.name == Message.SimpleProgress: - return self.jobNotify(msg['Identifier'], 'handleProgress', msg) + elif msg.name == Message.EndListPeers: + return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.TestDDAComplete: - return self.jobStop(msg['Directory'], msg) + elif msg.name == Message.Peer: + return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.TestDDAReply: - return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + # more here..... + + else: + raise ValueError('Unhandled message: ' + msg.name) + + else: + return self.jobDispatchMessage(identifier, msg) + + raise RuntimeError('Should not have endet here: %s' % msg.name) - elif msg.name == Message.IdentifierCollision: - # identifier is unique to us, but for some reason not unique to the node - #TODO: maybe for future implementations find a way to automise - # reinsertion and a notification via a handleJobChanged() for the - # caller to overwrite - return self.jobStop(msg['Identifier'], msg, flagError=True) - - elif msg.name == Message.Peer: - return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) - - return False + ######################################################### @@ -905,6 +1007,9 @@ self._lock.acquire(True) try: if job.jobIdentifier in self._jobs['JobMapping']: + raise ValueError('Job with that identifier already present') + + if job.jobIdentifier in self._jobs['JobMapping']: raise ValueError('Duplicate job: %r' % job.jobIdentifier) self._jobs['JobMapping'][job.jobIdentifier] = job self._jobs['JobList'].append(job) @@ -916,7 +1021,19 @@ if synchron: while job.jobResult is None: self.next() + + def jobDispatchMessage(self, identifier, msg): + """Dispatches a message to a job + @param identifier: identifier of the job + @param msg: (Message) message to dispatch + @return: True if the message was handled, False otherwise + """ + job = self.jobGet(identifier) + if job is not None: + return job.handleMessage(msg) + return False + def jobGet(self, identifier): """Returns a job given its identifier @@ -943,36 +1060,15 @@ self._lock.release() return result - - def jobNotify(self, identifier, handler, msg): - """Notifies a job about an event while it is running - @param identifier: identifier of the job to notify - @param handler: (str) method of the job to call to handle the notification - @param msg: Message() to pass to the job - @return: True if a job was found to be notified, False otherwise + + def jobRemove(self, identifier): + """Removes a job unconditionally + @param identifier: identifier of the job to remove + @return: True if the job was found, False otherwise """ self._lock.acquire(True) try: job = self._jobs['JobMapping'].get(identifier, None) - finally: - self._lock.release() - if job is None: - return False - getattr(job, handler)(msg) - return True - - - #TODO: quite unclear when to remove a job - def jobStop(self, identifier, msg, flagError=False): - """Stops a job - @param identifier: identifier of the job to stop - @param msg: Message() to pass to the job as result - @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse - @return: True if a job was found to be stopped, False otherwise - """ - self._lock.acquire(True) - try: - job = self._jobs['JobMapping'].get(identifier, None) if job is not None: self._jobs['JobList'].remove(job) del self._jobs['JobMapping'][identifier] @@ -981,10 +1077,9 @@ if job is None: return False self._log.info(self._logMessages.JobStop + job.jobMessage.name) - job.handleStop(flagError, msg) return True - + #TODO: some info when all jobs are completed def next(self): """Pumps the next message waiting @@ -1059,8 +1154,13 @@ @note: use KeyboardInterrupt to stop prematurely """ try: + #n = 0 + while self.hasJobsRunning(): + #n += 1 + #if n > 40: break self.next() + except KeyboardInterrupt: self._log(self._logMessages.KeyboardInterrupt) self.close() @@ -1117,7 +1217,9 @@ def getFile(self, job): pass + + #***************************************************************************** # #***************************************************************************** @@ -1158,10 +1260,22 @@ def foo(): - job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - c.jobAdd(job2) + job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') + + + #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + #job.jobIdentifier = job.jobMessage['Identifier'] = 1 + #job.jobMessage['Identifier'] = 1 + #job.jobIdentifier = 1 + c.jobAdd(job) + + #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg') + #job.jobMessage['Identifier'] = 1 + #job.jobIdentifier = 1 + #c.jobAdd(job) + c.run() print '---------------------------' - print job2.jobResult + print job.jobResult print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |