SF.net SVN: fclient: [10] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-22 09:02:02
|
Revision: 10
http://fclient.svn.sourceforge.net/fclient/?rev=10&view=rev
Author: jUrner
Date: 2007-10-22 02:02:04 -0700 (Mon, 22 Oct 2007)
Log Message:
-----------
continued working on protocol implementation
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10)
@@ -16,10 +16,10 @@
#**************************************************************
NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
-DefaultFcpPort = 9481
try:
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
-except: pass
+except ValueError:
+ DefaultFcpPort = 9481
SocketTimeout = 0.1
class FixedJobIdentifiers:
@@ -33,13 +33,13 @@
class Priorities:
"""All priorities supported by the client"""
- Maximum = 0
- Interactive = 1
- SemiInteractive = 2
- Updatable = 3
- Bulk = 4
- Prefetch = 5
- Minimum = 6
+ Maximum = '0'
+ Interactive = '1'
+ SemiInteractive = '2'
+ Updatable = '3'
+ Bulk = '4'
+ Prefetch = '5'
+ Minimum = '6'
PriorityMin = Minimum
PriorityDefault = Bulk
@@ -471,42 +471,40 @@
class JobBase(object):
"""Base class for jobs"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, identifier, message):
"""
@param fcpClient: FcpClient() instance
@param identifier: (str) identifier of the job
@param message: (Message) to send to the node whne the job ist started
- @ivar fcpClient: FcpClient() instance
- @ivar fcpError: holding the error message if an error was encountered while running
- the job, None otherwise
- @ivar fcpIdentifier: identifier of the job
- @ivar fcpMessage: initial message send to the node
- @ivar fcpResult: if no error was encountered, holding the result of the job when complete
- @ivar fcpTimeStart: time the job was started
- @ivar fcpTimeStop: time the job was stopped
+ @ivar jobClient: FcpClient() instance of the job
+ @ivar jobIdentifier: identifier of the job
+ @ivar jobMessage: message to be send to the node
+ @ivar jobResult: if no error was encountered, holding the result of the job when complete
+ @ivar jobTimeStart: time the job was started
+ @ivar jobTimeStop: time the job was stopped
"""
- self.fcpClient = fcpClient # FcpClient() instance the job belongs to
- self.fcpIdentifier = identifier # job identifier
- self.fcpMessage = message # message send to node
- self.fcpResult = None # job result (bool error, Message msg)
- self.fcpTimeStart = 0 # time the job was started
- self.fcpTimeStop = 0 # time the job was stopped
+ self.jobClient = fcpClient
+ self.jobIdentifier = identifier
+ self.jobMessage = message
+ self.jobResult = None
+ self.jobTimeStart = 0
+ self.jobTimeStop = 0
+
def handleStart(self):
"""Starts the job"""
- self.fcpResult = None
- self.fcpTimeStart = time.time()
- self.fcpClient.sendMessageEx(self.fcpMessage)
+ self.jobResult = None
+ self.jobTimeStart = time.time()
+ self.jobClient.sendMessageEx(self.jobMessage)
def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
@param flagError: True if an error was encountered, False otherwise
@param msg: (Message) to pass to the job
"""
- self.fcpTimeStop = time.time()
- self.fcpResult = (flagError, msg)
+ self.jobTimeStop = time.time()
+ self.jobResult = (flagError, msg)
class JobClientHello(JobBase):
@@ -516,8 +514,7 @@
goes well, you will get a NodeHello in response.
"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
"""
@param name: (str) connection name or None, to use an arbitrary name
@@ -538,8 +535,7 @@
"""Lists all known peers of the node
"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Message.ListPeers,
@@ -561,13 +557,10 @@
On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
Note, that both members may be '' (empty string)
-
"""
- _fcp_auto_remove_ = False
-
-
- # idea is to provoke a GetFailed message and take mimetype and size from it
+
+ # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed'
def __init__(self, fcpClient, uri, **params):
"""
@param fcpClient: FcpClient() instance
@@ -584,7 +577,9 @@
Message.ClientGet,
Identifier=identifier,
URI=uri,
- MaxSize='0',
+ # suggested by Mathew Toseland to use about 32k for mimeType requests
+ # basic sizes of keys are: 1k for SSks and 32k for CHKs
+ MaxSize='32000',
ReturnType='none',
Verbosity='1',
**params
@@ -592,34 +587,77 @@
JobBase.__init__(self, fcpClient, identifier, message)
+ def getPrority(self):
+ return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault)
+
+
+ def setPriority(self, priority):
+ if not priority in Priorities:
+ raise ValueError('Invalid priority: %r' % priority)
+ self.jobClient.sendMessage(
+ Message.ModifyPersistentRequest,
+ Identifier=self.jobIdentifier,
+ Global=fcpBool(False),
+ PriorityClass=priority,
+ )
+ # not shure if the response arrives in any case, so set it here
+ self.jobMessage['PriorityClass'] = priority
+
+
+ def stopRequest(self):
+ self.jobClient.sendMessage(
+ Message.RemovePersistentRequest,
+ Global=fcpBool(False),
+ Identifier=self.jobIdentifier,
+ )
+
+
+ def handlePriorityChanged(self):
+ self.jobMessage['PriorityClass'] = priority
+
def handleProgress(self, msg):
"""Handles the next progress made. Overwrite to process.
"""
+
+ def handleRequestRemoved(self, msg):
+ if self.jobClient.jobIsRunning(self.jobIdentifier):
+ self.jobClient.jobStop(self.jobIdentifier)
+
def handleStop(self, flagError, msg):
+ """called when a job is finished
+ @note: jobResult may contain an 'IdentifierCollision' message. Make shure
+ to handle this appropriately
+ """
JobBase.handleStop(self,flagError, msg)
if msg.name == Message.DataFound:
- self.fcpResult = (
+ self.jobResult = (
msg.get('Metadata.ContentType', ''),
msg.get('DataLength', '')
)
elif msg.name == Message.GetFailed:
if msg['Code'] == FetchError.TooBig:
- self.fcpResult = (False, msg)
- self.fcpResult = (
+ self.jobResult = (False, msg)
+ self.jobResult = (
msg.get('ExpectedMetadata.ContentType', ''),
msg.get('ExpectedDataLength', '')
)
-
+ # not shure when removing the request is actually required
+ # so send it anyway
+ self.jobClient.sendMessage(
+ Message.RemovePersistentRequest,
+ Global=fcpBool(False),
+ Identifier=self.jobIdentifier,
+ )
+
-
+
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
"""Tests a directory for read / write accesss
"""
- _fcp_auto_remove_ = False
-
+
def __init__(self, fcpClient, directory, read=True, write=True):
if not os.path.isdir(directory):
raise ValueError('No such directory: %r' % directory)
@@ -631,7 +669,7 @@
WantWriteDirectory=fcpBool(write),
)
JobBase.__init__(self, fcpClient, directory, message)
- self.fcpTmpFile = None
+ self.jobTmpFile = None
def handleTestDDAReply(self, msg):
@@ -644,14 +682,14 @@
if os.path.isfile(fpathWrite):
os.remove(fpathWrite)
else:
- self.fcpTmpFile = fpathWrite
+ self.jobTmpFile = fpathWrite
if fpathRead is not None:
readContent = saveReadFile(fpathRead)
if readContent is None:
readContent = ''
- self.fcpClient.sendMessage(
+ self.jobClient.sendMessage(
Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
@@ -660,8 +698,8 @@
def handleStop(self, flagError, msg):
JobBase.handleStop(self, flagError, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
+ saveRemoveFile(self.jobTmpFile)
+ self.jobTmpFile = None
#**************************************************************************
# fcp client
@@ -682,6 +720,7 @@
JobStop = 'Stopping job: '
JobsCompleted = 'All jobs completed'
+
KeyboardInterrupt = 'Keyboard interrupt'
SocketDead = 'Socket is dead'
@@ -710,10 +749,10 @@
self._isConnected = False
self._jobs = {
- 'all': {},
- 'pending': [], # ???
- 'running': [],
- 'complete': [], # ???
+ #TODO: check if JobList is still required
+ 'JobMapping': {},
+ 'JobList': [],
+ 'RegisteredDirectories': [],
}
self._errorHandler = errorHandler
self._log = logging.getLogger(name)
@@ -734,7 +773,7 @@
self._socket = None
- #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
+ #TODO: an iterator would be nice to enshure Guis stay responsitive in the call
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
"""Establishes the connection to a freenet node
@param host: (str) host of th node
@@ -767,7 +806,7 @@
# as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- error, msg = job.fcpResult
+ error, msg = job.jobResult
assert not error, 'Error should have been caught by handleMessage()'
return msg
@@ -782,9 +821,13 @@
def handleMessage(self, msg):
- """Handles the next message from the freenet node
- @param msg: Message() to handle
+ """Handles a message from the freenet node
+ @param msg: (Message) to handle
+ @return: True if the message was handled, False otherwise
"""
+
+
+ #if msg.name != 'USocketTimeOut':
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
if msg.name == Message.NodeHello:
@@ -803,30 +846,57 @@
#TODO: inform caller
raise ProtocolError(msg)
else:
- self.jobStop(identifier, msg, flagError=True)
+ return self.jobStop(identifier, msg, flagError=True)
- elif msg.name == Message.Peer:
- self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
+ if msg.name == Message.DataFound:
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
elif msg.name == Message.EndListPeers:
- self.jobStop(FixedJobIdentifiers.ListPeers, msg)
+ return self.jobStop(FixedJobIdentifiers.ListPeers, msg)
elif msg.name == Message.GetFailed:
- self.jobStop(msg['Identifier'], msg, flagError=True)
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
+ elif msg.name == Message.PersistentRequestRemoved:
+ return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg)
+
elif msg.name == Message.SimpleProgress:
- self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ return self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ elif msg.name == Message.TestDDAComplete:
+ return self.jobStop(msg['Directory'], msg)
+
elif msg.name == Message.TestDDAReply:
- self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+ return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
- elif msg.name == Message.TestDDAComplete:
- self.jobStop(msg['Directory'], msg)
+ elif msg.name == Message.IdentifierCollision:
+ # identifier is unique to us, but for some reason not unique to the node
+ #TODO: maybe for future implementations find a way to automise
+ # reinsertion and a notification via a handleJobChanged() for the
+ # caller to overwrite
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
- elif msg.name == Message.IdentifierCollision:
- pass
+ elif msg.name == Message.Peer:
+ return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ return False
+ #########################################################
+ ## jobs
+ #########################################################
+ def hasJobsRunning(self):
+ """Checks if the client has running jobs
+ @return: (bool) True if so, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ result = bool(self._jobs['JobList'])
+ finally:
+ self._lock.release()
+ return result
+
+
def jobAdd(self, job, synchron=False):
"""Adds a job to the client
@param job: (Job*) job to add
@@ -834,35 +904,62 @@
"""
self._lock.acquire(True)
try:
- if job.fcpIdentifier in self._jobs['all']:
- raise ValueError('Duplicate job: %r' % job.identifier)
- self._jobs['all'][job.fcpIdentifier] = job
- self._jobs['running'].append(job)
+ if job.jobIdentifier in self._jobs['JobMapping']:
+ raise ValueError('Duplicate job: %r' % job.jobIdentifier)
+ self._jobs['JobMapping'][job.jobIdentifier] = job
+ self._jobs['JobList'].append(job)
finally:
self._lock.release()
- self._log.info(self._logMessages.JobStart + job.fcpMessage.name)
+ self._log.info(self._logMessages.JobStart + job.jobMessage.name)
job.handleStart()
if synchron:
- while job.fcpResult is None:
+ while job.jobResult is None:
self.next()
+ def jobGet(self, identifier):
+ """Returns a job given its identifier
+ @param identifier: identifier of the job
+ @return: (Job*) instance or None, if no corrosponding job was found
+ """
+ self._lock.acquire(True)
+ try:
+ result = self._jobs['JobMapping'].get(identifier, None)
+ finally:
+ self._lock.release()
+ return result
+
+ def jobIsRunning(self, identifier):
+ """Checks if a job is running
+ @param identifier: identifier of the job
+ @return: True if so, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ result = identifier in self._jobs['JobMapping']
+ finally:
+ self._lock.release()
+ return result
+
+
def jobNotify(self, identifier, handler, msg):
"""Notifies a job about an event while it is running
@param identifier: identifier of the job to notify
@param handler: (str) method of the job to call to handle the notification
@param msg: Message() to pass to the job
+ @return: True if a job was found to be notified, False otherwise
"""
self._lock.acquire(True)
try:
- job = self._jobs['all'].get(identifier, None)
+ job = self._jobs['JobMapping'].get(identifier, None)
finally:
self._lock.release()
if job is None:
- raise ValueError('No such job: %r' % identifier)
+ return False
getattr(job, handler)(msg)
+ return True
#TODO: quite unclear when to remove a job
@@ -871,25 +968,23 @@
@param identifier: identifier of the job to stop
@param msg: Message() to pass to the job as result
@param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse
+ @return: True if a job was found to be stopped, False otherwise
"""
self._lock.acquire(True)
try:
- job = self._jobs['all'].get(identifier, None)
+ job = self._jobs['JobMapping'].get(identifier, None)
if job is not None:
- self._jobs['running'].remove(job)
- if job._fcp_auto_remove_:
- del self._jobs['all'][identifier]
- else:
- self._jobs['complete'].append(job)
+ self._jobs['JobList'].remove(job)
+ del self._jobs['JobMapping'][identifier]
finally:
self._lock.release()
-
if job is None:
- raise ValueError('No such job: %r' % identifier)
- self._log.info(self._logMessages.JobStop + job.fcpMessage.name)
+ return False
+ self._log.info(self._logMessages.JobStop + job.jobMessage.name)
job.handleStop(flagError, msg)
+ return True
+
-
#TODO: some info when all jobs are completed
def next(self):
"""Pumps the next message waiting
@@ -963,21 +1058,8 @@
"""Runs the client untill all jobs passed to it are completed
@note: use KeyboardInterrupt to stop prematurely
"""
-
- # TODO:
- # x. push pending jobs
try:
- while True:
- if not self._lock.acquire(False):
- continue
-
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- self._log.info(self._logMessages.JobsCompleted)
- break
- finally:
- self._lock.release()
-
+ while self.hasJobsRunning():
self.next()
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
@@ -1026,6 +1108,16 @@
""""""
self._log.setLevel(verbosity)
+
+ ########################################################
+ ##
+ ########################################################
+ def getFileInfo(self, job):
+ pass
+
+ def getFile(self, job):
+ pass
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1039,8 +1131,7 @@
c.run()
print '---------------------------'
- print job1.fcpError
- print job1.fcpResult
+ print job1.jobResult
print '---------------------------'
# should raise
#foo()
@@ -1052,7 +1143,7 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
@@ -1061,7 +1152,7 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
@@ -1071,6 +1162,6 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|