SF.net SVN: fclient: [7] trunk/fclient/fclient_lib/fcp/fcp20.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-20 10:00:17
|
Revision: 7
http://fclient.svn.sourceforge.net/fclient/?rev=7&view=rev
Author: jUrner
Date: 2007-10-20 03:00:20 -0700 (Sat, 20 Oct 2007)
Log Message:
-----------
continued working on fcp protocol implementation
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Modified: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7)
@@ -288,6 +288,21 @@
# classes
#**********************************************************************
class FcpSocketError(Exception): pass
+class FcpProtocolError(Exception):
+ def __init__(self, msg):
+ """
+ @param msg: (Message) ProtocolError message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+
+ def __str__(self): return self.value
+
+
+
class FcpUri(object):
"""Wrapper class for freenet uris"""
@@ -467,8 +482,10 @@
self.fcpIdentifier = identifier #
self.fcpMessage = message # message send to node
self.fcpResult = None # job result
- self.fcpTime = 0 # start time (will hld duration whern the job is complte)
-
+ self.fcpTimeStart = 0 # time the job was started
+ self.fcpTimeStop = 0 # time the job was stopped
+ self.fcpStopped = False
+
def displayName(self):
"""Returns the display name of the job
@return: (str) display name
@@ -477,14 +494,16 @@
def start(self):
"""Starts the job"""
- self.fcpTime = time.time()
+ self.fcpStopped = False
+ self.fcpTimeStart = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
def error(self, msg):
"""Called on job completion if an error was encounterd while runnng the job
@param msg: (Message) to pass to the job
"""
- self.fcpTime = time.time() - self.fcpTime
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
self.fcpError = msg
self.fcpResult = None
@@ -492,7 +511,8 @@
"""Called on job completion to stop the job
@param msg: (Message) to pass to the job
"""
- self.fcpTime = time.time() - self.fcpTime
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
self.fcpError = None
self.fcpResult = msg
@@ -519,7 +539,7 @@
JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
def displayName(self):
- return 'NodeHello'
+ return 'ClientHello'
class JobListPeers(JobBase):
@@ -531,8 +551,8 @@
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Messages.ListPeers,
- WithMetadata='true' if withMetaData else 'false',
- WithVolatile='true' if withVolantile else 'false',
+ WithMetadata=fcpBool(withMetaData),
+ WithVolatile=fcpBool(withVolantile),
)
JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
@@ -587,8 +607,7 @@
return 'GetFileInfo'
def handleProgress(self, msg):
- """Handles the next progress made of a 'SimpleProgress' message
- while the job is running. Overwrite to process.
+ """Handles the next progress made. Override to process.
"""
@@ -625,6 +644,9 @@
_fcp_auto_remove_ = False
def __init__(self, fcpClient, directory, read=True, write=True):
+ if not os.path.isdir(directory):
+ raise ValueError('No such directory: %r' % directory)
+
message = Message(
Messages.TestDDARequest,
Directory=directory,
@@ -660,6 +682,7 @@
ReadContent=readContent,
)
+
def error(self, msg):
JobBase.error(self, msg)
saveRemoveFile(self.fcpTmpFile)
@@ -728,7 +751,7 @@
self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
-
+
self.setVerbosity(verbosity)
atexit.register(self.close)
@@ -749,8 +772,9 @@
@param port: (int) port of the node
@param repeat: (int) how many seconds try to connect before giving up
@param timeout: (int) how much time to wait before another attempt to connect
- @return: True if successful, False otherwise
+ @return: (Message) NodeHello if successful, None otherwise
"""
+ self._clientHello = None
self._log.info(self._logMessages.Connecting)
# poll untill freenet responds
@@ -768,7 +792,10 @@
pass
else:
self._log.info(self._logMessages.Connected)
- return True
+ job = JobClientHello(self)
+ self.jobAdd(job, synchron=True)
+ assert job.fcpError is None, 'Error should have been caught by handleMessage()'
+ return job.fcpResult
self._log.info(self._logMessages.ConnectionRetry)
@@ -777,7 +804,7 @@
time.sleep(timeout)
self._log.info(self._logMessages.ConnectingFailed)
- return False
+ return None
def handleMessage(self, msg):
@@ -792,15 +819,17 @@
elif msg.name == Messages.ProtocolError:
code = msg['Code']
- if code == ProtocolErrors.NoLateClientHellos:
- self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
-
+ #if code == ProtocolErrors.NoLateClientHellos:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #elif code == ProtocolErrors.ClientHelloMustBeFirst:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: inform caller
+ raise FcpProtocolError(msg)
else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- pass # raise ???
- else:
- self.jobStop(identifier, msg, error=True)
+ self.jobStop(identifier, msg, error=True)
elif msg.name == Messages.Peer:
self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
@@ -824,9 +853,10 @@
pass
- def jobAdd(self, job):
+ def jobAdd(self, job, synchron=False):
"""Adds a job to the client
@param job: (Job*) job to add
+ @param synchron: if True, wait until the job is completed, if False return immediately
"""
self._lock.acquire(True)
try:
@@ -839,6 +869,10 @@
self._log.info(self._logMessages.JobStart + job.displayName())
job.start()
+ if synchron:
+ while not job.fcpStopped:
+ self.next()
+
def jobNotify(self, identifier, handler, msg):
@@ -857,6 +891,7 @@
getattr(job, handler)(msg)
+ #TODO: quite unclear when to remove a job
def jobStop(self, identifier, msg, error=False):
"""Stops a job
@param identifier: identifier of the job to stop
@@ -891,6 +926,7 @@
"""
msg = self.readMessage()
self.handleMessage(msg)
+ return msg
def readMessage(self):
"""Reads the next message directly from the socket and dispatches it
@@ -1026,32 +1062,37 @@
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
if c.connect():
- job1 = JobClientHello(c)
- c.jobAdd(job1)
+ def foo():
+ job1 = JobClientHello(c)
+ c.jobAdd(job1)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job1.fcpResult
- print job1.fcpTime
- print '---------------------------'
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job1.fcpResult
+ print job1.fcpTime
+ print '---------------------------'
+ #foo()
+
+ def foo():
+ d = os.path.dirname(os.path.abspath(__file__))
+ job2 = JobTestDDA(c, d)
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job2.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
- job2 = JobTestDDA(c, os.path.dirname(__file__))
- c.jobAdd(job2)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
-
+ def foo():
job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job1.fcpError
+ print job2.fcpError
print job2.fcpResult
print job2.fcpTime
print '---------------------------'
-
+ #foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|