SF.net SVN: fclient: [7] trunk/fclient/fclient_lib/fcp/fcp20.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <jU...@us...> - 2007-10-20 10:00:17
|
Revision: 7 http://fclient.svn.sourceforge.net/fclient/?rev=7&view=rev Author: jUrner Date: 2007-10-20 03:00:20 -0700 (Sat, 20 Oct 2007) Log Message: ----------- continued working on fcp protocol implementation Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp20.py Modified: trunk/fclient/fclient_lib/fcp/fcp20.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6) +++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7) @@ -288,6 +288,21 @@ # classes #********************************************************************** class FcpSocketError(Exception): pass +class FcpProtocolError(Exception): + def __init__(self, msg): + """ + @param msg: (Message) ProtocolError message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + + def __str__(self): return self.value + + + class FcpUri(object): """Wrapper class for freenet uris""" @@ -467,8 +482,10 @@ self.fcpIdentifier = identifier # self.fcpMessage = message # message send to node self.fcpResult = None # job result - self.fcpTime = 0 # start time (will hld duration whern the job is complte) - + self.fcpTimeStart = 0 # time the job was started + self.fcpTimeStop = 0 # time the job was stopped + self.fcpStopped = False + def displayName(self): """Returns the display name of the job @return: (str) display name @@ -477,14 +494,16 @@ def start(self): """Starts the job""" - self.fcpTime = time.time() + self.fcpStopped = False + self.fcpTimeStart = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) def error(self, msg): """Called on job completion if an error was encounterd while runnng the job @param msg: (Message) to pass to the job """ - self.fcpTime = time.time() - self.fcpTime + self.fcpStopped = True + self.fcpTimeStop = time.time() self.fcpError = msg self.fcpResult = None @@ -492,7 +511,8 @@ """Called on job completion to stop the job @param msg: (Message) to pass to the job """ - self.fcpTime = time.time() - self.fcpTime + self.fcpStopped = True + self.fcpTimeStop = time.time() self.fcpError = None self.fcpResult = msg @@ -519,7 +539,7 @@ JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) def displayName(self): - return 'NodeHello' + return 'ClientHello' class JobListPeers(JobBase): @@ -531,8 +551,8 @@ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Messages.ListPeers, - WithMetadata='true' if withMetaData else 'false', - WithVolatile='true' if withVolantile else 'false', + WithMetadata=fcpBool(withMetaData), + WithVolatile=fcpBool(withVolantile), ) JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) @@ -587,8 +607,7 @@ return 'GetFileInfo' def handleProgress(self, msg): - """Handles the next progress made of a 'SimpleProgress' message - while the job is running. Overwrite to process. + """Handles the next progress made. Overwrite to process. """ @@ -625,6 +644,9 @@ _fcp_auto_remove_ = False def __init__(self, fcpClient, directory, read=True, write=True): + if not os.path.isdir(directory): + raise ValueError('No such directory: %r' % directory) + message = Message( Messages.TestDDARequest, Directory=directory, @@ -660,6 +682,7 @@ ReadContent=readContent, ) + def error(self, msg): JobBase.error(self, msg) saveRemoveFile(self.fcpTmpFile) @@ -728,7 +751,7 @@ self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None - + self.setVerbosity(verbosity) atexit.register(self.close) @@ -749,8 +772,9 @@ @param port: (int) port of the node @param repeat: (int) how many seconds try to connect before giving up @param timeout: (int) how much time to wait before another attempt to connect - @return: True if successful, False otherwise + @return: (Message) NodeHello if successful,None otherwise """ + self._clientHello = None self._log.info(self._logMessages.Connecting) # poll untill freenet responds @@ -768,7 +792,10 @@ pass else: self._log.info(self._logMessages.Connected) - return True + job = JobClientHello(self) + self.jobAdd(job, synchron=True) + assert job.fcpError is None, 'Error should have been caught by handleMessage()' + return job.fcpResult self._log.info(self._logMessages.ConnectionRetry) @@ -777,7 +804,7 @@ time.sleep(timeout) self._log.info(self._logMessages.ConnectingFailed) - return False + return None def handleMessage(self, msg): @@ -792,15 +819,17 @@ elif msg.name == Messages.ProtocolError: code = msg['Code'] - if code == ProtocolErrors.NoLateClientHellos: - self.jobStop(JobIdentifiers.ClientHello, msg, error=True) - + #if code == ProtocolErrors.NoLateClientHellos: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #elif code == ProtocolErrors.ClientHelloMustBeFirst: + # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #else: + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: inform caller + raise FcpProtocolError(msg) else: - identifier = msg.get('Identifier', None) - if identifier is None: - pass # raise ??? - else: - self.jobStop(identifier, msg, error=True) + self.jobStop(identifier, msg, error=True) elif msg.name == Messages.Peer: self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) @@ -824,9 +853,10 @@ pass - def jobAdd(self, job): + def jobAdd(self, job, synchron=False): """Adds a job to the client @param job: (Job*) job to add + @param synchron: if True, wait untill the job is completed, if False return emidiately """ self._lock.acquire(True) try: @@ -839,6 +869,10 @@ self._log.info(self._logMessages.JobStart + job.displayName()) job.start() + if synchron: + while not job.fcpStopped: + self.next() + def jobNotify(self, identifier, handler, msg): @@ -857,6 +891,7 @@ getattr(job, handler)(msg) + #TODO: quite unclear when to remove a job def jobStop(self, identifier, msg, error=False): """Stops a job @param identifier: identifier of the job to stop @@ -891,6 +926,7 @@ """ msg = self.readMessage() self.handleMessage(msg) + return msg def readMessage(self): """Reads the next message directly from the socket and dispatches it @@ -1026,32 +1062,37 @@ if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) if c.connect(): - job1 = JobClientHello(c) - c.jobAdd(job1) + def foo(): + job1 = JobClientHello(c) + c.jobAdd(job1) - c.run() - print '---------------------------' - print job1.fcpError - print job1.fcpResult - print job1.fcpTime - print '---------------------------' + c.run() + print '---------------------------' + print job1.fcpError + print job1.fcpResult + print job1.fcpTime + print '---------------------------' + #foo() + + def foo(): + d = os.path.dirname(os.path.abspath(__file__)) + job2 = JobTestDDA(c, d) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job2.fcpError + print job2.fcpResult + print job2.fcpTime + print '---------------------------' - job2 = JobTestDDA(c, os.path.dirname(__file__)) - c.jobAdd(job2) - c.run() - print '---------------------------' - print job1.fcpError - print job2.fcpResult - print job2.fcpTime - print '---------------------------' - + def foo(): job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') c.jobAdd(job2) c.run() print '---------------------------' - print job1.fcpError + print job2.fcpError print job2.fcpResult print job2.fcpTime print '---------------------------' - + #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |