SF.net SVN: fclient: [11] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-23 09:19:10
|
Revision: 11
http://fclient.svn.sourceforge.net/fclient/?rev=11&view=rev
Author: jUrner
Date: 2007-10-23 02:19:12 -0700 (Tue, 23 Oct 2007)
Log Message:
-----------
combed over message dispatch
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11)
@@ -14,7 +14,6 @@
#**************************************************************
# consts
#**************************************************************
-NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
try:
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
@@ -28,6 +27,13 @@
"""
ClientHello = 'ClientHello'
ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig'
+ ModifyConfig = 'ModifyConfig'
+ WatchGlobal = 'WatchGlobal'
+ Shutdown = 'Shutdown'
+
class Priorities:
@@ -492,6 +498,10 @@
self.jobTimeStop = 0
+ def handleMessage(self, msg):
+ return False
+
+
def handleStart(self):
"""Starts the job"""
self.jobResult = None
@@ -514,7 +524,6 @@
goes well, you will get a NodeHello in response.
"""
-
def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
"""
@param name: (str) connection name or None, to use an arbitrary name
@@ -529,13 +538,36 @@
def displayName(self):
return 'ClientHello'
+
+
+ def handleMessage(self, msg):
+ if msg.name == Message.NodeHello:
+ return self.handleNodeHello(msg)
+ elif msg.name == Message.ProtocolError:
+ return self.handleProtocolError(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handleNodeHello(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+ def handleProtocolError(self, msg):
+ raise ProtocolError(msg)
+
+
+
+
+
class JobListPeers(JobBase):
"""Lists all known peers of the node
"""
-
-
+
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Message.ListPeers,
@@ -545,13 +577,26 @@
JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
- def handlePeer(self,msg):
- """Handles the next peer send by the node in form of a 'Peer' message
- while the job is running. Overwrite to process.
- """
+ def handleMessage(self,msg):
+ if msg.name == Message.EndListPeers:
+ return self.handleEndListPeers(msg)
+ elif msg.name == Message.Peer:
+ return self.handlePeer(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handlePeer(self, msg):
+ return True
+
+ def handleEndListPeers(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
-
+#TODO: identifier collisions are not yet handled
class JobGetFileInfo(JobBase):
"""Tries to retieve information about a file. If everything goes well
@@ -571,6 +616,11 @@
MaxRetries=-1 ...N
PriorityClass=Priority*
+ @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be
+ retrieved and data will be a GetFailed message containing details. If error is False
+ data will be a tuple(str metadataContentType, str size). Note that both may be empty
+ string and size may not be accurate.
+
"""
identifier = newIdentifier()
message = Message(
@@ -612,46 +662,73 @@
)
- def handlePriorityChanged(self):
- self.jobMessage['PriorityClass'] = priority
+ def handleMessage(self, msg):
+ if msg.name == Message.DataFound:
+ return self.handleDataFound(msg)
+ elif msg.name == Message.GetFailed:
+ return self.handleGetFailed(msg)
+ elif msg.name == Message.IdentifierCollision:
+ return self.handleIdentifierCollision(msg)
+ elif msg.name == Message.PersistentRequestModified:
+ return self.handlePersistentRequestModified(msg)
+ elif msg.name == Message.PersistentRequestRemoved:
+ return self.handlePersistentRequestRemoved(msg)
+ elif msg.name == Message.SimpleProgress:
+ return self.handleSimpleProgress(msg)
+
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
- def handleProgress(self, msg):
- """Handles the next progress made. Overwrite to process.
- """
+ def handleDataFound(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = (
+ False,
+ (
+ msg.get('Metadata.ContentType', ''),
+ msg.get('DataLength', '')
+ )
+ )
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
- def handleRequestRemoved(self, msg):
+ def handleGetFailed(self, msg):
+ self.jobTimeStop = time.time()
+ if msg['Code'] == FetchError.TooBig:
+ self.jobResult = (False, msg)
+ self.jobResult = (
+ False,
+ (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
+ )
+ else:
+ self.jobResult = (True, msg)
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+ def handleIdentifierCollision(self, msg):
+ raise
+
+
+ def handleSimpleProgress(self, msg):
+ return True
+
+
+ def handlePersistentRequestModified(self, msg):
+ priorityClass = msg.get('PriorityClass', None)
+ if priorityClass is not None:
+ self.jobMessage['PriorityClass'] = priorityClass
+ return True
+
+ def handlePersistentRequestRemoved(self, msg):
if self.jobClient.jobIsRunning(self.jobIdentifier):
self.jobClient.jobStop(self.jobIdentifier)
+ return True
+
-
- def handleStop(self, flagError, msg):
- """called when a job is finished
- @note: jobResult may contain an 'IdentifierCollision' message. Make shure
- to handle this appropriately
- """
- JobBase.handleStop(self,flagError, msg)
- if msg.name == Message.DataFound:
- self.jobResult = (
- msg.get('Metadata.ContentType', ''),
- msg.get('DataLength', '')
- )
- elif msg.name == Message.GetFailed:
- if msg['Code'] == FetchError.TooBig:
- self.jobResult = (False, msg)
- self.jobResult = (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- # not shure when removing the request is actually required
- # so send it anyway
- self.jobClient.sendMessage(
- Message.RemovePersistentRequest,
- Global=fcpBool(False),
- Identifier=self.jobIdentifier,
- )
-
-
-
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
"""Tests a directory for read / write accesss
@@ -672,6 +749,16 @@
self.jobTmpFile = None
+
+ def handleMessage(self, msg):
+ if msg.name == Message.TestDDAReply:
+ return self.handleTestDDAReply(msg)
+ elif msg.name == Message.TestDDAComplete:
+ return self.handleTestDDAComplete(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -693,13 +780,18 @@
Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
- )
+ )
+ return True
+
-
- def handleStop(self, flagError, msg):
- JobBase.handleStop(self, flagError, msg)
+ def handleTestDDAComplete(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
saveRemoveFile(self.jobTmpFile)
self.jobTmpFile = None
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
#**************************************************************************
# fcp client
@@ -728,6 +820,10 @@
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
#TODO: no idea if to add support for pending jobs and queue management here
+
+#TODO: do not mix directories as identifiers with identifiers (might lead to collisions)
+#TODO: how to handle (ProtocolError code 18: Shutting down)?
+
class FcpClient(object):
"""Fcp client implementation"""
@@ -757,7 +853,7 @@
self._errorHandler = errorHandler
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock()
+ self._lock = thread.allocate_lock() # lovk when resources are accessed
self._socket = None
self.setVerbosity(verbosity)
@@ -806,9 +902,8 @@
# as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- error, msg = job.jobResult
- assert not error, 'Error should have been caught by handleMessage()'
- return msg
+ assert job.jobResult is not None, 'ClientHello is not working as expected'
+ return job.jobResult
self._log.info(self._logMessages.ConnectionRetry)
@@ -827,59 +922,66 @@
"""
- #if msg.name != 'USocketTimeOut':
+ if msg.name == 'USocketTimeOut':
+ return True
+
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
- if msg.name == Message.NodeHello:
- #connectionIdentifier = msg['ConnectionIdentifier']
- self.jobStop(FixedJobIdentifiers.ClientHello, msg)
-
- elif msg.name == Message.ProtocolError:
+
+ if msg.name == Message.ProtocolError:
code = msg['Code']
- #if code == ProtocolError.NoLateClientHellos:
- # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
- #elif code == ProtocolError.ClientHelloMustBeFirst:
- # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
- #else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- #TODO: inform caller
- raise ProtocolError(msg)
+ if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
+
+ elif code == ProtocolError.Shutdown:
+ if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
+
+ # ########################################
+ #TODO: ???
+
+ return True
+
else:
- return self.jobStop(identifier, msg, flagError=True)
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: inform caller
+ raise ProtocolError(msg)
+ else:
+ return self.jobDispatchMessage(identifier, msg)
+
+ else:
+
+ # check if the is something like an identifier in the message
+ if msg.name == Message.TestDDAReply:
+ identifier = msg['Directory']
+ elif msg.name == Message.TestDDAComplete:
+ identifier = msg['Directory']
+ else:
+ identifier = msg.get('Identifier', None)
- if msg.name == Message.DataFound:
- return self.jobStop(msg['Identifier'], msg, flagError=True)
-
- elif msg.name == Message.EndListPeers:
- return self.jobStop(FixedJobIdentifiers.ListPeers, msg)
-
- elif msg.name == Message.GetFailed:
- return self.jobStop(msg['Identifier'], msg, flagError=True)
+ # dispatch to jobs with fixed identifiers
+ if identifier is None:
+
+ if msg.name == Message.NodeHello:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif msg.name == Message.PersistentRequestRemoved:
- return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg)
-
- elif msg.name == Message.SimpleProgress:
- return self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ elif msg.name == Message.EndListPeers:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.TestDDAComplete:
- return self.jobStop(msg['Directory'], msg)
+ elif msg.name == Message.Peer:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.TestDDAReply:
- return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+ # more here.....
+
+ else:
+ raise ValueError('Unhandled message: ' + msg.name)
+
+ else:
+ return self.jobDispatchMessage(identifier, msg)
+
+ raise RuntimeError('Should not have endet here: %s' % msg.name)
- elif msg.name == Message.IdentifierCollision:
- # identifier is unique to us, but for some reason not unique to the node
- #TODO: maybe for future implementations find a way to automise
- # reinsertion and a notification via a handleJobChanged() for the
- # caller to overwrite
- return self.jobStop(msg['Identifier'], msg, flagError=True)
-
- elif msg.name == Message.Peer:
- return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
-
- return False
+
#########################################################
@@ -905,6 +1007,9 @@
self._lock.acquire(True)
try:
if job.jobIdentifier in self._jobs['JobMapping']:
+ raise ValueError('Job with that identifier already present')
+
+ if job.jobIdentifier in self._jobs['JobMapping']:
raise ValueError('Duplicate job: %r' % job.jobIdentifier)
self._jobs['JobMapping'][job.jobIdentifier] = job
self._jobs['JobList'].append(job)
@@ -916,7 +1021,19 @@
if synchron:
while job.jobResult is None:
self.next()
+
+ def jobDispatchMessage(self, identifier, msg):
+ """Dispatches a message to a job
+ @param identifier: identifier of the job
+ @param msg: (Message) message to dispatch
+ @return: True if the message was handled, False otherwise
+ """
+ job = self.jobGet(identifier)
+ if job is not None:
+ return job.handleMessage(msg)
+ return False
+
def jobGet(self, identifier):
"""Returns a job given its identifier
@@ -943,36 +1060,15 @@
self._lock.release()
return result
-
- def jobNotify(self, identifier, handler, msg):
- """Notifies a job about an event while it is running
- @param identifier: identifier of the job to notify
- @param handler: (str) method of the job to call to handle the notification
- @param msg: Message() to pass to the job
- @return: True if a job was found to be notified, False otherwise
+
+ def jobRemove(self, identifier):
+ """Removes a job unconditionally
+ @param identifier: identifier of the job to remove
+ @return: True if the job was found, False otherwise
"""
self._lock.acquire(True)
try:
job = self._jobs['JobMapping'].get(identifier, None)
- finally:
- self._lock.release()
- if job is None:
- return False
- getattr(job, handler)(msg)
- return True
-
-
- #TODO: quite unclear when to remove a job
- def jobStop(self, identifier, msg, flagError=False):
- """Stops a job
- @param identifier: identifier of the job to stop
- @param msg: Message() to pass to the job as result
- @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse
- @return: True if a job was found to be stopped, False otherwise
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['JobMapping'].get(identifier, None)
if job is not None:
self._jobs['JobList'].remove(job)
del self._jobs['JobMapping'][identifier]
@@ -981,10 +1077,9 @@
if job is None:
return False
self._log.info(self._logMessages.JobStop + job.jobMessage.name)
- job.handleStop(flagError, msg)
return True
-
+
#TODO: some info when all jobs are completed
def next(self):
"""Pumps the next message waiting
@@ -1059,8 +1154,13 @@
@note: use KeyboardInterrupt to stop prematurely
"""
try:
+ #n = 0
+
while self.hasJobsRunning():
+ #n += 1
+ #if n > 40: break
self.next()
+
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
self.close()
@@ -1117,7 +1217,9 @@
def getFile(self, job):
pass
+
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1158,10 +1260,22 @@
def foo():
- job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- c.jobAdd(job2)
+ job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip')
+
+
+ #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ #job.jobIdentifier = job.jobMessage['Identifier'] = 1
+ #job.jobMessage['Identifier'] = 1
+ #job.jobIdentifier = 1
+ c.jobAdd(job)
+
+ #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg')
+ #job.jobMessage['Identifier'] = 1
+ #job.jobIdentifier = 1
+ #c.jobAdd(job)
+
c.run()
print '---------------------------'
- print job2.jobResult
+ print job.jobResult
print '---------------------------'
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|