fclient-commit Mailing List for fclient (Page 39)
From: <jU...@us...> - 2007-10-25 14:04:41
Revision: 12
http://fclient.svn.sourceforge.net/fclient/?rev=12&view=rev
Author: jUrner
Date: 2007-10-25 05:59:16 -0700 (Thu, 25 Oct 2007)
Log Message:
-----------
bit more work on protocol
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-25 12:59:16 UTC (rev 12)
@@ -1,5 +1,16 @@
+'''Freenet client protocol 2.0 implementation
+'''
+#NOTE:
+#
+# downloading data to disk is not supported at the moment. The TestDDA code is quite unworkable
+# and, as far as I can see, there are plans to get rid of it. So wait...
+#
+#
+
+
import atexit
+import base64
import logging
import os
import re
@@ -187,7 +198,7 @@
"""Returns a new unique identifier
@return: (str) uuid
"""
- return str(uuid.uuid4())
+ return 'fclient::' + str(uuid.uuid4())
def pythonBool(fcpBool):
@@ -251,7 +262,7 @@
@param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
@return: (string) whatever freenet returns
"""
- #TODO: on windows it may be necessary to hide the comand window
+ #TODO: on windows it may be necessary to hide the command window
p = subprocess.Popen(
args=cmdline,
shell=True,
@@ -368,10 +379,10 @@
ModifyPeerNote = 'ModifyPeerNote'
RemovePeer = 'RemovePeer'
GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
+ GetConfig = 'GetConfig' # (since 1027)
ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
GenerateSSK = 'GenerateSSK'
ClientPut = 'ClientPut'
ClientPutDiskDir = 'ClientPutDiskDir'
@@ -433,7 +444,7 @@
self.params = params
def toString(self):
- """Returns a string with the formated message, ready to be send"""
+ """Returns the message as formated string ready to be send"""
# TODO: "Data" not yet implemented
out = [self.name, ]
for param, value in self.params.items():
@@ -447,7 +458,6 @@
for param, value in self.params.items():
out.append('>> %s=%s' % (param, value))
out.append('>>EndMessage')
- out.append('')
return '\n'.join(out)
def __getitem__(self, name):
@@ -508,6 +518,8 @@
self.jobTimeStart = time.time()
self.jobClient.sendMessageEx(self.jobMessage)
+
+ # XXX
def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
@param flagError: True if an error was encountered, False otherwise
@@ -569,13 +581,18 @@
"""
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ """
+ @param withMetaData: include metadata for each peer?
+ @param withVolantile: include volatile data for each peer?
+ @ivar jobResult: on job completion, will be a list containing one 'Peer' message for each peer
+ """
message = Message(
Message.ListPeers,
WithMetadata=fcpBool(withMetaData),
WithVolatile=fcpBool(withVolantile),
)
JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
-
+
def handleMessage(self,msg):
if msg.name == Message.EndListPeers:
@@ -585,17 +602,72 @@
else:
raise ValueError('Unexpected message: %s' % msg.name)
+
def handlePeer(self, msg):
+ if self.jobResult is None:
+ self.jobResult = [msg, ]
+ else:
+ self.jobResult.append(msg)
return True
def handleEndListPeers(self, msg):
self.jobTimeStop = time.time()
- self.jobResult = msg
+ if self.jobResult is None:
+ self.jobResult = []
self.jobClient.jobRemove(self.jobIdentifier)
return True
+
+class JobListPeerNotes(JobBase):
+ """Lists all notes associated to a peer of the node
+ """
+
+ def __init__(self, fcpClient, identifier):
+ """
+ @param identifier: identifier of the peer to list notes for (peer identity)
+ @ivar jobResult: on job completion, will be a list containing all notes associated with the peer
+
+ @note: notes are only available for darknet peers (opennet == false)
+ """
+
+ message = Message(
+ Message.ListPeerNotes,
+ NodeIdentifier=identifier
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleMessage(self,msg):
+ if msg.name == Message.EndListPeerNotes:
+ return self.handleEndListPeerNotes(msg)
+ elif msg.name == Message.PeerNote:
+ return self.handlePeerNote(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handlePeerNote(self, msg):
+ note = msg.get('NoteText', '')
+ if note:
+ note = base64.decodestring(note)
+ if self.jobResult is None:
+ self.jobResult = [note, ]
+ else:
+ self.jobResult.append(note)
+ return True
+
+
+ def handleEndListPeerNotes(self, msg):
+ self.jobTimeStop = time.time()
+ if self.jobResult is None:
+ self.jobResult = []
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+
+
#TODO: identifier collisions are not yet handled
class JobGetFileInfo(JobBase):
"""Tries to retieve information about a file. If everything goes well
@@ -735,7 +807,13 @@
"""
- def __init__(self, fcpClient, directory, read=True, write=True):
+ def __init__(self, fcpClient, directory, read=False, write=False):
+ """
+
+ @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed)
+ """
+
+
if not os.path.isdir(directory):
raise ValueError('No such directory: %r' % directory)
@@ -747,8 +825,7 @@
)
JobBase.__init__(self, fcpClient, directory, message)
self.jobTmpFile = None
-
-
+
def handleMessage(self, msg):
if msg.name == Message.TestDDAReply:
@@ -786,13 +863,50 @@
def handleTestDDAComplete(self, msg):
self.jobTimeStop = time.time()
- self.jobResult = msg
+ self.jobResult = (
+ pythonBool(msg.get('ReadDirectoryAllowed', 'false')),
+ pythonBool(msg.get('WriteDirectoryAllowed', 'false')),
+ )
saveRemoveFile(self.jobTmpFile)
self.jobTmpFile = None
self.jobClient.jobRemove(self.jobIdentifier)
return True
+
+class JobGenerateSSK(JobBase):
+ """Job to generate a SSK key pair
+ """
+
+
+ def __init__(self, fcpClient):
+ """
+ @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated
+ SSK key
+ """
+
+ identifier = newIdentifier()
+ message = Message(
+ Message.GenerateSSK,
+ Identifier=identifier,
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleMessage(self, msg):
+ if msg.name == Message.SSKKeypair:
+ return self.handleSSKKeypair(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+
+ def handleSSKKeypair(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = (msg['InsertURI'], msg['RequestURI'])
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
#**************************************************************************
# fcp client
#**************************************************************************
@@ -840,20 +954,19 @@
with two params: SocketError + details. When the handler is called the client
is already closed.
@param verbosity: verbosity level for debugging
- @param logMessages: LogMessages class containing messages
+ @param logMessages: LogMessages class containing message strings
"""
self._isConnected = False
self._jobs = {
- #TODO: check if JobList is still required
- 'JobMapping': {},
- 'JobList': [],
+ 'Jobs': {},
+ 'PendingJobs': [],
'RegisteredDirectories': [],
}
- self._errorHandler = errorHandler
+ self._errorHandler = errorHandler #TODO: check!
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock() # lovk when resources are accessed
+ self._lock = thread.allocate_lock() # lock when resources are accessed
self._socket = None
self.setVerbosity(verbosity)
@@ -933,7 +1046,7 @@
if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif code == ProtocolError.Shutdown:
+ elif code == ProtocolError.ShuttingDown:
if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
# ########################################
@@ -956,6 +1069,11 @@
identifier = msg['Directory']
elif msg.name == Message.TestDDAComplete:
identifier = msg['Directory']
+ elif msg.name == Message.PeerNote:
+ identifier = msg['NodeIdentifier']
+ elif msg.name == Message.EndListPeerNotes:
+ identifier = msg['NodeIdentifier']
+
else:
identifier = msg.get('Identifier', None)
@@ -971,6 +1089,13 @@
elif msg.name == Message.Peer:
return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
+ #elif msg.name == Message.PeerNote:
+ # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
+
+ #elif msg.name == Message.EndListPeerNotes:
+ # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
+
+
# more here.....
else:
@@ -993,9 +1118,12 @@
"""
self._lock.acquire(True)
try:
- result = bool(self._jobs['JobList'])
+ result = self._jobs['Jobs'] or self._jobs['PendingJobs']
finally:
self._lock.release()
+
+
+
return result
@@ -1006,20 +1134,16 @@
"""
self._lock.acquire(True)
try:
- if job.jobIdentifier in self._jobs['JobMapping']:
- raise ValueError('Job with that identifier already present')
-
- if job.jobIdentifier in self._jobs['JobMapping']:
+ if job.jobIdentifier in self._jobs['Jobs']:
raise ValueError('Duplicate job: %r' % job.jobIdentifier)
- self._jobs['JobMapping'][job.jobIdentifier] = job
- self._jobs['JobList'].append(job)
+ self._jobs['Jobs'][job.jobIdentifier] = job
finally:
self._lock.release()
self._log.info(self._logMessages.JobStart + job.jobMessage.name)
job.handleStart()
if synchron:
- while job.jobResult is None:
+ while self.jobGet(job.jobIdentifier):
self.next()
@@ -1042,7 +1166,7 @@
"""
self._lock.acquire(True)
try:
- result = self._jobs['JobMapping'].get(identifier, None)
+ result = self._jobs['Jobs'].get(identifier, None)
finally:
self._lock.release()
return result
@@ -1055,7 +1179,7 @@
"""
self._lock.acquire(True)
try:
- result = identifier in self._jobs['JobMapping']
+ result = identifier in self._jobs['Jobs']
finally:
self._lock.release()
return result
@@ -1068,10 +1192,9 @@
"""
self._lock.acquire(True)
try:
- job = self._jobs['JobMapping'].get(identifier, None)
+ job = self._jobs['Jobs'].get(identifier, None)
if job is not None:
- self._jobs['JobList'].remove(job)
- del self._jobs['JobMapping'][identifier]
+ del self._jobs['Jobs'][identifier]
finally:
self._lock.release()
if job is None:
@@ -1209,17 +1332,86 @@
self._log.setLevel(verbosity)
+
########################################################
##
########################################################
def getFileInfo(self, job):
pass
- def getFile(self, job):
- pass
+
+
+ #########################################################
+ ## boilerplate code to tackle TestDDA
+ ##
+ ## ...but I don't trust it ;-) I was not yet able to wrap my head around
+ ## jobAdd(synchron=True) enough to know whether it is safe (thread, deadlock) or not.
+ ##
+ ## Another problem is that there is no way to know when a directory is no longer
+ ## needed. And I don't want to write code in a Gui to tackle a problem that will hopefully
+ ## go away in the near future.
+ ##
+ ## see: https://bugs.freenetproject.org/view.php?id=1753
+ ##
+ #########################################################
+ def testWriteAccess(self, directory):
+ canRead, canWrite = False, False
+ result = self._jobs.get('RegisteredDirectories', None)
+ if result is not None:
+ canRead, canWrite = result
+ if not canWrite:
+ job = JobTestDDA(directory, read=canRead, write=True)
+ self.addJob(job, synchron=True)
+ canWrite = job.jobResult[1]
+ self._jobs['RegisteredDirectories'] = (canRead, canWrite)
+ return canWrite
+
+ def testReadAccess(self, directory):
+ canRead, canWrite = False, False
+ result = self._jobs.get('RegisteredDirectories', None)
+ if result is not None:
+ canRead, canWrite = result
+ if not canRead:
+ job = JobTestDDA(directory, read=True, write=canWrite)
+ self.addJob(job, synchron=True)
+ canRead = job.jobResult[0]
+ self._jobs['RegisteredDirectories'] = (canRead, canWrite)
+ return canRead
+
+
+ def downloadFile(self, directory, job):
+ if not os.path.isdir(directory):
+ raise IOError('No such directory')
+
+ self._jobs['PendingJobs'].append(job)
+ try:
+ result = self.testWriteAccess(directory)
+ if result:
+ self.addJob(job)
+ finally:
+ self._jobs['PendingJobs'].remove(job)
+ return result
+
+ def uploadFile(self, directory, job):
+ if not os.path.isdir(directory):
+ raise IOError('No such directory')
+
+ self._jobs['PendingJobs'].append(job)
+ try:
+ result = self.testReadAccess(directory)
+ if result:
+ self.addJob(job)
+ finally:
+ self._jobs['PendingJobs'].remove(job)
+ return result
+
+
+
+
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1239,7 +1431,16 @@
#foo()
+
def foo():
+ job = JobGenerateSSK(c)
+ c.jobAdd(job, synchron=True)
+ print job.jobResult
+ foo()
+
+
+
+ def foo():
d = os.path.dirname(os.path.abspath(__file__))
job2 = JobTestDDA(c, d)
c.jobAdd(job2)
@@ -1250,12 +1451,21 @@
#foo()
def foo():
- job2 = JobListPeers(c)
- c.jobAdd(job2)
+ job = JobListPeers(c)
+ c.jobAdd(job)
c.run()
print '---------------------------'
- print job2.jobResult
+ print job.jobResult
print '---------------------------'
+
+
+ for peer in job.jobResult:
+ if not pythonBool(peer['opennet']):
+ job = JobListPeerNotes(c, peer['identity'])
+ c.jobAdd(job, synchron=True)
+ print '>>', job.jobResult
+ #.get('NoteText')
+
#foo()
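A minimal usage sketch of the revised JobTestDDA result, assuming the classes behave as in the diff above and that c is a connected FcpClient as in the foo() test helpers (the helper name is made up, not part of the commit):

import os

def ddaSketch():
    # run a synchronous DDA test against the module's own directory
    d = os.path.dirname(os.path.abspath(__file__))
    job = JobTestDDA(c, d, read=True, write=True)
    c.jobAdd(job, synchron=True)
    # as of this revision jobResult is a (readAllowed, writeAllowed) tuple of bools
    canRead, canWrite = job.jobResult
    print 'read=%s write=%s' % (canRead, canWrite)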
From: <jU...@us...> - 2007-10-23 09:19:10
Revision: 11
http://fclient.svn.sourceforge.net/fclient/?rev=11&view=rev
Author: jUrner
Date: 2007-10-23 02:19:12 -0700 (Tue, 23 Oct 2007)
Log Message:
-----------
combed over message dispatch
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11)
@@ -14,7 +14,6 @@
#**************************************************************
# consts
#**************************************************************
-NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
try:
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
@@ -28,6 +27,13 @@
"""
ClientHello = 'ClientHello'
ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig'
+ ModifyConfig = 'ModifyConfig'
+ WatchGlobal = 'WatchGlobal'
+ Shutdown = 'Shutdown'
+
class Priorities:
@@ -492,6 +498,10 @@
self.jobTimeStop = 0
+ def handleMessage(self, msg):
+ return False
+
+
def handleStart(self):
"""Starts the job"""
self.jobResult = None
@@ -514,7 +524,6 @@
goes well, you will get a NodeHello in response.
"""
-
def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
"""
@param name: (str) connection name or None, to use an arbitrary name
@@ -529,13 +538,36 @@
def displayName(self):
return 'ClientHello'
+
+
+ def handleMessage(self, msg):
+ if msg.name == Message.NodeHello:
+ return self.handleNodeHello(msg)
+ elif msg.name == Message.ProtocolError:
+ return self.handleProtocolError(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handleNodeHello(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+ def handleProtocolError(self, msg):
+ raise ProtocolError(msg)
+
+
+
+
+
class JobListPeers(JobBase):
"""Lists all known peers of the node
"""
-
-
+
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Message.ListPeers,
@@ -545,13 +577,26 @@
JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
- def handlePeer(self,msg):
- """Handles the next peer send by the node in form of a 'Peer' message
- while the job is running. Overwrite to process.
- """
+ def handleMessage(self,msg):
+ if msg.name == Message.EndListPeers:
+ return self.handleEndListPeers(msg)
+ elif msg.name == Message.Peer:
+ return self.handlePeer(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handlePeer(self, msg):
+ return True
+
+ def handleEndListPeers(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
-
+#TODO: identifier collisions are not yet handled
class JobGetFileInfo(JobBase):
"""Tries to retieve information about a file. If everything goes well
@@ -571,6 +616,11 @@
MaxRetries=-1 ...N
PriorityClass=Priority*
+ @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be
+ retrieved and data will be a GetFailed message containing details. If error is False
+ data will be a tuple(str metadataContentType, str size). Note that both may be empty
+ strings and the size may not be accurate.
+
"""
identifier = newIdentifier()
message = Message(
@@ -612,46 +662,73 @@
)
- def handlePriorityChanged(self):
- self.jobMessage['PriorityClass'] = priority
+ def handleMessage(self, msg):
+ if msg.name == Message.DataFound:
+ return self.handleDataFound(msg)
+ elif msg.name == Message.GetFailed:
+ return self.handleGetFailed(msg)
+ elif msg.name == Message.IdentifierCollision:
+ return self.handleIdentifierCollision(msg)
+ elif msg.name == Message.PersistentRequestModified:
+ return self.handlePersistentRequestModified(msg)
+ elif msg.name == Message.PersistentRequestRemoved:
+ return self.handlePersistentRequestRemoved(msg)
+ elif msg.name == Message.SimpleProgress:
+ return self.handleSimpleProgress(msg)
+
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
- def handleProgress(self, msg):
- """Handles the next progress made. Overwrite to process.
- """
+ def handleDataFound(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = (
+ False,
+ (
+ msg.get('Metadata.ContentType', ''),
+ msg.get('DataLength', '')
+ )
+ )
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
- def handleRequestRemoved(self, msg):
+ def handleGetFailed(self, msg):
+ self.jobTimeStop = time.time()
+ if msg['Code'] == FetchError.TooBig:
+ self.jobResult = (False, msg)
+ self.jobResult = (
+ False,
+ (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
+ )
+ else:
+ self.jobResult = (True, msg)
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+ def handleIdentifierCollision(self, msg):
+ raise
+
+
+ def handleSimpleProgress(self, msg):
+ return True
+
+
+ def handlePersistentRequestModified(self, msg):
+ priorityClass = msg.get('PriorityClass', None)
+ if priorityClass is not None:
+ self.jobMessage['PriorityClass'] = priorityClass
+ return True
+
+ def handlePersistentRequestRemoved(self, msg):
if self.jobClient.jobIsRunning(self.jobIdentifier):
self.jobClient.jobStop(self.jobIdentifier)
+ return True
+
-
- def handleStop(self, flagError, msg):
- """called when a job is finished
- @note: jobResult may contain an 'IdentifierCollision' message. Make shure
- to handle this appropriately
- """
- JobBase.handleStop(self,flagError, msg)
- if msg.name == Message.DataFound:
- self.jobResult = (
- msg.get('Metadata.ContentType', ''),
- msg.get('DataLength', '')
- )
- elif msg.name == Message.GetFailed:
- if msg['Code'] == FetchError.TooBig:
- self.jobResult = (False, msg)
- self.jobResult = (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- # not shure when removing the request is actually required
- # so send it anyway
- self.jobClient.sendMessage(
- Message.RemovePersistentRequest,
- Global=fcpBool(False),
- Identifier=self.jobIdentifier,
- )
-
-
-
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
"""Tests a directory for read / write accesss
@@ -672,6 +749,16 @@
self.jobTmpFile = None
+
+ def handleMessage(self, msg):
+ if msg.name == Message.TestDDAReply:
+ return self.handleTestDDAReply(msg)
+ elif msg.name == Message.TestDDAComplete:
+ return self.handleTestDDAComplete(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -693,13 +780,18 @@
Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
- )
+ )
+ return True
+
-
- def handleStop(self, flagError, msg):
- JobBase.handleStop(self, flagError, msg)
+ def handleTestDDAComplete(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
saveRemoveFile(self.jobTmpFile)
self.jobTmpFile = None
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
#**************************************************************************
# fcp client
@@ -728,6 +820,10 @@
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
#TODO: no idea if to add support for pending jobs and queue management here
+
+#TODO: do not mix directories used as identifiers with regular identifiers (might lead to collisions)
+#TODO: how to handle (ProtocolError code 18: Shutting down)?
+
class FcpClient(object):
"""Fcp client implementation"""
@@ -757,7 +853,7 @@
self._errorHandler = errorHandler
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock()
+ self._lock = thread.allocate_lock() # lovk when resources are accessed
self._socket = None
self.setVerbosity(verbosity)
@@ -806,9 +902,8 @@
# as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- error, msg = job.jobResult
- assert not error, 'Error should have been caught by handleMessage()'
- return msg
+ assert job.jobResult is not None, 'ClientHello is not working as expected'
+ return job.jobResult
self._log.info(self._logMessages.ConnectionRetry)
@@ -827,59 +922,66 @@
"""
- #if msg.name != 'USocketTimeOut':
+ if msg.name == 'USocketTimeOut':
+ return True
+
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
- if msg.name == Message.NodeHello:
- #connectionIdentifier = msg['ConnectionIdentifier']
- self.jobStop(FixedJobIdentifiers.ClientHello, msg)
-
- elif msg.name == Message.ProtocolError:
+
+ if msg.name == Message.ProtocolError:
code = msg['Code']
- #if code == ProtocolError.NoLateClientHellos:
- # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
- #elif code == ProtocolError.ClientHelloMustBeFirst:
- # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
- #else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- #TODO: inform caller
- raise ProtocolError(msg)
+ if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
+
+ elif code == ProtocolError.Shutdown:
+ if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
+
+ # ########################################
+ #TODO: ???
+
+ return True
+
else:
- return self.jobStop(identifier, msg, flagError=True)
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: inform caller
+ raise ProtocolError(msg)
+ else:
+ return self.jobDispatchMessage(identifier, msg)
+
+ else:
+
+ # check if there is something like an identifier in the message
+ if msg.name == Message.TestDDAReply:
+ identifier = msg['Directory']
+ elif msg.name == Message.TestDDAComplete:
+ identifier = msg['Directory']
+ else:
+ identifier = msg.get('Identifier', None)
- if msg.name == Message.DataFound:
- return self.jobStop(msg['Identifier'], msg, flagError=True)
-
- elif msg.name == Message.EndListPeers:
- return self.jobStop(FixedJobIdentifiers.ListPeers, msg)
-
- elif msg.name == Message.GetFailed:
- return self.jobStop(msg['Identifier'], msg, flagError=True)
+ # dispatch to jobs with fixed identifiers
+ if identifier is None:
+
+ if msg.name == Message.NodeHello:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif msg.name == Message.PersistentRequestRemoved:
- return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg)
-
- elif msg.name == Message.SimpleProgress:
- return self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ elif msg.name == Message.EndListPeers:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.TestDDAComplete:
- return self.jobStop(msg['Directory'], msg)
+ elif msg.name == Message.Peer:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.TestDDAReply:
- return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+ # more here.....
+
+ else:
+ raise ValueError('Unhandled message: ' + msg.name)
+
+ else:
+ return self.jobDispatchMessage(identifier, msg)
+
+ raise RuntimeError('Should not have ended here: %s' % msg.name)
- elif msg.name == Message.IdentifierCollision:
- # identifier is unique to us, but for some reason not unique to the node
- #TODO: maybe for future implementations find a way to automise
- # reinsertion and a notification via a handleJobChanged() for the
- # caller to overwrite
- return self.jobStop(msg['Identifier'], msg, flagError=True)
-
- elif msg.name == Message.Peer:
- return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
-
- return False
+
#########################################################
@@ -905,6 +1007,9 @@
self._lock.acquire(True)
try:
if job.jobIdentifier in self._jobs['JobMapping']:
+ raise ValueError('Job with that identifier already present')
+
+ if job.jobIdentifier in self._jobs['JobMapping']:
raise ValueError('Duplicate job: %r' % job.jobIdentifier)
self._jobs['JobMapping'][job.jobIdentifier] = job
self._jobs['JobList'].append(job)
@@ -916,7 +1021,19 @@
if synchron:
while job.jobResult is None:
self.next()
+
+ def jobDispatchMessage(self, identifier, msg):
+ """Dispatches a message to a job
+ @param identifier: identifier of the job
+ @param msg: (Message) message to dispatch
+ @return: True if the message was handled, False otherwise
+ """
+ job = self.jobGet(identifier)
+ if job is not None:
+ return job.handleMessage(msg)
+ return False
+
def jobGet(self, identifier):
"""Returns a job given its identifier
@@ -943,36 +1060,15 @@
self._lock.release()
return result
-
- def jobNotify(self, identifier, handler, msg):
- """Notifies a job about an event while it is running
- @param identifier: identifier of the job to notify
- @param handler: (str) method of the job to call to handle the notification
- @param msg: Message() to pass to the job
- @return: True if a job was found to be notified, False otherwise
+
+ def jobRemove(self, identifier):
+ """Removes a job unconditionally
+ @param identifier: identifier of the job to remove
+ @return: True if the job was found, False otherwise
"""
self._lock.acquire(True)
try:
job = self._jobs['JobMapping'].get(identifier, None)
- finally:
- self._lock.release()
- if job is None:
- return False
- getattr(job, handler)(msg)
- return True
-
-
- #TODO: quite unclear when to remove a job
- def jobStop(self, identifier, msg, flagError=False):
- """Stops a job
- @param identifier: identifier of the job to stop
- @param msg: Message() to pass to the job as result
- @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse
- @return: True if a job was found to be stopped, False otherwise
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['JobMapping'].get(identifier, None)
if job is not None:
self._jobs['JobList'].remove(job)
del self._jobs['JobMapping'][identifier]
@@ -981,10 +1077,9 @@
if job is None:
return False
self._log.info(self._logMessages.JobStop + job.jobMessage.name)
- job.handleStop(flagError, msg)
return True
-
+
#TODO: some info when all jobs are completed
def next(self):
"""Pumps the next message waiting
@@ -1059,8 +1154,13 @@
@note: use KeyboardInterrupt to stop prematurely
"""
try:
+ #n = 0
+
while self.hasJobsRunning():
+ #n += 1
+ #if n > 40: break
self.next()
+
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
self.close()
@@ -1117,7 +1217,9 @@
def getFile(self, job):
pass
+
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1158,10 +1260,22 @@
def foo():
- job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- c.jobAdd(job2)
+ job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip')
+
+
+ #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ #job.jobIdentifier = job.jobMessage['Identifier'] = 1
+ #job.jobMessage['Identifier'] = 1
+ #job.jobIdentifier = 1
+ c.jobAdd(job)
+
+ #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg')
+ #job.jobMessage['Identifier'] = 1
+ #job.jobIdentifier = 1
+ #c.jobAdd(job)
+
c.run()
print '---------------------------'
- print job2.jobResult
+ print job.jobResult
print '---------------------------'
#foo()
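With this revision each job implements handleMessage() and the client routes node messages to it via jobDispatchMessage(), so callers can subclass a job and override the per-message handlers instead of relying on the old jobNotify() callbacks. A minimal sketch, assuming a connected FcpClient c as in the foo() test helpers (the subclass is made up for illustration):

class PrintingListPeers(JobListPeers):
    # handlePeer() is invoked once for every 'Peer' message while the job runs
    def handlePeer(self, msg):
        print 'peer:', msg.get('identity', '?')
        return True

def listPeersSketch():
    job = PrintingListPeers(c)
    c.jobAdd(job)
    c.run()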
From: <jU...@us...> - 2007-10-22 09:02:02
Revision: 10
http://fclient.svn.sourceforge.net/fclient/?rev=10&view=rev
Author: jUrner
Date: 2007-10-22 02:02:04 -0700 (Mon, 22 Oct 2007)
Log Message:
-----------
continued working on protocol implementation
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10)
@@ -16,10 +16,10 @@
#**************************************************************
NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
-DefaultFcpPort = 9481
try:
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
-except: pass
+except ValueError:
+ DefaultFcpPort = 9481
SocketTimeout = 0.1
class FixedJobIdentifiers:
@@ -33,13 +33,13 @@
class Priorities:
"""All priorities supported by the client"""
- Maximum = 0
- Interactive = 1
- SemiInteractive = 2
- Updatable = 3
- Bulk = 4
- Prefetch = 5
- Minimum = 6
+ Maximum = '0'
+ Interactive = '1'
+ SemiInteractive = '2'
+ Updatable = '3'
+ Bulk = '4'
+ Prefetch = '5'
+ Minimum = '6'
PriorityMin = Minimum
PriorityDefault = Bulk
@@ -471,42 +471,40 @@
class JobBase(object):
"""Base class for jobs"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, identifier, message):
"""
@param fcpClient: FcpClient() instance
@param identifier: (str) identifier of the job
@param message: (Message) to send to the node when the job is started
- @ivar fcpClient: FcpClient() instance
- @ivar fcpError: holding the error message if an error was encountered while running
- the job, None otherwise
- @ivar fcpIdentifier: identifier of the job
- @ivar fcpMessage: initial message send to the node
- @ivar fcpResult: if no error was encountered, holding the result of the job when complete
- @ivar fcpTimeStart: time the job was started
- @ivar fcpTimeStop: time the job was stopped
+ @ivar jobClient: FcpClient() instance of the job
+ @ivar jobIdentifier: identifier of the job
+ @ivar jobMessage: message to be sent to the node
+ @ivar jobResult: if no error was encountered, holding the result of the job when complete
+ @ivar jobTimeStart: time the job was started
+ @ivar jobTimeStop: time the job was stopped
"""
- self.fcpClient = fcpClient # FcpClient() instance the job belongs to
- self.fcpIdentifier = identifier # job identifier
- self.fcpMessage = message # message send to node
- self.fcpResult = None # job result (bool error, Message msg)
- self.fcpTimeStart = 0 # time the job was started
- self.fcpTimeStop = 0 # time the job was stopped
+ self.jobClient = fcpClient
+ self.jobIdentifier = identifier
+ self.jobMessage = message
+ self.jobResult = None
+ self.jobTimeStart = 0
+ self.jobTimeStop = 0
+
def handleStart(self):
"""Starts the job"""
- self.fcpResult = None
- self.fcpTimeStart = time.time()
- self.fcpClient.sendMessageEx(self.fcpMessage)
+ self.jobResult = None
+ self.jobTimeStart = time.time()
+ self.jobClient.sendMessageEx(self.jobMessage)
def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
@param flagError: True if an error was encountered, False otherwise
@param msg: (Message) to pass to the job
"""
- self.fcpTimeStop = time.time()
- self.fcpResult = (flagError, msg)
+ self.jobTimeStop = time.time()
+ self.jobResult = (flagError, msg)
class JobClientHello(JobBase):
@@ -516,8 +514,7 @@
goes well, you will get a NodeHello in response.
"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
"""
@param name: (str) connection name or None, to use an arbitrary name
@@ -538,8 +535,7 @@
"""Lists all known peers of the node
"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Message.ListPeers,
@@ -561,13 +557,10 @@
On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
Note, that both members may be '' (empty string)
-
"""
- _fcp_auto_remove_ = False
-
-
- # idea is to provoke a GetFailed message and take mimetype and size from it
+
+ # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed'
def __init__(self, fcpClient, uri, **params):
"""
@param fcpClient: FcpClient() instance
@@ -584,7 +577,9 @@
Message.ClientGet,
Identifier=identifier,
URI=uri,
- MaxSize='0',
+ # suggested by Matthew Toseland: use about 32k for mimeType requests
+ # basic key sizes are: 1k for SSKs and 32k for CHKs
+ MaxSize='32000',
ReturnType='none',
Verbosity='1',
**params
@@ -592,34 +587,77 @@
JobBase.__init__(self, fcpClient, identifier, message)
+ def getPrority(self):
+ return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault)
+
+
+ def setPriority(self, priority):
+ if not priority in Priorities:
+ raise ValueError('Invalid priority: %r' % priority)
+ self.jobClient.sendMessage(
+ Message.ModifyPersistentRequest,
+ Identifier=self.jobIdentifier,
+ Global=fcpBool(False),
+ PriorityClass=priority,
+ )
+ # not sure if the response arrives in any case, so set it here
+ self.jobMessage['PriorityClass'] = priority
+
+
+ def stopRequest(self):
+ self.jobClient.sendMessage(
+ Message.RemovePersistentRequest,
+ Global=fcpBool(False),
+ Identifier=self.jobIdentifier,
+ )
+
+
+ def handlePriorityChanged(self):
+ self.jobMessage['PriorityClass'] = priority
+
def handleProgress(self, msg):
"""Handles the next progress made. Overwrite to process.
"""
+
+ def handleRequestRemoved(self, msg):
+ if self.jobClient.jobIsRunning(self.jobIdentifier):
+ self.jobClient.jobStop(self.jobIdentifier)
+
def handleStop(self, flagError, msg):
+ """called when a job is finished
+ @note: jobResult may contain an 'IdentifierCollision' message. Make shure
+ to handle this appropriately
+ """
JobBase.handleStop(self,flagError, msg)
if msg.name == Message.DataFound:
- self.fcpResult = (
+ self.jobResult = (
msg.get('Metadata.ContentType', ''),
msg.get('DataLength', '')
)
elif msg.name == Message.GetFailed:
if msg['Code'] == FetchError.TooBig:
- self.fcpResult = (False, msg)
- self.fcpResult = (
+ self.jobResult = (False, msg)
+ self.jobResult = (
msg.get('ExpectedMetadata.ContentType', ''),
msg.get('ExpectedDataLength', '')
)
-
+ # not shure when removing the request is actually required
+ # so send it anyway
+ self.jobClient.sendMessage(
+ Message.RemovePersistentRequest,
+ Global=fcpBool(False),
+ Identifier=self.jobIdentifier,
+ )
+
-
+
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
"""Tests a directory for read / write accesss
"""
- _fcp_auto_remove_ = False
-
+
def __init__(self, fcpClient, directory, read=True, write=True):
if not os.path.isdir(directory):
raise ValueError('No such directory: %r' % directory)
@@ -631,7 +669,7 @@
WantWriteDirectory=fcpBool(write),
)
JobBase.__init__(self, fcpClient, directory, message)
- self.fcpTmpFile = None
+ self.jobTmpFile = None
def handleTestDDAReply(self, msg):
@@ -644,14 +682,14 @@
if os.path.isfile(fpathWrite):
os.remove(fpathWrite)
else:
- self.fcpTmpFile = fpathWrite
+ self.jobTmpFile = fpathWrite
if fpathRead is not None:
readContent = saveReadFile(fpathRead)
if readContent is None:
readContent = ''
- self.fcpClient.sendMessage(
+ self.jobClient.sendMessage(
Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
@@ -660,8 +698,8 @@
def handleStop(self, flagError, msg):
JobBase.handleStop(self, flagError, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
+ saveRemoveFile(self.jobTmpFile)
+ self.jobTmpFile = None
#**************************************************************************
# fcp client
@@ -682,6 +720,7 @@
JobStop = 'Stopping job: '
JobsCompleted = 'All jobs completed'
+
KeyboardInterrupt = 'Keyboard interrupt'
SocketDead = 'Socket is dead'
@@ -710,10 +749,10 @@
self._isConnected = False
self._jobs = {
- 'all': {},
- 'pending': [], # ???
- 'running': [],
- 'complete': [], # ???
+ #TODO: check if JobList is still required
+ 'JobMapping': {},
+ 'JobList': [],
+ 'RegisteredDirectories': [],
}
self._errorHandler = errorHandler
self._log = logging.getLogger(name)
@@ -734,7 +773,7 @@
self._socket = None
- #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
+ #TODO: an iterator would be nice to ensure Guis stay responsive in the call
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
"""Establishes the connection to a freenet node
@param host: (str) host of the node
@@ -767,7 +806,7 @@
# as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- error, msg = job.fcpResult
+ error, msg = job.jobResult
assert not error, 'Error should have been caught by handleMessage()'
return msg
@@ -782,9 +821,13 @@
def handleMessage(self, msg):
- """Handles the next message from the freenet node
- @param msg: Message() to handle
+ """Handles a message from the freenet node
+ @param msg: (Message) to handle
+ @return: True if the message was handled, False otherwise
"""
+
+
+ #if msg.name != 'USocketTimeOut':
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
if msg.name == Message.NodeHello:
@@ -803,30 +846,57 @@
#TODO: inform caller
raise ProtocolError(msg)
else:
- self.jobStop(identifier, msg, flagError=True)
+ return self.jobStop(identifier, msg, flagError=True)
- elif msg.name == Message.Peer:
- self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
+ if msg.name == Message.DataFound:
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
elif msg.name == Message.EndListPeers:
- self.jobStop(FixedJobIdentifiers.ListPeers, msg)
+ return self.jobStop(FixedJobIdentifiers.ListPeers, msg)
elif msg.name == Message.GetFailed:
- self.jobStop(msg['Identifier'], msg, flagError=True)
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
+ elif msg.name == Message.PersistentRequestRemoved:
+ return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg)
+
elif msg.name == Message.SimpleProgress:
- self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ return self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ elif msg.name == Message.TestDDAComplete:
+ return self.jobStop(msg['Directory'], msg)
+
elif msg.name == Message.TestDDAReply:
- self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+ return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
- elif msg.name == Message.TestDDAComplete:
- self.jobStop(msg['Directory'], msg)
+ elif msg.name == Message.IdentifierCollision:
+ # identifier is unique to us, but for some reason not unique to the node
+ #TODO: maybe for future implementations find a way to automise
+ # reinsertion and a notification via a handleJobChanged() for the
+ # caller to overwrite
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
- elif msg.name == Message.IdentifierCollision:
- pass
+ elif msg.name == Message.Peer:
+ return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ return False
+ #########################################################
+ ## jobs
+ #########################################################
+ def hasJobsRunning(self):
+ """Checks if the client has running jobs
+ @return: (bool) True if so, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ result = bool(self._jobs['JobList'])
+ finally:
+ self._lock.release()
+ return result
+
+
def jobAdd(self, job, synchron=False):
"""Adds a job to the client
@param job: (Job*) job to add
@@ -834,35 +904,62 @@
"""
self._lock.acquire(True)
try:
- if job.fcpIdentifier in self._jobs['all']:
- raise ValueError('Duplicate job: %r' % job.identifier)
- self._jobs['all'][job.fcpIdentifier] = job
- self._jobs['running'].append(job)
+ if job.jobIdentifier in self._jobs['JobMapping']:
+ raise ValueError('Duplicate job: %r' % job.jobIdentifier)
+ self._jobs['JobMapping'][job.jobIdentifier] = job
+ self._jobs['JobList'].append(job)
finally:
self._lock.release()
- self._log.info(self._logMessages.JobStart + job.fcpMessage.name)
+ self._log.info(self._logMessages.JobStart + job.jobMessage.name)
job.handleStart()
if synchron:
- while job.fcpResult is None:
+ while job.jobResult is None:
self.next()
+ def jobGet(self, identifier):
+ """Returns a job given its identifier
+ @param identifier: identifier of the job
+ @return: (Job*) instance or None, if no corresponding job was found
+ """
+ self._lock.acquire(True)
+ try:
+ result = self._jobs['JobMapping'].get(identifier, None)
+ finally:
+ self._lock.release()
+ return result
+
+ def jobIsRunning(self, identifier):
+ """Checks if a job is running
+ @param identifier: identifier of the job
+ @return: True if so, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ result = identifier in self._jobs['JobMapping']
+ finally:
+ self._lock.release()
+ return result
+
+
def jobNotify(self, identifier, handler, msg):
"""Notifies a job about an event while it is running
@param identifier: identifier of the job to notify
@param handler: (str) method of the job to call to handle the notification
@param msg: Message() to pass to the job
+ @return: True if a job was found to be notified, False otherwise
"""
self._lock.acquire(True)
try:
- job = self._jobs['all'].get(identifier, None)
+ job = self._jobs['JobMapping'].get(identifier, None)
finally:
self._lock.release()
if job is None:
- raise ValueError('No such job: %r' % identifier)
+ return False
getattr(job, handler)(msg)
+ return True
#TODO: quite unclear when to remove a job
@@ -871,25 +968,23 @@
@param identifier: identifier of the job to stop
@param msg: Message() to pass to the job as result
@param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse
+ @return: True if a job was found to be stopped, False otherwise
"""
self._lock.acquire(True)
try:
- job = self._jobs['all'].get(identifier, None)
+ job = self._jobs['JobMapping'].get(identifier, None)
if job is not None:
- self._jobs['running'].remove(job)
- if job._fcp_auto_remove_:
- del self._jobs['all'][identifier]
- else:
- self._jobs['complete'].append(job)
+ self._jobs['JobList'].remove(job)
+ del self._jobs['JobMapping'][identifier]
finally:
self._lock.release()
-
if job is None:
- raise ValueError('No such job: %r' % identifier)
- self._log.info(self._logMessages.JobStop + job.fcpMessage.name)
+ return False
+ self._log.info(self._logMessages.JobStop + job.jobMessage.name)
job.handleStop(flagError, msg)
+ return True
+
-
#TODO: some info when all jobs are completed
def next(self):
"""Pumps the next message waiting
@@ -963,21 +1058,8 @@
"""Runs the client untill all jobs passed to it are completed
@note: use KeyboardInterrupt to stop prematurely
"""
-
- # TODO:
- # x. push pending jobs
try:
- while True:
- if not self._lock.acquire(False):
- continue
-
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- self._log.info(self._logMessages.JobsCompleted)
- break
- finally:
- self._lock.release()
-
+ while self.hasJobsRunning():
self.next()
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
@@ -1026,6 +1108,16 @@
""""""
self._log.setLevel(verbosity)
+
+ ########################################################
+ ##
+ ########################################################
+ def getFileInfo(self, job):
+ pass
+
+ def getFile(self, job):
+ pass
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1039,8 +1131,7 @@
c.run()
print '---------------------------'
- print job1.fcpError
- print job1.fcpResult
+ print job1.jobResult
print '---------------------------'
# should raise
#foo()
@@ -1052,7 +1143,7 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
@@ -1061,7 +1152,7 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
@@ -1071,6 +1162,6 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
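FCP carries all message parameters as plain strings, which is why this revision turns the Priorities constants into strings and keeps fcpBool()/pythonBool() as simple string mappings; both can be passed straight into a Message without conversion. A minimal sketch, assuming the definitions shown in the diffs above:

print Priorities.PriorityDefault                # '4' (Bulk)
print fcpBool(True), pythonBool('false')        # true False
msg = Message(Message.ListPeers, WithMetadata=fcpBool(False))
print msg.toString()                            # name line, 'WithMetadata=false', then 'EndMessage'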
From: <jU...@us...> - 2007-10-20 13:37:47
Revision: 9
http://fclient.svn.sourceforge.net/fclient/?rev=9&view=rev
Author: jUrner
Date: 2007-10-20 06:37:51 -0700 (Sat, 20 Oct 2007)
Log Message:
-----------
bit of refactoring
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 10:02:28 UTC (rev 8)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9)
@@ -22,79 +22,14 @@
except: pass
SocketTimeout = 0.1
-class JobIdentifiers:
+class FixedJobIdentifiers:
"""Fixed job identifiers
@note: the client can only handle one job of these at a time
"""
ClientHello = 'ClientHello'
ListPeers = 'ListPeers'
-class Messages:
- """All messages supported by the client"""
-
- # client messages
- ClientHello = 'ClientHello'
- ListPeer = 'ListPeer' # (since 1045)
- ListPeers = 'ListPeers'
- ListPeerNotes = 'ListPeerNotes'
- AddPeer = 'AddPeer'
- ModifyPeer = 'ModifyPeer'
- ModifyPeerNote = 'ModifyPeerNote'
- RemovePeer = 'RemovePeer'
- GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
- ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
- GenerateSSK = 'GenerateSSK'
- ClientPut = 'ClientPut'
- ClientPutDiskDir = 'ClientPutDiskDir'
- ClientPutComplexDir = 'ClientPutComplexDir'
- ClientGet = 'ClientGet'
- SubscribeUSK = 'SubscribeUSK'
- WatchGlobal = 'WatchGlobal'
- GetRequestStatus = 'GetRequestStatus'
- ListPersistentRequests = 'ListPersistentRequests'
- RemovePersistentRequest = 'RemovePersistentRequest'
- ModifyPersistentRequest = 'ModifyPersistentRequest'
- Shutdown = 'Shutdown'
-
- # node messages
- NodeHello = 'NodeHello'
- CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
- Peer = 'Peer'
- PeerNote = 'PeerNote'
- EndListPeers = 'EndListPeers'
- EndListPeerNotes = 'EndListPeerNotes'
- PeerRemoved = 'PeerRemoved'
- NodeData = 'NodeData'
- ConfigData = 'ConfigData' # (since 1027)
- TestDDAReply = 'TestDDAReply' # (since 1027)
- TestDDAComplete = 'TestDDAComplete' # (since 1027)
- SSKKeypair = 'SSKKeypair'
- PersistentGet = 'PersistentGet'
- PersistentPut = 'PersistentPut'
- PersistentPutDir = 'PersistentPutDir'
- URIGenerated = 'URIGenerated'
- PutSuccessful = 'PutSuccessful'
- PutFetchable = 'PutFetchable'
- DataFound = 'DataFound'
- AllData = 'AllData'
- StartedCompression = 'StartedCompression'
- FinishedCompression = 'FinishedCompression'
- SimpleProgress = 'SimpleProgress'
- EndListPersistentRequests = 'EndListPersistentRequests'
- PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
- PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
- PutFailed = 'PutFailed'
- GetFailed = 'GetFailed'
- ProtocolError = 'ProtocolError'
- IdentifierCollision = 'IdentifierCollision'
- UnknownNodeIdentifier = 'UnknownNodeIdentifier'
- UnknownPeerNoteType = 'UnknownPeerNoteType'
- SubscribedUSKUpdate = 'SubscribedUSKUpdate'
-
class Priorities:
"""All priorities supported by the client"""
@@ -109,12 +44,23 @@
PriorityMin = Minimum
PriorityDefault = Bulk
-
-# errors
-
-class FetchErrors:
+#************************************************************************************
+# exceptions
+#************************************************************************************
+class FetchError(Exception):
"""All fetch errors supported by the client"""
+ def __init__(self, msg):
+ """
+ @param msg: (Message) GetFailed message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
MaxArchiveRecursionExceeded = '1'
UnknownSplitfileMetadata = '2'
UnknownMetadata = '3'
@@ -145,9 +91,20 @@
NotAllDataFound = '28'
-class InsertErrors:
+class InsertError(Exception):
"""All insert errors supported by the client"""
+ def __init__(self, msg):
+ """
+ @param msg: (Message) PutFailed message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
InvalidUri = '1'
BucketError = '2'
InternalError = '3'
@@ -160,9 +117,20 @@
Canceled = '10'
-class ProtocolErrors:
+class ProtocolError(Exception):
"""All protocol errors supported by the client"""
+ def __init__(self, msg):
+ """
+ @param msg: (Message) ProtocolError message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
ClientHelloMustBeFirst = '1'
NoLateClientHellos = '2'
MessageParseError = '3'
@@ -195,15 +163,35 @@
OpennetDisabled = '30'
DarknetOnly = '31'
+class SocketError(Exception): pass
#**********************************************************************
# functions
#**********************************************************************
+def fcpBool(pythonBool):
+ """Converts a python bool to a fcp bool
+ @param pythonBool: (bool)
+ @return: (str) 'true' or 'false'
+ """
+ if pythonBool:
+ return 'true'
+ return 'false'
+
+
def newIdentifier():
"""Returns a new unique identifier
@return: (str) uuid
"""
return str(uuid.uuid4())
+
+def pythonBool(fcpBool):
+ """Converts a fcp bool to a python bool
+ @param pythonBool: 'true' or 'false'
+ @return: (bool) True or False
+ """
+ return fcpBool == 'true'
+
+
def saveReadFile(fpath):
"""Reads contents of a file in the savest manner possible
@param fpath: file to write
@@ -267,42 +255,9 @@
stdout, stderr = p.communicate()
return stdout
-
-def fcpBool(pythonBool):
- """Converts a python bool to a fcp bool
- @param pythonBool: (bool)
- @return: (str) 'true' or 'false'
- """
- if pythonBool:
- return 'true'
- return 'false'
-
-def pythonBool(fcpBool):
- """Converts a fcp bool to a python bool
- @param pythonBool: 'true' or 'false'
- @return: (bool) True or False
- """
- return fcpBool == 'true'
-
#**********************************************************************
# classes
#**********************************************************************
-class FcpSocketError(Exception): pass
-class FcpProtocolError(Exception):
- def __init__(self, msg):
- """
- @param msg: (Message) ProtocolError message
- """
- self.value = '%s (%s, %s)' % (
- msg.get('CodeDescription', 'Unknown error') ,
- msg['Code'],
- msg.get('ExtraDescription', '...'),
- )
-
- def __str__(self): return self.value
-
-
-
class FcpUri(object):
"""Wrapper class for freenet uris"""
@@ -397,8 +352,68 @@
class Message(object):
"""Class wrapping a freenet message"""
- Name = 'UMessage'
+ # client messages
+ ClientHello = 'ClientHello'
+ ListPeer = 'ListPeer' # (since 1045)
+ ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ AddPeer = 'AddPeer'
+ ModifyPeer = 'ModifyPeer'
+ ModifyPeerNote = 'ModifyPeerNote'
+ RemovePeer = 'RemovePeer'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig' # (since 1027)
+ ModifyConfig = 'ModifyConfig' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ GenerateSSK = 'GenerateSSK'
+ ClientPut = 'ClientPut'
+ ClientPutDiskDir = 'ClientPutDiskDir'
+ ClientPutComplexDir = 'ClientPutComplexDir'
+ ClientGet = 'ClientGet'
+ SubscribeUSK = 'SubscribeUSK'
+ WatchGlobal = 'WatchGlobal'
+ GetRequestStatus = 'GetRequestStatus'
+ ListPersistentRequests = 'ListPersistentRequests'
+ RemovePersistentRequest = 'RemovePersistentRequest'
+ ModifyPersistentRequest = 'ModifyPersistentRequest'
+ Shutdown = 'Shutdown'
+ # node messages
+ NodeHello = 'NodeHello'
+ CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
+ Peer = 'Peer'
+ PeerNote = 'PeerNote'
+ EndListPeers = 'EndListPeers'
+ EndListPeerNotes = 'EndListPeerNotes'
+ PeerRemoved = 'PeerRemoved'
+ NodeData = 'NodeData'
+ ConfigData = 'ConfigData' # (since 1027)
+ TestDDAReply = 'TestDDAReply' # (since 1027)
+ TestDDAComplete = 'TestDDAComplete' # (since 1027)
+ SSKKeypair = 'SSKKeypair'
+ PersistentGet = 'PersistentGet'
+ PersistentPut = 'PersistentPut'
+ PersistentPutDir = 'PersistentPutDir'
+ URIGenerated = 'URIGenerated'
+ PutSuccessful = 'PutSuccessful'
+ PutFetchable = 'PutFetchable'
+ DataFound = 'DataFound'
+ AllData = 'AllData'
+ StartedCompression = 'StartedCompression'
+ FinishedCompression = 'FinishedCompression'
+ SimpleProgress = 'SimpleProgress'
+ EndListPersistentRequests = 'EndListPersistentRequests'
+ PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
+ PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
+ PutFailed = 'PutFailed'
+ GetFailed = 'GetFailed'
+ ProtocolError = 'ProtocolError'
+ IdentifierCollision = 'IdentifierCollision'
+ UnknownNodeIdentifier = 'UnknownNodeIdentifier'
+ UnknownPeerNoteType = 'UnknownPeerNoteType'
+ SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+
def __init__(self, name, data=None, **params):
"""
@@ -410,29 +425,25 @@
self.data = data
self.name = name
self.params = params
-
-
+
def toString(self):
"""Returns a string with the formated message, ready to be send"""
-
# TODO: "Data" not yet implemented
out = [self.name, ]
for param, value in self.params.items():
out.append('%s=%s' % (param, value))
out.append('EndMessage\n')
return '\n'.join(out)
-
-
+
def pprint(self):
"""Returns the message as nicely formated human readable string"""
-
- out = [self.name, ]
+ out = ['', '>>' + self.name, ]
for param, value in self.params.items():
- out.append(' %s=%s' % (param, value))
- out.append('EndMessage')
+ out.append('>> %s=%s' % (param, value))
+ out.append('>>EndMessage')
out.append('')
return '\n'.join(out)
-
+
def __getitem__(self, name):
"""Returns the message parameter 'name' """
return self.params[name]
@@ -444,14 +455,13 @@
def __setitem__(self, name, value):
"""Sets the message parameter 'name' to 'value' """
self.params[name] = value
-
+
+
class MessageSocketTimeout(Message):
- Name = 'USocketTimeout'
-
def __init__(self):
- Message.__init__(self, self.Name)
+ Message.__init__(self, 'USocketTimeOut')
@@ -474,47 +484,29 @@
@ivar fcpIdentifier: identifier of the job
@ivar fcpMessage: initial message sent to the node
@ivar fcpResult: if no error was encountered, holding the result of the job when complete
- @ivar fcpTime: when the job is complete, holding the time the job took to complete
+ @ivar fcpTimeStart: time the job was started
+ @ivar fcpTimeStop: time the job was stopped
"""
-
self.fcpClient = fcpClient # FcpClient() instance the job belongs to
- self.fcpError = None # last error (either this is set or dcpResult)
- self.fcpIdentifier = identifier #
+ self.fcpIdentifier = identifier # job identifier
self.fcpMessage = message # message sent to node
- self.fcpResult = None # job result
+ self.fcpResult = None # job result (bool error, Message msg)
self.fcpTimeStart = 0 # time the job was started
self.fcpTimeStop = 0 # time the job was stopped
- self.fcpStopped = False
-
- def displayName(self):
- """Returns the display name of the job
- @return: (str) display name
- """
- return 'JobBase'
-
- def start(self):
+
+ def handleStart(self):
"""Starts the job"""
- self.fcpStopped = False
+ self.fcpResult = None
self.fcpTimeStart = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
-
- def error(self, msg):
- """Called on job completion if an error was encounterd while runnng the job
- @param msg: (Message) to pass to the job
- """
- self.fcpStopped = True
- self.fcpTimeStop = time.time()
- self.fcpError = msg
- self.fcpResult = None
-
- def stop(self, msg):
+
+ def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
+ @param flagError: True if an error was encountered, False otherwise
@param msg: (Message) to pass to the job
"""
- self.fcpStopped = True
self.fcpTimeStop = time.time()
- self.fcpError = None
- self.fcpResult = msg
+ self.fcpResult = (flagError, msg)
class JobClientHello(JobBase):
@@ -532,11 +524,11 @@
@param expectedVersion: (str) node version expected
"""
message = Message(
- Messages.ClientHello,
+ Message.ClientHello,
Name=name if name is not None else newIdentifier(),
ExpectedVersion=expectedVersion,
)
- JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message)
def displayName(self):
return 'ClientHello'
@@ -550,16 +542,13 @@
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
- Messages.ListPeers,
+ Message.ListPeers,
WithMetadata=fcpBool(withMetaData),
WithVolatile=fcpBool(withVolantile),
)
- JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
- def displayName(self):
- return 'ListPeers'
-
def handlePeer(self,msg):
"""Handles the next peer send by the node in form of a 'Peer' message
while the job is running. Overwrite to process.
@@ -592,7 +581,7 @@
"""
identifier = newIdentifier()
message = Message(
- Messages.ClientGet,
+ Message.ClientGet,
Identifier=identifier,
URI=uri,
MaxSize='0',
@@ -602,37 +591,25 @@
)
JobBase.__init__(self, fcpClient, identifier, message)
-
- def displayName(self):
- return 'GetFileInfo'
-
+
def handleProgress(self, msg):
"""Handles the next progress made. Overwrite to process.
"""
-
-
- def error(self, msg):
- JobBase.error(self, msg)
- if msg.name == Messages.GetFailed:
- if msg['Code'] == FetchErrors.TooBig:
- self.fcpError = None
- self.fcpResult = (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- #else:
- # raise ValueError('Unhandled message: %s' % msg.name)
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- if msg.name == Messages.DataFound:
+ def handleStop(self, flagError, msg):
+ JobBase.handleStop(self,flagError, msg)
+ if msg.name == Message.DataFound:
self.fcpResult = (
msg.get('Metadata.ContentType', ''),
msg.get('DataLength', '')
)
- else:
- raise ValueError('Unhandled message: %s' % msg.name)
+ elif msg.name == Message.GetFailed:
+ if msg['Code'] == FetchError.TooBig:
+ self.fcpResult = (False, msg)
+ self.fcpResult = (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
@@ -648,7 +625,7 @@
raise ValueError('No such directory: %r' % directory)
message = Message(
- Messages.TestDDARequest,
+ Message.TestDDARequest,
Directory=directory,
WantReadDirectory=fcpBool(read),
WantWriteDirectory=fcpBool(write),
@@ -656,9 +633,7 @@
JobBase.__init__(self, fcpClient, directory, message)
self.fcpTmpFile = None
- def displayName(self):
- return 'TestDDA'
-
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -677,22 +652,16 @@
readContent = ''
self.fcpClient.sendMessage(
- Messages.TestDDAResponse,
+ Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
)
- def error(self, msg):
- JobBase.error(self, msg)
+ def handleStop(self, flagError, msg):
+ JobBase.handleStop(self, flagError, msg)
saveRemoveFile(self.fcpTmpFile)
self.fcpTmpFile = None
-
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
#**************************************************************************
# fcp client
@@ -706,8 +675,8 @@
ClientClose = 'Closing client'
- MessageSend = 'Message send'
- MessageReceived = 'Message received'
+ MessageSend = 'SendMessage'
+ MessageReceived = 'ReceivedMessage'
JobStart = 'Starting job: '
JobStop = 'Stopping job: '
@@ -733,7 +702,7 @@
"""
@param name: name of the client instance or '' (for debugging)
@param errorHandler: will be called if the socket connection to the node is dead
- with two params: FcpSocketError + details. When the handler is called the client
+ with two params: SocketError + details. When the handler is called the client
is already closed.
@param verbosity: verbosity level for debugging
@param logMessages: LogMessages class containing messages
@@ -747,7 +716,7 @@
'complete': [], # ???
}
self._errorHandler = errorHandler
- self._log = logging.getLogger(NameClient + ':' + name)
+ self._log = logging.getLogger(name)
self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
@@ -792,10 +761,15 @@
pass
else:
self._log.info(self._logMessages.Connected)
+
+ #NOTE: thought I could leave ClientHelloing up to the caller
+ # but instead of responding with ClientHelloMustBeFirst
+ # as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- assert job.fcpError is None, 'Error should have been caught by handleMessage()'
- return job.fcpResult
+ error, msg = job.fcpResult
+ assert not error, 'Error should have been caught by handleMessage()'
+ return msg
self._log.info(self._logMessages.ConnectionRetry)
@@ -811,45 +785,45 @@
"""Handles the next message from the freenet node
@param msg: Message() to handle
"""
- self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+ self._log.debug(self._logMessages.MessageReceived + msg.pprint())
- if msg.name == Messages.NodeHello:
+ if msg.name == Message.NodeHello:
#connectionIdentifier = msg['ConnectionIdentifier']
- self.jobStop(JobIdentifiers.ClientHello, msg)
+ self.jobStop(FixedJobIdentifiers.ClientHello, msg)
- elif msg.name == Messages.ProtocolError:
+ elif msg.name == Message.ProtocolError:
code = msg['Code']
- #if code == ProtocolErrors.NoLateClientHellos:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
- #elif code == ProtocolErrors.ClientHelloMustBeFirst:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #if code == ProtocolError.NoLateClientHellos:
+ # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
+ #elif code == ProtocolError.ClientHelloMustBeFirst:
+ # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
#else:
identifier = msg.get('Identifier', None)
if identifier is None:
#TODO: inform caller
- raise FcpProtocolError(msg)
+ raise ProtocolError(msg)
else:
- self.jobStop(identifier, msg, error=True)
+ self.jobStop(identifier, msg, flagError=True)
- elif msg.name == Messages.Peer:
- self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
+ elif msg.name == Message.Peer:
+ self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
- elif msg.name == Messages.EndListPeers:
- self.jobStop(IdentifierListPeers, msg)
+ elif msg.name == Message.EndListPeers:
+ self.jobStop(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Messages.GetFailed:
- self.jobStop(msg['Identifier'], msg, error=True)
+ elif msg.name == Message.GetFailed:
+ self.jobStop(msg['Identifier'], msg, flagError=True)
- elif msg.name == Messages.SimpleProgress:
+ elif msg.name == Message.SimpleProgress:
self.jobNotify(msg['Identifier'], 'handleProgress', msg)
- elif msg.name == Messages.TestDDAReply:
+ elif msg.name == Message.TestDDAReply:
self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
- elif msg.name == Messages.TestDDAComplete:
+ elif msg.name == Message.TestDDAComplete:
self.jobStop(msg['Directory'], msg)
- elif msg.name == Messages.IdentifierCollision:
+ elif msg.name == Message.IdentifierCollision:
pass
@@ -867,10 +841,10 @@
finally:
self._lock.release()
- self._log.info(self._logMessages.JobStart + job.displayName())
- job.start()
+ self._log.info(self._logMessages.JobStart + job.fcpMessage.name)
+ job.handleStart()
if synchron:
- while not job.fcpStopped:
+ while job.fcpResult is None:
self.next()
@@ -892,11 +866,11 @@
#TODO: quite unclear when to remove a job
- def jobStop(self, identifier, msg, error=False):
+ def jobStop(self, identifier, msg, flagError=False):
"""Stops a job
@param identifier: identifier of the job to stop
@param msg: Message() to pass to the job as result
- @param error: set to True to indicate unsuccessful completion of the job, True otherwisse
+ @param flagError: set to True to indicate unsuccessful completion of the job, False otherwise
"""
self._lock.acquire(True)
try:
@@ -912,11 +886,8 @@
if job is None:
raise ValueError('No such job: %r' % identifier)
- self._log.info(self._logMessages.JobStop + job.displayName())
- if error:
- job.error(msg)
- else:
- job.stop(msg)
+ self._log.info(self._logMessages.JobStop + job.fcpMessage.name)
+ job.handleStop(flagError, msg)
#TODO: some info when all jobs are completed
@@ -931,7 +902,7 @@
def readMessage(self):
"""Reads the next message directly from the socket and dispatches it
@return: (Message) the next message read from the socket
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ @raise SocketError: if the socket connection to the node dies unexpectedly
If an error handler is passed to the client it is called immediately before the error
is raised.
"""
@@ -949,8 +920,8 @@
self._log.info(self._logMessages.SocketDead)
self.close()
if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
+ self._errorHandler(SocketError, d)
+ raise SocketError(d) #!!
if p == '\r': # ignore
continue
@@ -975,8 +946,8 @@
self._log.info(self._logMessages.SocketDead)
self.close()
if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
+ self._errorHandler(SocketError, d)
+ raise SocketError(d) #!!
else:
head, sep, tail = line.partition('=')
@@ -1019,7 +990,7 @@
@param data: data to attach to the message
@param params: {param-name: param-value, ...} of parameters to pass along
with the message (see freenet protocol)
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ @raise SocketError: if the socket connection to the node dies unexpectedly
If an error handler is passed to the client it is called immediately before the error
is raised.
"""
@@ -1030,20 +1001,19 @@
"""Sends a message to freenet
@param msg: (Message) message to send
@return: Message
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ @raise SocketError: if the socket connection to the node dies unexpectedly.
If an error handler is passed to the client it is called immediately before the error
is raised.
"""
- rawMsg = msg.toString()
- self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
+ self._log.debug(self._logMessages.MessageSend + msg.pprint())
try:
- self._socket.sendall(rawMsg)
- except Exception, d:
+ self._socket.sendall(msg.toString())
+ except socket.error, d:
self._log.info(self._logMessages.SocketDead)
self.close()
if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d)
+ self._errorHandler(SocketError, d)
+ raise SocketError(d)
return msg
@@ -1061,7 +1031,8 @@
#*****************************************************************************
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
- if c.connect():
+ nodeHello = c.connect()
+ if nodeHello is not None:
def foo():
job1 = JobClientHello(c)
c.jobAdd(job1)
@@ -1070,8 +1041,8 @@
print '---------------------------'
print job1.fcpError
print job1.fcpResult
- print job1.fcpTime
print '---------------------------'
+ # should raise
#foo()
@@ -1081,18 +1052,25 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpError
print job2.fcpResult
- print job2.fcpTime
print '---------------------------'
-
+ #foo()
+
def foo():
+ job2 = JobListPeers(c)
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job2.fcpResult
+ print '---------------------------'
+ #foo()
+
+
+ def foo():
job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpError
print job2.fcpResult
- print job2.fcpTime
print '---------------------------'
#foo()
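A minimal usage sketch of the reworked job API in this revision (not part of the commit; the client name and the peer-collecting subclass are illustrative). Once handleStop() has run, fcpResult holds a (flagError, msg) tuple for any job that does not override it, so callers unpack the tuple instead of checking the old fcpError attribute:

import logging

class CollectPeers(JobListPeers):
    def __init__(self, fcpClient):
        JobListPeers.__init__(self, fcpClient)
        self.peers = []

    def handlePeer(self, msg):
        # called once per 'Peer' message while the job is running
        self.peers.append(msg)

client = FcpClient(name='sketch', verbosity=logging.DEBUG)
if client.connect() is not None:          # connect() returns the NodeHello message or None
    job = CollectPeers(client)
    client.jobAdd(job, synchron=True)     # blocks until EndListPeers stops the job
    flagError, msg = job.fcpResult        # set by JobBase.handleStop()
    if flagError:
        print 'ListPeers failed:', msg.name
    else:
        print '%d peers known to the node' % len(job.peers)
    client.close()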
|
|
From: <jU...@us...> - 2007-10-20 10:02:24
|
Revision: 8
http://fclient.svn.sourceforge.net/fclient/?rev=8&view=rev
Author: jUrner
Date: 2007-10-20 03:02:28 -0700 (Sat, 20 Oct 2007)
Log Message:
-----------
renamed to be able to map fcp version numbers to modules
Added Paths:
-----------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Removed Paths:
-------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Deleted: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:02:28 UTC (rev 8)
@@ -1,1098 +0,0 @@
-
-import atexit
-import logging
-import os
-import re
-import socket
-import subprocess
-import sys
-import time
-import thread
-import uuid
-
-logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
-#**************************************************************
-# consts
-#**************************************************************
-NameClient = 'Fcp20Client'
-DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
-DefaultFcpPort = 9481
-try:
- DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
-except: pass
-SocketTimeout = 0.1
-
-class JobIdentifiers:
- """Fixed job identifiers
- @note: the client can only handle one job of these at a time
- """
- ClientHello = 'ClientHello'
- ListPeers = 'ListPeers'
-
-class Messages:
- """All messages supported by the client"""
-
- # client messages
- ClientHello = 'ClientHello'
- ListPeer = 'ListPeer' # (since 1045)
- ListPeers = 'ListPeers'
- ListPeerNotes = 'ListPeerNotes'
- AddPeer = 'AddPeer'
- ModifyPeer = 'ModifyPeer'
- ModifyPeerNote = 'ModifyPeerNote'
- RemovePeer = 'RemovePeer'
- GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
- ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
- GenerateSSK = 'GenerateSSK'
- ClientPut = 'ClientPut'
- ClientPutDiskDir = 'ClientPutDiskDir'
- ClientPutComplexDir = 'ClientPutComplexDir'
- ClientGet = 'ClientGet'
- SubscribeUSK = 'SubscribeUSK'
- WatchGlobal = 'WatchGlobal'
- GetRequestStatus = 'GetRequestStatus'
- ListPersistentRequests = 'ListPersistentRequests'
- RemovePersistentRequest = 'RemovePersistentRequest'
- ModifyPersistentRequest = 'ModifyPersistentRequest'
- Shutdown = 'Shutdown'
-
- # node messages
- NodeHello = 'NodeHello'
- CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
- Peer = 'Peer'
- PeerNote = 'PeerNote'
- EndListPeers = 'EndListPeers'
- EndListPeerNotes = 'EndListPeerNotes'
- PeerRemoved = 'PeerRemoved'
- NodeData = 'NodeData'
- ConfigData = 'ConfigData' # (since 1027)
- TestDDAReply = 'TestDDAReply' # (since 1027)
- TestDDAComplete = 'TestDDAComplete' # (since 1027)
- SSKKeypair = 'SSKKeypair'
- PersistentGet = 'PersistentGet'
- PersistentPut = 'PersistentPut'
- PersistentPutDir = 'PersistentPutDir'
- URIGenerated = 'URIGenerated'
- PutSuccessful = 'PutSuccessful'
- PutFetchable = 'PutFetchable'
- DataFound = 'DataFound'
- AllData = 'AllData'
- StartedCompression = 'StartedCompression'
- FinishedCompression = 'FinishedCompression'
- SimpleProgress = 'SimpleProgress'
- EndListPersistentRequests = 'EndListPersistentRequests'
- PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
- PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
- PutFailed = 'PutFailed'
- GetFailed = 'GetFailed'
- ProtocolError = 'ProtocolError'
- IdentifierCollision = 'IdentifierCollision'
- UnknownNodeIdentifier = 'UnknownNodeIdentifier'
- UnknownPeerNoteType = 'UnknownPeerNoteType'
- SubscribedUSKUpdate = 'SubscribedUSKUpdate'
-
-
-class Priorities:
- """All priorities supported by the client"""
-
- Maximum = 0
- Interactive = 1
- SemiInteractive = 2
- Updatable = 3
- Bulk = 4
- Prefetch = 5
- Minimum = 6
-
- PriorityMin = Minimum
- PriorityDefault = Bulk
-
-
-# errors
-
-class FetchErrors:
- """All fetch errors supported by the client"""
-
- MaxArchiveRecursionExceeded = '1'
- UnknownSplitfileMetadata = '2'
- UnknownMetadata = '3'
- InvalidMetadata = '4'
- ArchiveFailure = '5'
- BlockDecodeError = '6'
- MaxMetadataLevelsExceeded = '7'
- MaxArchiveRestartsExceeded = '8'
- MaxRecursionLevelExceeded = '9'
- NotAnArchve = '10'
- TooManyMetastrings = '11'
- BucketError = '12'
- DataNotFound = '13'
- RouteNotFound = '14'
- RejectedOverload = '15'
- TooManyRedirects = '16'
- InternalError = '17'
- TransferFailed = '18'
- SplitfileError = '19'
- InvalidUri = '20'
- TooBig = '21'
- MetadataTooBig = '22'
- TooManyBlocks = '23'
- NotEnoughMetastrings = '24'
- Canceled = '25'
- ArchiveRestart = '26'
- PermanentRedirect = '27'
- NotAllDataFound = '28'
-
-
-class InsertErrors:
- """All insert errors supported by the client"""
-
- InvalidUri = '1'
- BucketError = '2'
- InternalError = '3'
- RejectedOverload = '4'
- RouteNotFound = '5'
- FatalErrorInBlocks = '6'
- TooManyRetriesInBlock = '7'
- RouteReallyNotFound = '8'
- Collision = '9'
- Canceled = '10'
-
-
-class ProtocolErrors:
- """All protocol errors supported by the client"""
-
- ClientHelloMustBeFirst = '1'
- NoLateClientHellos = '2'
- MessageParseError = '3'
- UriParseError = '4'
- MissingField = '5'
- ErrorParsingNumber = '6'
- InvalidMessage = '7'
- InvalidField = '8'
- FileNotFound = '9'
- DiskTargetExists = '10'
- SameDirectoryExpected = '11'
- CouldNotCreateFile = '12'
- CouldNotWriteFile = '13'
- CouldNotRenameFile = '14'
- NoSuchIdentifier = '15'
- NotSupported = '16'
- InternalError = '17'
- ShuttingDown = '18'
- NoSuchNodeIdentifier = '19' # Unused since 995
- UrlParseError = '20'
- ReferenceParseError = '21'
- FileParseError = '22'
- NotAFile = '23'
- AccessDenied = '24'
- DDADenied = '25'
- CouldNotReadFile = '26'
- ReferenceSignature = '27'
- CanNotPeerWithSelf = '28'
- PeerExists = '29'
- OpennetDisabled = '30'
- DarknetOnly = '31'
-
-#**********************************************************************
-# functions
-#**********************************************************************
-def newIdentifier():
- """Returns a new unique identifier
- @return: (str) uuid
- """
- return str(uuid.uuid4())
-
-def saveReadFile(fpath):
- """Reads contents of a file in the savest manner possible
- @param fpath: file to write
- @return: contents of the file if successful, None otherwise
- """
- read = None
- try:
- fp = open(fpath, 'rb')
- except: pass
- else:
- try:
- read = fp.read()
- except: pass
- fp.close()
- return read
-
-def saveRemoveFile(fpath):
- """Savely removes a file
- @param fpath: filepath of the file to remove or None
- @return: True if the file was removed, False otherwise
- """
- if fpath is not None:
- if os.path.isfile(fpath):
- os.remove(fpath)
- return True
- return False
-
-
-def saveWriteFile(fpath, data):
- """Writes data to a file i the savest manner possible
- @param fpath: file to write
- @param data: data to write to file
- @return: True if successful, False otherwise
- """
- written = False
- try:
- fp = open(fpath, 'wb')
- except: pass
- else:
- try:
- fp.write(data)
- written = True
- except:
- fp.close()
- else:
- fp.close()
- return written
-
-def startFreenet(cmdline):
- """Starts freenet
- @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
- @return: (string) whatever freenet returns
- """
- #TODO: on windows it may be necessary to hide the command window
- p = subprocess.Popen(
- args=cmdline,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- )
- stdout, stderr = p.communicate()
- return stdout
-
-
-def fcpBool(pythonBool):
- """Converts a python bool to a fcp bool
- @param pythonBool: (bool)
- @return: (str) 'true' or 'false'
- """
- if pythonBool:
- return 'true'
- return 'false'
-
-def pythonBool(fcpBool):
- """Converts a fcp bool to a python bool
- @param fcpBool: 'true' or 'false'
- @return: (bool) True or False
- """
- return fcpBool == 'true'
-
-#**********************************************************************
-# classes
-#**********************************************************************
-class FcpSocketError(Exception): pass
-class FcpProtocolError(Exception):
- def __init__(self, msg):
- """
- @param msg: (Message) ProtocolError message
- """
- self.value = '%s (%s, %s)' % (
- msg.get('CodeDescription', 'Unknown error') ,
- msg['Code'],
- msg.get('ExtraDescription', '...'),
- )
-
- def __str__(self): return self.value
-
-
-
-class FcpUri(object):
- """Wrapper class for freenet uris"""
-
-
- KeySSK = 'SSK@'
- KeyKSK = 'KSK@'
- KeyCHK = 'CHK@'
- KeyUSK = 'USK@'
- KeySVK = 'SVK@'
- KeyUnknown = ''
- KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
-
- ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
- ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
-
-
- def __init__(self, uri):
- """
- @param uri: uri to wrap
- @param cvar ReUriPattern: pattern matching a freenet uri
- @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
-
- @note: any decorations prefixing the freenet part of the uri are stripped if possible
-
-
- >>> uri = FcpUri('freenet:SSK@foo/bar')
- >>> str(uri)
- 'SSK@foo/bar'
- >>> uri.keyType() == FcpUri.KeySSK
- True
- >>> uri.split()
- ('SSK@foo', 'bar')
- >>> uri.fileName()
- 'bar'
-
- >>> uri = FcpUri('http://SSK@foo/bar')
- >>> str(uri)
- 'SSK@foo/bar'
-
- # uris not containing freenet keys are left unchanged
- >>> uri = FcpUri('http://foo/bar')
- >>> str(uri)
- 'http://foo/bar'
- >>> uri.keyType() == FcpUri.KeyUnknown
- True
- >>> uri.split()
- ('http://foo/bar', '')
- >>> uri.fileName()
- 'http://foo/bar'
-
- """
- self._uri = uri
-
- result = self.ReUriPattern.search(uri)
- if result is not None:
- self._uri = result.group(0)
-
- def __str__(self):
- return str(self._uri)
-
- def __unicode__(self):
- return unicode(self._uri)
-
- def keyType(self):
- """Retuns the key type of the uri
- @return: one of the Key* consts
- """
- result = self.ReKeyPattern.search(self._uri)
- if result is not None:
- return result.group(0).upper()
- return self.KeyUnknown
-
- def split(self):
- """Splits the uri
- @return: tuple(freenet-key, file-name)
- """
- if self.keyType() != self.KeyUnknown:
- head, sep, tail = self._uri.partition('/')
- return head, tail
- return self._uri, ''
-
- def fileName(self):
- """Returns the filename part of the uri
- @return: str
- """
- head, tail = self.split()
- if tail:
- return tail
- return self._uri
-
-
-class Message(object):
- """Class wrapping a freenet message"""
-
- Name = 'UMessage'
-
-
- def __init__(self, name, data=None, **params):
- """
- @param name: message name
- @param data: data associated with the message (not yet implemented)
- @param params: {field-name: value, ...} of parameters of the message
- @note: all params can be accessed as attributes of the class
- """
- self.data = data
- self.name = name
- self.params = params
-
-
- def toString(self):
- """Returns a string with the formated message, ready to be send"""
-
- # TODO: "Data" not yet implemented
- out = [self.name, ]
- for param, value in self.params.items():
- out.append('%s=%s' % (param, value))
- out.append('EndMessage\n')
- return '\n'.join(out)
-
-
- def pprint(self):
- """Returns the message as nicely formated human readable string"""
-
- out = [self.name, ]
- for param, value in self.params.items():
- out.append(' %s=%s' % (param, value))
- out.append('EndMessage')
- out.append('')
- return '\n'.join(out)
-
- def __getitem__(self, name):
- """Returns the message parameter 'name' """
- return self.params[name]
-
- def get(self, name, default=None):
- """Returns the message parameter 'name' or 'default' """
- return self.params.get(name, default)
-
- def __setitem__(self, name, value):
- """Sets the message parameter 'name' to 'value' """
- self.params[name] = value
-
-
-class MessageSocketTimeout(Message):
-
- Name = 'USocketTimeout'
-
- def __init__(self):
- Message.__init__(self, self.Name)
-
-
-
-#**************************************************************************
-# jobs
-#**************************************************************************
-class JobBase(object):
- """Base class for jobs"""
-
- _fcp_auto_remove_ = True
-
- def __init__(self, fcpClient, identifier, message):
- """
- @param fcpClient: FcpClient() instance
- @param identifier: (str) identifier of the job
- @param message: (Message) to send to the node when the job is started
- @ivar fcpClient: FcpClient() instance
- @ivar fcpError: holding the error message if an error was encountered while running
- the job, None otherwise
- @ivar fcpIdentifier: identifier of the job
- @ivar fcpMessage: initial message sent to the node
- @ivar fcpResult: if no error was encountered, holding the result of the job when complete
- @ivar fcpTime: when the job is complete, holding the time the job took to complete
- """
-
- self.fcpClient = fcpClient # FcpClient() instance the job belongs to
- self.fcpError = None # last error (either this is set or fcpResult)
- self.fcpIdentifier = identifier #
- self.fcpMessage = message # message sent to node
- self.fcpResult = None # job result
- self.fcpTimeStart = 0 # time the job was started
- self.fcpTimeStop = 0 # time the job was stopped
- self.fcpStopped = False
-
- def displayName(self):
- """Returns the display name of the job
- @return: (str) display name
- """
- return 'JobBase'
-
- def start(self):
- """Starts the job"""
- self.fcpStopped = False
- self.fcpTimeStart = time.time()
- self.fcpClient.sendMessageEx(self.fcpMessage)
-
- def error(self, msg):
- """Called on job completion if an error was encounterd while runnng the job
- @param msg: (Message) to pass to the job
- """
- self.fcpStopped = True
- self.fcpTimeStop = time.time()
- self.fcpError = msg
- self.fcpResult = None
-
- def stop(self, msg):
- """Called on job completion to stop the job
- @param msg: (Message) to pass to the job
- """
- self.fcpStopped = True
- self.fcpTimeStop = time.time()
- self.fcpError = None
- self.fcpResult = msg
-
-
-class JobClientHello(JobBase):
- """Sed a ClientHello message to the node
-
- @note: this must be the first message passed to the node. If everything
- goes well, you will get a NodeHello in response.
- """
-
- _fcp_auto_remove_ = True
-
- def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
- """
- @param name: (str) connection name or None, to use an arbitrary name
- @param expectedVersion: (str) node version expected
- """
- message = Message(
- Messages.ClientHello,
- Name=name if name is not None else newIdentifier(),
- ExpectedVersion=expectedVersion,
- )
- JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
-
- def displayName(self):
- return 'ClientHello'
-
-
-class JobListPeers(JobBase):
- """Lists all known peers of the node
- """
-
- _fcp_auto_remove_ = True
-
- def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
- message = Message(
- Messages.ListPeers,
- WithMetadata=fcpBool(withMetaData),
- WithVolatile=fcpBool(withVolantile),
- )
- JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
-
-
- def displayName(self):
- return 'ListPeers'
-
- def handlePeer(self,msg):
- """Handles the next peer send by the node in form of a 'Peer' message
- while the job is running. Overwrite to process.
- """
-
-
-
-class JobGetFileInfo(JobBase):
- """Tries to retieve information about a file. If everything goes well
-
- On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
- Note, that both members may be '' (empty string)
-
- """
-
- _fcp_auto_remove_ = False
-
-
- # idea is to provoke a GetFailed message and take mimetype and size from it
- def __init__(self, fcpClient, uri, **params):
- """
- @param fcpClient: FcpClient() instance
- @param uri: uri of the file to retrieve info for
- @param params: additional parameters:
- IgnoreDS='true' / 'false'
- DSOnly='true' / 'false'
- MaxRetries=-1 ...N
- PriorityClass=Priority*
-
- """
- identifier = newIdentifier()
- message = Message(
- Messages.ClientGet,
- Identifier=identifier,
- URI=uri,
- MaxSize='0',
- ReturnType='none',
- Verbosity='1',
- **params
- )
- JobBase.__init__(self, fcpClient, identifier, message)
-
-
- def displayName(self):
- return 'GetFileInfo'
-
- def handleProgress(self, msg):
- """Handles the next progress made. Overwrite to process.
- """
-
-
- def error(self, msg):
- JobBase.error(self, msg)
- if msg.name == Messages.GetFailed:
- if msg['Code'] == FetchErrors.TooBig:
- self.fcpError = None
- self.fcpResult = (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- #else:
- # raise ValueError('Unhandled message: %s' % msg.name)
-
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- if msg.name == Messages.DataFound:
- self.fcpResult = (
- msg.get('Metadata.ContentType', ''),
- msg.get('DataLength', '')
- )
- else:
- raise ValueError('Unhandled message: %s' % msg.name)
-
-
-
-#TODO: handle case where directories are registered multiple times
-class JobTestDDA(JobBase):
- """Tests a directory for read / write accesss
- """
-
- _fcp_auto_remove_ = False
-
- def __init__(self, fcpClient, directory, read=True, write=True):
- if not os.path.isdir(directory):
- raise ValueError('No such directory: %r' % directory)
-
- message = Message(
- Messages.TestDDARequest,
- Directory=directory,
- WantReadDirectory=fcpBool(read),
- WantWriteDirectory=fcpBool(write),
- )
- JobBase.__init__(self, fcpClient, directory, message)
- self.fcpTmpFile = None
-
- def displayName(self):
- return 'TestDDA'
-
- def handleTestDDAReply(self, msg):
- fpathWrite = msg.params.get('WriteFilename', None)
- fpathRead = msg.params.get('ReadFilename', None)
- readContent = ''
- if fpathWrite is not None:
- written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
- if not written:
- if os.path.isfile(fpathWrite):
- os.remove(fpathWrite)
- else:
- self.fcpTmpFile = fpathWrite
-
- if fpathRead is not None:
- readContent = saveReadFile(fpathRead)
- if readContent is None:
- readContent = ''
-
- self.fcpClient.sendMessage(
- Messages.TestDDAResponse,
- Directory=msg['Directory'],
- ReadContent=readContent,
- )
-
-
- def error(self, msg):
- JobBase.error(self, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
-
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
-
-#**************************************************************************
-# fcp client
-#**************************************************************************
-class LogMessages:
- """Message strings used for log infos"""
- Connecting = 'Connecting to node...'
- Connected = 'Connected to node'
- ConnectionRetry = 'Connecting to node failed... retrying'
- ConnectingFailed = 'Connecting to node failed'
-
- ClientClose = 'Closing client'
-
- MessageSend = 'Message send'
- MessageReceived = 'Message received'
-
- JobStart = 'Starting job: '
- JobStop = 'Stopping job: '
- JobsCompleted = 'All jobs completed'
-
- KeyboardInterrupt = 'Keyboard interrupt'
- SocketDead = 'Socket is dead'
-
-
-#TODO: no idea what happens on reconnect if socket died. What about running jobs?
-#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
-#TODO: no idea if to add support for pending jobs and queue management here
-class FcpClient(object):
- """Fcp client implementation"""
-
-
- def __init__(self,
- name='',
- errorHandler=None,
- verbosity=logging.WARNING,
- logMessages=LogMessages
- ):
- """
- @param name: name of the client instance or '' (for debugging)
- @param errorHandler: will be called if the socket connection to the node is dead
- with two params: FcpSocketError + details. When the handler is called the client
- is already closed.
- @param verbosity: verbosity level for debugging
- @param logMessages: LogMessages class containing messages
- """
-
- self._isConnected = False
- self._jobs = {
- 'all': {},
- 'pending': [], # ???
- 'running': [],
- 'complete': [], # ???
- }
- self._errorHandler = errorHandler
- self._log = logging.getLogger(NameClient + ':' + name)
- self._logMessages = logMessages
- self._lock = thread.allocate_lock()
- self._socket = None
-
- self.setVerbosity(verbosity)
- atexit.register(self.close)
-
- def close(self):
- """Closes the client
- @note: make sure to call close() when done with the client
- """
- self._log.info(self._logMessages.ClientClose)
- if self._socket is not None:
- self._socket.close()
- self._socket = None
-
-
- #TODO: an iterator would be nice for Guis, to ensure they stay responsive in the call
- def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
- """Establishes the connection to a freenet node
- @param host: (str) host of the node
- @param port: (int) port of the node
- @param repeat: (int) how many seconds to try to connect before giving up
- @param timeout: (int) how much time to wait before another attempt to connect
- @return: (Message) NodeHello if successful, None otherwise
- """
- self._clientHello = None
- self._log.info(self._logMessages.Connecting)
-
- # poll until freenet responds
- time_elapsed = 0
- while time_elapsed <= repeat:
-
- # try to Connect socket
- if self._socket is not None:
- self.close()
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.settimeout(SocketTimeout)
- try:
- self._socket.connect((host, port))
- except Exception, d:
- pass
- else:
- self._log.info(self._logMessages.Connected)
- job = JobClientHello(self)
- self.jobAdd(job, synchron=True)
- assert job.fcpError is None, 'Error should have been caught by handleMessage()'
- return job.fcpResult
-
- self._log.info(self._logMessages.ConnectionRetry)
-
- # continue polling
- time_elapsed += timeout
- time.sleep(timeout)
-
- self._log.info(self._logMessages.ConnectingFailed)
- return None
-
-
- def handleMessage(self, msg):
- """Handles the next message from the freenet node
- @param msg: Message() to handle
- """
- self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
-
- if msg.name == Messages.NodeHello:
- #connectionIdentifier = msg['ConnectionIdentifier']
- self.jobStop(JobIdentifiers.ClientHello, msg)
-
- elif msg.name == Messages.ProtocolError:
- code = msg['Code']
- #if code == ProtocolErrors.NoLateClientHellos:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
- #elif code == ProtocolErrors.ClientHelloMustBeFirst:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
- #else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- #TODO: inform caller
- raise FcpProtocolError(msg)
- else:
- self.jobStop(identifier, msg, error=True)
-
- elif msg.name == Messages.Peer:
- self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
-
- elif msg.name == Messages.EndListPeers:
- self.jobStop(IdentifierListPeers, msg)
-
- elif msg.name == Messages.GetFailed:
- self.jobStop(msg['Identifier'], msg, error=True)
-
- elif msg.name == Messages.SimpleProgress:
- self.jobNotify(msg['Identifier'], 'handleProgress', msg)
-
- elif msg.name == Messages.TestDDAReply:
- self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
-
- elif msg.name == Messages.TestDDAComplete:
- self.jobStop(msg['Directory'], msg)
-
- elif msg.name == Messages.IdentifierCollision:
- pass
-
-
- def jobAdd(self, job, synchron=False):
- """Adds a job to the client
- @param job: (Job*) job to add
- @param synchron: if True, wait until the job is completed, if False return immediately
- """
- self._lock.acquire(True)
- try:
- if job.fcpIdentifier in self._jobs['all']:
- raise ValueError('Duplicate job: %r' % job.identifier)
- self._jobs['all'][job.fcpIdentifier] = job
- self._jobs['running'].append(job)
- finally:
- self._lock.release()
-
- self._log.info(self._logMessages.JobStart + job.displayName())
- job.start()
- if synchron:
- while not job.fcpStopped:
- self.next()
-
-
-
- def jobNotify(self, identifier, handler, msg):
- """Notifies a job about an event while it is running
- @param identifier: identifier of the job to notify
- @param handler: (str) method of the job to call to handle the notification
- @param msg: Message() to pass to the job
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['all'].get(identifier, None)
- finally:
- self._lock.release()
- if job is None:
- raise ValueError('No such job: %r' % identifier)
- getattr(job, handler)(msg)
-
-
- #TODO: quite unclear when to remove a job
- def jobStop(self, identifier, msg, error=False):
- """Stops a job
- @param identifier: identifier of the job to stop
- @param msg: Message() to pass to the job as result
- @param error: set to True to indicate unsuccessful completion of the job, False otherwise
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['all'].get(identifier, None)
- if job is not None:
- self._jobs['running'].remove(job)
- if job._fcp_auto_remove_:
- del self._jobs['all'][identifier]
- else:
- self._jobs['complete'].append(job)
- finally:
- self._lock.release()
-
- if job is None:
- raise ValueError('No such job: %r' % identifier)
- self._log.info(self._logMessages.JobStop + job.displayName())
- if error:
- job.error(msg)
- else:
- job.stop(msg)
-
-
- #TODO: some info when all jobs are completed
- def next(self):
- """Pumps the next message waiting
- @note: use this method instead of run() to run the client step by step
- """
- msg = self.readMessage()
- self.handleMessage(msg)
- return msg
-
- def readMessage(self):
- """Reads the next message directly from the socket and dispatches it
- @return: (Message) the next message read from the socket
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
- If an error handler is passed to the client it is called immediately before the error
- is raised.
- """
- msg = Message(None)
- buf = []
- while True:
-
- try:
- p = self._socket.recv(1)
- if not p: raise ValueError('Socket is dead')
- except socket.timeout, d: # no new messages in queue
- msg = MessageSocketTimeout()
- break
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
-
- if p == '\r': # ignore
- continue
-
- if p != '\n':
- buf.append(p)
- continue
-
- line = ''.join(buf)
- if line in ('End', "EndMessage"):
- break
- buf = []
-
- if msg.name is None:
- msg.name = line
- elif line == 'Data':
- n = int(msg.params['DataLength'])
- try:
- msg.data = self._socket.recv(n)
- if not msg.data: raise ValueError('Socket is dead')
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
-
- else:
- head, sep, tail = line.partition('=')
- msg.params[head] = tail
- if not sep:
- # TODO: check for invalid messages or not
- pass
-
- return msg
-
-
- def run(self):
- """Runs the client untill all jobs passed to it are completed
- @note: use KeyboardInterrupt to stop prematurely
- """
-
- # TODO:
- # x. push pending jobs
- try:
- while True:
- if not self._lock.acquire(False):
- continue
-
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- self._log.info(self._logMessages.JobsCompleted)
- break
- finally:
- self._lock.release()
-
- self.next()
- except KeyboardInterrupt:
- self._log(self._logMessages.KeyboardInterrupt)
- self.close()
-
-
- def sendMessage(self, name, data=None, **params):
- """Sends a message to freenet
- @param name: name of the message to send
- @param data: data to attach to the message
- @param params: {param-name: param-value, ...} of parameters to pass along
- with the message (see freenet protocol)
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
- If an error handler is passed to the client it is called immediately before the error
- is raised.
- """
- return self.sendMessageEx(Message(name, data=data, **params))
-
-
- def sendMessageEx(self, msg):
- """Sends a message to freenet
- @param msg: (Message) message to send
- @return: Message
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
- If an error handler is passed to the client it is called immediately before the error
- is raised.
- """
- rawMsg = msg.toString()
- self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
- try:
- self._socket.sendall(rawMsg)
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d)
- return msg
-
-
- def setLogMessages(self, logMessages):
- """"""
- self._logMessages = logMessages
-
-
- def setVerbosity(self, verbosity):
- """"""
- self._log.setLevel(verbosity)
-
-#*****************************************************************************
-#
-#*****************************************************************************
-if __name__ == '__main__':
- c = FcpClient(name='test', verbosity=logging.DEBUG)
- if c.connect():
- def foo():
- job1 = JobClientHello(c)
- c.jobAdd(job1)
-
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job1.fcpResult
- print job1.fcpTime
- print '---------------------------'
- #foo()
-
-
- def foo():
- d = os.path.dirname(os.path.abspath(__file__))
- job2 = JobTestDDA(c, d)
- c.jobAdd(job2)
- c.run()
- print '---------------------------'
- print job2.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
-
- def foo():
- job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- c.jobAdd(job2)
- c.run()
- print '---------------------------'
- print job2.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
- #foo()
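A minimal sketch of the wire framing defined in the listing above (assuming its Message and Messages classes; the connection name is illustrative): Message.toString() serializes a message as its name, one Key=Value line per parameter, and a closing EndMessage line, which is exactly the framing readMessage() parses back off the socket one byte at a time. Parameter order is not fixed, since params is a plain dict.

msg = Message(Messages.ClientHello, Name='fclient-demo', ExpectedVersion='2.0')
print msg.toString()
# prints something like:
# ClientHello
# ExpectedVersion=2.0
# Name=fclient-demo
# EndMessage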
Copied: trunk/fclient/fclient_lib/fcp/fcp2_0.py (from rev 7, trunk/fclient/fclient_lib/fcp/fcp20.py)
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py (rev 0)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 10:02:28 UTC (rev 8)
@@ -0,0 +1,1098 @@
+
+import atexit
+import logging
+import os
+import re
+import socket
+import subprocess
+import sys
+import time
+import thread
+import uuid
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+#**************************************************************
+# consts
+#**************************************************************
+NameClient = 'Fcp20Client'
+DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
+DefaultFcpPort = 9481
+try:
+ DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
+except: pass
+SocketTimeout = 0.1
+
+class JobIdentifiers:
+ """Fixed job identifiers
+ @note: the client can only handle one job of these at a time
+ """
+ ClientHello = 'ClientHello'
+ ListPeers = 'ListPeers'
+
+class Messages:
+ """All messages supported by the client"""
+
+ # client messages
+ ClientHello = 'ClientHello'
+ ListPeer = 'ListPeer' # (since 1045)
+ ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ AddPeer = 'AddPeer'
+ ModifyPeer = 'ModifyPeer'
+ ModifyPeerNote = 'ModifyPeerNote'
+ RemovePeer = 'RemovePeer'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig' # (since 1027)
+ ModifyConfig = 'ModifyConfig' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ GenerateSSK = 'GenerateSSK'
+ ClientPut = 'ClientPut'
+ ClientPutDiskDir = 'ClientPutDiskDir'
+ ClientPutComplexDir = 'ClientPutComplexDir'
+ ClientGet = 'ClientGet'
+ SubscribeUSK = 'SubscribeUSK'
+ WatchGlobal = 'WatchGlobal'
+ GetRequestStatus = 'GetRequestStatus'
+ ListPersistentRequests = 'ListPersistentRequests'
+ RemovePersistentRequest = 'RemovePersistentRequest'
+ ModifyPersistentRequest = 'ModifyPersistentRequest'
+ Shutdown = 'Shutdown'
+
+ # node messages
+ NodeHello = 'NodeHello'
+ CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
+ Peer = 'Peer'
+ PeerNote = 'PeerNote'
+ EndListPeers = 'EndListPeers'
+ EndListPeerNotes = 'EndListPeerNotes'
+ PeerRemoved = 'PeerRemoved'
+ NodeData = 'NodeData'
+ ConfigData = 'ConfigData' # (since 1027)
+ TestDDAReply = 'TestDDAReply' # (since 1027)
+ TestDDAComplete = 'TestDDAComplete' # (since 1027)
+ SSKKeypair = 'SSKKeypair'
+ PersistentGet = 'PersistentGet'
+ PersistentPut = 'PersistentPut'
+ PersistentPutDir = 'PersistentPutDir'
+ URIGenerated = 'URIGenerated'
+ PutSuccessful = 'PutSuccessful'
+ PutFetchable = 'PutFetchable'
+ DataFound = 'DataFound'
+ AllData = 'AllData'
+ StartedCompression = 'StartedCompression'
+ FinishedCompression = 'FinishedCompression'
+ SimpleProgress = 'SimpleProgress'
+ EndListPersistentRequests = 'EndListPersistentRequests'
+ PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
+ PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
+ PutFailed = 'PutFailed'
+ GetFailed = 'GetFailed'
+ ProtocolError = 'ProtocolError'
+ IdentifierCollision = 'IdentifierCollision'
+ UnknownNodeIdentifier = 'UnknownNodeIdentifier'
+ UnknownPeerNoteType = 'UnknownPeerNoteType'
+ SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+
+
+class Priorities:
+ """All priorities supported by the client"""
+
+ Maximum = 0
+ Interactive = 1
+ SemiInteractive = 2
+ Updatable = 3
+ Bulk = 4
+ Prefetch = 5
+ Minimum = 6
+
+ PriorityMin = Minimum
+ PriorityDefault = Bulk
+
+
+# errors
+
+class FetchErrors:
+ """All fetch errors supported by the client"""
+
+ MaxArchiveRecursionExceeded = '1'
+ UnknownSplitfileMetadata = '2'
+ UnknownMetadata = '3'
+ InvalidMetadata = '4'
+ ArchiveFailure = '5'
+ BlockDecodeError = '6'
+ MaxMetadataLevelsExceeded = '7'
+ MaxArchiveRestartsExceeded = '8'
+ MaxRecursionLevelExceeded = '9'
+ NotAnArchve = '10'
+ TooManyMetastrings = '11'
+ BucketError = '12'
+ DataNotFound = '13'
+ RouteNotFound = '14'
+ RejectedOverload = '15'
+ TooManyRedirects = '16'
+ InternalError = '17'
+ TransferFailed = '18'
+ SplitfileError = '19'
+ InvalidUri = '20'
+ TooBig = '21'
+ MetadataTooBig = '22'
+ TooManyBlocks = '23'
+ NotEnoughMetastrings = '24'
+ Canceled = '25'
+ ArchiveRestart = '26'
+ PermanentRedirect = '27'
+ NotAllDataFound = '28'
+
+
+class InsertErrors:
+ """All insert errors supported by the client"""
+
+ InvalidUri = '1'
+ BucketError = '2'
+ InternalError = '3'
+ RejectedOverload = '4'
+ RouteNotFound = '5'
+ FatalErrorInBlocks = '6'
+ TooManyRetriesInBlock = '7'
+ RouteReallyNotFound = '8'
+ Collision = '9'
+ Canceled = '10'
+
+
+class ProtocolErrors:
+ """All protocol errors supported by the client"""
+
+ ClientHelloMustBeFirst = '1'
+ NoLateClientHellos = '2'
+ MessageParseError = '3'
+ UriParseError = '4'
+ MissingField = '5'
+ ErrorParsingNumber = '6'
+ InvalidMessage = '7'
+ InvalidField = '8'
+ FileNotFound = '9'
+ DiskTargetExists = '10'
+ SameDirectoryExpected = '11'
+ CouldNotCreateFile = '12'
+ CouldNotWriteFile = '13'
+ CouldNotRenameFile = '14'
+ NoSuchIdentifier = '15'
+ NotSupported = '16'
+ InternalError = '17'
+ ShuttingDown = '18'
+ NoSuchNodeIdentifier = '19' # Unused since 995
+ UrlParseError = '20'
+ ReferenceParseError = '21'
+ FileParseError = '22'
+ NotAFile = '23'
+ AccessDenied = '24'
+ DDADenied = '25'
+ CouldNotReadFile = '26'
+ ReferenceSignature = '27'
+ CanNotPeerWithSelf = '28'
+ PeerExists = '29'
+ OpennetDisabled = '30'
+ DarknetOnly = '31'
+
+#**********************************************************************
+# functions
+#**********************************************************************
+def newIdentifier():
+ """Returns a new unique identifier
+ @return: (str) uuid
+ """
+ return str(uuid.uuid4())
+
+def saveReadFile(fpath):
+ """Reads contents of a file in the savest manner possible
+ @param fpath: file to write
+ @return: contents of the file if successful, None otherwise
+ """
+ read = None
+ try:
+ fp = open(fpath, 'rb')
+ except: pass
+ else:
+ try:
+ read = fp.read()
+ except: pass
+ fp.close()
+ return read
+
+def saveRemoveFile(fpath):
+ """Savely removes a file
+ @param fpath: filepath of the file to remove or None
+ @return: True if the file was removed, False otherwise
+ """
+ if fpath is not None:
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ return True
+ return False
+
+
+def saveWriteFile(fpath, data):
+ """Writes data to a file i the savest manner possible
+ @param fpath: file to write
+ @param data: data to write to file
+ @return: True if successful, False otherwise
+ """
+ written = False
+ try:
+ fp = open(fpath, 'wb')
+ except: pass
+ else:
+ try:
+ fp.write(data)
+ written = True
+ except:
+ fp.close()
+ else:
+ fp.close()
+ return written
+
+def startFreenet(cmdline):
+ """Starts freenet
+ @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
+ @return: (string) whatever freenet returns
+ """
+ #TODO: on windows it may be necessary to hide the command window
+ p = subprocess.Popen(
+ args=cmdline,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ stdout, stderr = p.communicate()
+ return stdout
+
+
+def fcpBool(pythonBool):
+ """Converts a python bool to a fcp bool
+ @param pythonBool: (bool)
+ @return: (str) 'true' or 'false'
+ """
+ if pythonBool:
+ return 'true'
+ return 'false'
+
+def pythonBool(fcpBool):
+ """Converts a fcp bool to a python bool
+ @param fcpBool: 'true' or 'false'
+ @return: (bool) True or False
+ """
+ return fcpBool == 'true'
+
+#**********************************************************************
+# classes
+#**********************************************************************
+class FcpSocketError(Exception): pass
+class FcpProtocolError(Exception):
+ def __init__(self, msg):
+ """
+ @param msg: (Message) ProtocolError message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+
+ def __str__(self): return self.value
+
+
+
+class FcpUri(object):
+ """Wrapper class for freenet uris"""
+
+
+ KeySSK = 'SSK@'
+ KeyKSK = 'KSK@'
+ KeyCHK = 'CHK@'
+ KeyUSK = 'USK@'
+ KeySVK = 'SVK@'
+ KeyUnknown = ''
+ KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
+
+ ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
+ ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
+
+
+ def __init__(self, uri):
+ """
+ @param uri: uri to wrap
+ @param cvar ReUriPattern: pattern matching a freenet uri
+ @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
+
+ @note: any decorations prefixing the freenet part of the uri are stripped if possible
+
+
+ >>> uri = FcpUri('freenet:SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+ >>> uri.keyType() == FcpUri.KeySSK
+ True
+ >>> uri.split()
+ ('SSK@foo', 'bar')
+ >>> uri.fileName()
+ 'bar'
+
+ >>> uri = FcpUri('http://SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+
+ # uris not containing freenet keys are left unchanged
+ >>> uri = FcpUri('http://foo/bar')
+ >>> str(uri)
+ 'http://foo/bar'
+ >>> uri.keyType() == FcpUri.KeyUnknown
+ True
+ >>> uri.split()
+ ('http://foo/bar', '')
+ >>> uri.fileName()
+ 'http://foo/bar'
+
+ """
+ self._uri = uri
+
+ result = self.ReUriPattern.search(uri)
+ if result is not None:
+ self._uri = result.group(0)
+
+ def __str__(self):
+ return str(self._uri)
+
+ def __unicode__(self):
+ return unicode(self._uri)
+
+ def keyType(self):
+ """Retuns the key type of the uri
+ @return: one of the Key* consts
+ """
+ result = self.ReKeyPattern.search(self._uri)
+ if result is not None:
+ return result.group(0).upper()
+ return self.KeyUnknown
+
+ def split(self):
+ """Splits the uri
+ @return: tuple(freenet-key, file-name)
+ """
+ if self.keyType() != self.KeyUnknown:
+ head, sep, tail = self._uri.partition('/')
+ return head, tail
+ return self._uri, ''
+
+ def fileName(self):
+ """Returns the filename part of the uri
+ @return: str
+ """
+ head, tail = self.split()
+ if tail:
+ return tail
+ return self._uri
+
+
+class Message(object):
+ """Class wrapping a freenet message"""
+
+ Name = 'UMessage'
+
+
+ def __init__(self, name, data=None, **params):
+ """
+ @param name: message name
+ @param data: data associated with the message (not yet implemented)
+ @param params: {field-name: value, ...} of parameters of the message
+ @note: all params can be accessed as attributes of the class
+ """
+ self.data = data
+ self.name = name
+ self.params = params
+
+
+ def toString(self):
+ """Returns a string with the formated message, ready to be send"""
+
+ # TODO: "Data" not yet implemented
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append('%s=%s' % (param, value))
+ out.append('EndMessage\n')
+ return '\n'.join(out)
+
+
+ def pprint(self):
+ """Returns the message as nicely formated human readable string"""
+
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append(' %s=%s' % (param, value))
+ out.append('EndMessage')
+ out.append('')
+ return '\n'.join(out)
+
+ def __getitem__(self, name):
+ """Returns the message parameter 'name' """
+ return self.params[name]
+
+ def get(self, name, default=None):
+ """Returns the message parameter 'name' or 'default' """
+ return self.params.get(name, default)
+
+ def __setitem__(self, name, value):
+ """Sets the message parameter 'name' to 'value' """
+ self.params[name] = value
+
+
+class MessageSocketTimeout(Message):
+
+ Name = 'USocketTimeout'
+
+ def __init__(self):
+ Message.__init__(self, self.Name)
+
+
+
+#**************************************************************************
+# jobs
+#**************************************************************************
+class JobBase(object):
+ """Base class for jobs"""
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, identifier, message):
+ """
+ @param fcpClient: FcpClient() instance
+ @param identifier: (str) identifier of the job
+ @param message: (Message) to send to the node when the job is started
+ @ivar fcpClient: FcpClient() instance
+ @ivar fcpError: holding the error message if an error was encountered while running
+ the job, None otherwise
+ @ivar fcpIdentifier: identifier of the job
+ @ivar fcpMessage: initial message sent to the node
+ @ivar fcpResult: if no error was encountered, holding the result of the job when complete
+ @ivar fcpTime: when the job is complete, holding the time the job took to complete
+ """
+
+ self.fcpClient = fcpClient # FcpClient() instance the job belongs to
+ self.fcpError = None # last error (either this is set or fcpResult)
+ self.fcpIdentifier = identifier #
+ self.fcpMessage = message # message sent to node
+ self.fcpResult = None # job result
+ self.fcpTimeStart = 0 # time the job was started
+ self.fcpTimeStop = 0 # time the job was stopped
+ self.fcpStopped = False
+
+ def displayName(self):
+ """Returns the display name of the job
+ @return: (str) display name
+ """
+ return 'JobBase'
+
+ def start(self):
+ """Starts the job"""
+ self.fcpStopped = False
+ self.fcpTimeStart = time.time()
+ self.fcpClient.sendMessageEx(self.fcpMessage)
+
+ def error(self, msg):
+ """Called on job completion if an error was encounterd while runnng the job
+ @param msg: (Message) to pass to the job
+ """
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
+ self.fcpError = msg
+ self.fcpResult = None
+
+ def stop(self, msg):
+ """Called on job completion to stop the job
+ @param msg: (Message) to pass to the job
+ """
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
+ self.fcpError = None
+ self.fcpResult = msg
+
+
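
All Job* classes below share this lifecycle: construct with a message, register via the client (which sends the message), and read the result once the node's reply stops the job. A rough sketch, assuming a reachable node and using JobListPeers as a stand-in:

    client = FcpClient(name='example')
    if client.connect():                          # sends ClientHello, returns the NodeHello message
        job = JobListPeers(client)                # any Job* subclass works the same way
        client.jobAdd(job)                        # registers the job and sends its message
        client.run()                              # pump messages until the reply stops the job
        if job.fcpError is None:
            print job.fcpResult                   # the Message the job was stopped with
            print job.fcpTimeStop - job.fcpTimeStart
        client.close()
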
+class JobClientHello(JobBase):
+ """Sed a ClientHello message to the node
+
+ @note: this must be the first message passed to the node. If everything
+ goes well, you will get a NodeHello in response.
+ """
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
+ """
+ @param name: (str) connection name, or None to use an arbitrary name
+ @param expectedVersion: (str) node version expected
+ """
+ message = Message(
+ Messages.ClientHello,
+ Name=name if name is not None else newIdentifier(),
+ ExpectedVersion=expectedVersion,
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+
+ def displayName(self):
+ return 'ClientHello'
+
+
+class JobListPeers(JobBase):
+ """Lists all known peers of the node
+ """
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ message = Message(
+ Messages.ListPeers,
+ WithMetadata=fcpBool(withMetaData),
+ WithVolatile=fcpBool(withVolantile),
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+
+
+ def displayName(self):
+ return 'ListPeers'
+
+ def handlePeer(self,msg):
+ """Handles the next peer send by the node in form of a 'Peer' message
+ while the job is running. Overwrite to process.
+ """
+
+
+
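
Peers arrive one 'Peer' message at a time while the job runs, so callers that want them need to override handlePeer(); a sketch (subclass name illustrative):

    class CollectingListPeers(JobListPeers):
        """Illustrative subclass -- collects 'Peer' messages as they arrive"""

        def __init__(self, fcpClient, **kwargs):
            JobListPeers.__init__(self, fcpClient, **kwargs)
            self.peers = []

        def handlePeer(self, msg):
            self.peers.append(msg)
            print msg.pprint()
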
+class JobGetFileInfo(JobBase):
+ """Tries to retieve information about a file. If everything goes well
+
+ On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
+ Note, that both members may be '' (empty string)
+
+ """
+
+ _fcp_auto_remove_ = False
+
+
+ # idea is to provoke a GetFailed message and take mimetype and size from it
+ def __init__(self, fcpClient, uri, **params):
+ """
+ @param fcpClient: FcpClient() instance
+ @param uri: uri of the file to retrieve info for
+ @param params: additional parameters:
+ IgnoreDS='true' / 'false'
+ DSOnly='true' / 'false'
+ MaxRetries=-1 ...N
+ PriorityClass=Priority*
+
+ """
+ identifier = newIdentifier()
+ message = Message(
+ Messages.ClientGet,
+ Identifier=identifier,
+ URI=uri,
+ MaxSize='0',
+ ReturnType='none',
+ Verbosity='1',
+ **params
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def displayName(self):
+ return 'GetFileInfo'
+
+ def handleProgress(self, msg):
+ """Handles the next progress made. Overwrite to process.
+ """
+
+
+ def error(self, msg):
+ JobBase.error(self, msg)
+ if msg.name == Messages.GetFailed:
+ if msg['Code'] == FetchErrors.TooBig:
+ self.fcpError = None
+ self.fcpResult = (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
+ #else:
+ # raise ValueError('Unhandled message: %s' % msg.name)
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ if msg.name == Messages.DataFound:
+ self.fcpResult = (
+ msg.get('Metadata.ContentType', ''),
+ msg.get('DataLength', '')
+ )
+ else:
+ raise ValueError('Unhandled message: %s' % msg.name)
+
+
+
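
Putting it together, querying content type and size for a key could look like this (a sketch; the CHK uri is a placeholder):

    client = FcpClient(name='info-example', verbosity=logging.DEBUG)
    if client.connect():
        job = JobGetFileInfo(client, 'CHK@.../some_file.jpg')   # placeholder uri
        client.jobAdd(job)
        client.run()
        if job.fcpError is None:
            contentType, size = job.fcpResult
            print contentType, size
        client.close()
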
+#TODO: handle case where directories are registered multiple times
+class JobTestDDA(JobBase):
+ """Tests a directory for read / write accesss
+ """
+
+ _fcp_auto_remove_ = False
+
+ def __init__(self, fcpClient, directory, read=True, write=True):
+ if not os.path.isdir(directory):
+ raise ValueError('No such directory: %r' % directory)
+
+ message = Message(
+ Messages.TestDDARequest,
+ Directory=directory,
+ WantReadDirectory=fcpBool(read),
+ WantWriteDirectory=fcpBool(write),
+ )
+ JobBase.__init__(self, fcpClient, directory, message)
+ self.fcpTmpFile = None
+
+ def displayName(self):
+ return 'TestDDA'
+
+ def handleTestDDAReply(self, msg):
+ fpathWrite = msg.params.get('WriteFilename', None)
+ fpathRead = msg.params.get('ReadFilename', None)
+ readContent = ''
+ if fpathWrite is not None:
+ written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
+ if not written:
+ if os.path.isfile(fpathWrite):
+ os.remove(fpathWrite)
+ else:
+ self.fcpTmpFile = fpathWrite
+
+ if fpathRead is not None:
+ readContent = saveReadFile(fpathRead)
+ if readContent is None:
+ readContent = ''
+
+ self.fcpClient.sendMessage(
+ Messages.TestDDAResponse,
+ Directory=msg['Directory'],
+ ReadContent=readContent,
+ )
+
+
+ def error(self, msg):
+ JobBase.error(self, msg)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
+
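
A TestDDA round trip in outline (sketch; 'client' is a connected FcpClient as above):

    directory = os.path.dirname(os.path.abspath(__file__))
    job = JobTestDDA(client, directory)          # client: connected FcpClient instance
    client.jobAdd(job)
    client.run()
    print job.fcpError or job.fcpResult          # TestDDAComplete message on success
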
+#**************************************************************************
+# fcp client
+#**************************************************************************
+class LogMessages:
+ """Message strings used for log infos"""
+ Connecting = 'Connecting to node...'
+ Connected = 'Connected to node'
+ ConnectionRetry = 'Connecting to node failed... retrying'
+ ConnectingFailed = 'Connecting to node failed'
+
+ ClientClose = 'Closing client'
+
+ MessageSend = 'Message sent'
+ MessageReceived = 'Message received'
+
+ JobStart = 'Starting job: '
+ JobStop = 'Stopping job: '
+ JobsCompleted = 'All jobs completed'
+
+ KeyboardInterrupt = 'Keyboard interrupt'
+ SocketDead = 'Socket is dead'
+
+
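
The log strings above can be swapped out wholesale by passing a replacement class to the client; a sketch (subclass name illustrative):

    class PrefixedLogMessages(LogMessages):
        """Illustrative subclass -- prefixes the connection related strings"""
        Connecting = '[fcp] Connecting to node...'
        Connected = '[fcp] Connected to node'

    client = FcpClient(name='example', logMessages=PrefixedLogMessages)
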
+#TODO: no idea what happens on reconnect if socket died. What about running jobs?
+#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
+#TODO: not sure whether to add support for pending jobs and queue management here
+class FcpClient(object):
+ """Fcp client implementation"""
+
+
+ def __init__(self,
+ name='',
+ errorHandler=None,
+ verbosity=logging.WARNING,
+ logMessages=LogMessages
+ ):
+ """
+ @param name: name of the client instance or '' (for debugging)
+ @param errorHandler: will be called if the socket connection to the node is dead
+ with two params: FcpSocketError + details. When the handler is called the client
+ is already closed.
+ @param verbosity: verbosity level for debugging
+ @param logMessages: LogMessages class containing messages
+ """
+
+ self._isConnected = False
+ self._jobs = {
+ 'all': {},
+ 'pending': [], # ???
+ 'running': [],
+ 'complete': [], # ???
+ }
+ self._errorHandler = errorHandler
+ self._log = logging.getLogger(NameClient + ':' + name)
+ self._logMessages = logMessages
+ self._lock = thread.allocate_lock()
+ self._socket = None
+
+ self.setVerbosity(verbosity)
+ atexit.register(self.close)
+
+ def close(self):
+ """Closes the client
+ @note: make sure to call close() when done with the client
+ """
+ self._log.info(self._logMessages.ClientClose)
+ if self._socket is not None:
+ self._socket.close()
+ self._socket = None
+
+
+ #TODO: an iterator would be nice for GUIs, to ensure they stay responsive during the call
+ def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
+ """Establishes the connection to a freenet node
+ @param host: (str) host of the node
+ @param port: (int) port of the node
+ @param repeat: (int) how many seconds to keep trying to connect before giving up
+ @param timeout: (float) how long to wait between connection attempts
+ @return: (Message) NodeHello if successful, None otherwise
+ """
+ self._clientHello = None
+ self._log.info(self._logMessages.Connecting)
+
+ # poll until freenet responds
+ time_elapsed = 0
+ while time_elapsed <= repeat:
+
+ # try to Connect socket
+ if self._socket is not None:
+ self.close()
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.settimeout(SocketTimeout)
+ try:
+ self._socket.connect((host, port))
+ except Exception, d:
+ pass
+ else:
+ self._log.info(self._logMessages.Connected)
+ job = JobClientHello(self)
+ self.jobAdd(job, synchron=True)
+ assert job.fcpError is None, 'Error should have been caught by handleMessage()'
+ return job.fcpResult
+
+ self._log.info(self._logMessages.ConnectionRetry)
+
+ # continue polling
+ time_elapsed += timeout
+ time.sleep(timeout)
+
+ self._log.info(self._logMessages.ConnectingFailed)
+ return None
+
+
+ def handleMessage(self, msg):
+ """Handles the next message from the freenet node
+ @param msg: Message() to handle
+ """
+ self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+
+ if msg.name == Messages.NodeHello:
+ #connectionIdentifier = msg['ConnectionIdentifier']
+ self.jobStop(JobIdentifiers.ClientHello, msg)
+
+ elif msg.name == Messages.ProtocolError:
+ code = msg['Code']
+ #if code == ProtocolErrors.NoLateClientHellos:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #elif code == ProtocolErrors.ClientHelloMustBeFirst:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: inform caller
+ raise FcpProtocolError(msg)
+ else:
+ self.jobStop(identifier, msg, error=True)
+
+ elif msg.name == Messages.Peer:
+ self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ elif msg.name == Messages.EndListPeers:
+ self.jobStop(JobIdentifiers.ListPeers, msg)
+
+ elif msg.name == Messages.GetFailed:
+ self.jobStop(msg['Identifier'], msg, error=True)
+
+ elif msg.name == Messages.SimpleProgress:
+ self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+
+ elif msg.name == Messages.TestDDAReply:
+ self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+
+ elif msg.name == Messages.TestDDAComplete:
+ self.jobStop(msg['Directory'], msg)
+
+ elif msg.name == Messages.IdentifierCollision:
+ pass
+
+
+ def jobAdd(self, job, synchron=False):
+ """Adds a job to the client
+ @param job: (Job*) job to add
+ @param synchron: if True, wait until the job is completed; if False, return immediately
+ """
+ self._lock.acquire(True)
+ try:
+ if job.fcpIdentifier in self._jobs['all']:
+ raise ValueError('Duplicate job: %r' % job.fcpIdentifier)
+ self._jobs['all'][job.fcpIdentifier] = job
+ self._jobs['running'].append(job)
+ finally:
+ self._lock.release()
+
+ self._log.info(self._logMessages.JobStart + job.displayName())
+ job.start()
+ if synchron:
+ while not job.fcpStopped:
+ self.next()
+
+
+
+ def jobNotify(self, identifier, handler, msg):
+ """Notifies a job about an event while it is running
+ @param identifier: identifier of the job to notify
+ @param handler: (str) method of the job to call to handle the notification
+ @param msg: Message() to pass to the job
+ """
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ finally:
+ self._lock.release()
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ getattr(job, handler)(msg)
+
+
+ #TODO: quite unclear when to remove a job
+ def jobStop(self, identifier, msg, error=False):
+ """Stops a job
+ @param identifier: identifier of the job to stop
+ @param msg: Message() to pass to the job as result
+ @param error: set to True to indicate unsuccessful completion of the job, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ if job is not None:
+ self._jobs['running'].remove(job)
+ if job._fcp_auto_remove_:
+ del self._jobs['all'][identifier]
+ else:
+ self._jobs['complete'].append(job)
+ finally:
+ self._lock.release()
+
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ self._log.info(self._logMessages.JobStop + job.displayName())
+ if error:
+ job.error(msg)
+ else:
+ job.stop(msg)
+
+
+ #TODO: some info when all jobs are completed
+ def next(self):
+ """Pumps the next message waiting
+ @note: use this method instead of run() to run the client step by step
+ """
+ msg = self.readMessage()
+ self.handleMessage(msg)
+ return msg
+
+ def readMessage(self):
+ """Reads the next message directly from the socket and dispatches it
+ @return: (Message) the next message read from the socket
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ If an error handler is passed to the client, it is called immediately before the error
+ is raised.
+ """
+ msg = Message(None)
+ buf = []
+ while True:
+
+ try:
+ p = self._socket.recv(1)
+ if not p: raise ValueError('Socket is dead')
+ except socket.timeout, d: # no new messages in queue
+ msg = MessageSocketTimeout()
+ break
+ except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
+ raise FcpSocketError(d) #!!
+
+ if p == '\r': # ignore
+ continue
+
+ if p != '\n':
+ buf.append(p)
+ continue
+
+ line = ''.join(buf)
+ if line in ('End', "EndMessage"):
+ break
+ buf = []
+
+ if msg.name is None:
+ msg.name = line
+ elif line == 'Data':
+ n = int(msg.params['DataLength'])
+ try:
+ msg.data = self._socket.recv(n)
+ if not msg.data: raise ValueError('Socket is dead')
+ except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
+ raise FcpSocketError(d) #!!
+
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ if not sep:
+ # TODO: check for invalid messages or not
+ pass
+
+ return msg
+
+
+ def run(self):
+ """Runs the client untill all jobs passed to it are completed
+ @note: use KeyboardInterrupt to stop prematurely
+ """
+
+ # TODO:
+ # x. push pending jobs
+ try:
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ self._log.info(self._logMessages.JobsCompleted)
+ break
+ finally:
+ self._lock.release()
+
+ self.next()
+ except KeyboardInterrupt:
+ self._log.info(self._logMessages.KeyboardInterrupt)
+ self.close()
+
+
+ def sendMessage(self, name, data=None, **params):
+ """Sends a message to freenet
+ @param name: name of the message to send
+ @param data: data to attach to the message
+ @param params: {param-name: param-value, ...} parameters to pass along
+ with the message (see freenet protocol)
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ If an error handler is passed to the client, it is called immediately before the error
+ is raised.
+ """
+ return self.sendMessageEx(Message(name, data=data, **params))
+
+
+ def sendMessageEx(self, msg):
+ """Sends a message to freenet
+ @param msg: (Message) message to send
+ @return: Message
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ If an error handler is passed to the client, it is called immediately before the error
+ is raised.
+ """
+ rawMsg = msg.toString()
+ self._log.debug(self._logMessages.Mes...
[truncated message content] |
|
From: <jU...@us...> - 2007-10-20 10:00:17
|
Revision: 7
http://fclient.svn.sourceforge.net/fclient/?rev=7&view=rev
Author: jUrner
Date: 2007-10-20 03:00:20 -0700 (Sat, 20 Oct 2007)
Log Message:
-----------
continued working on fcp protocol implementation
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Modified: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7)
@@ -288,6 +288,21 @@
# classes
#**********************************************************************
class FcpSocketError(Exception): pass
+class FcpProtocolError(Exception):
+ def __init__(self, msg):
+ """
+ @param msg: (Message) ProtocolError message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+
+ def __str__(self): return self.value
+
+
+
class FcpUri(object):
"""Wrapper class for freenet uris"""
@@ -467,8 +482,10 @@
self.fcpIdentifier = identifier #
self.fcpMessage = message # message send to node
self.fcpResult = None # job result
- self.fcpTime = 0 # start time (will hld duration whern the job is complte)
-
+ self.fcpTimeStart = 0 # time the job was started
+ self.fcpTimeStop = 0 # time the job was stopped
+ self.fcpStopped = False
+
def displayName(self):
"""Returns the display name of the job
@return: (str) display name
@@ -477,14 +494,16 @@
def start(self):
"""Starts the job"""
- self.fcpTime = time.time()
+ self.fcpStopped = False
+ self.fcpTimeStart = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
def error(self, msg):
"""Called on job completion if an error was encounterd while runnng the job
@param msg: (Message) to pass to the job
"""
- self.fcpTime = time.time() - self.fcpTime
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
self.fcpError = msg
self.fcpResult = None
@@ -492,7 +511,8 @@
"""Called on job completion to stop the job
@param msg: (Message) to pass to the job
"""
- self.fcpTime = time.time() - self.fcpTime
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
self.fcpError = None
self.fcpResult = msg
@@ -519,7 +539,7 @@
JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
def displayName(self):
- return 'NodeHello'
+ return 'ClientHello'
class JobListPeers(JobBase):
@@ -531,8 +551,8 @@
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Messages.ListPeers,
- WithMetadata='true' if withMetaData else 'false',
- WithVolatile='true' if withVolantile else 'false',
+ WithMetadata=fcpBool(withMetaData),
+ WithVolatile=fcpBool(withVolantile),
)
JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
@@ -587,8 +607,7 @@
return 'GetFileInfo'
def handleProgress(self, msg):
- """Handles the next progress made of a 'SimpleProgress' message
- while the job is running. Overwrite to process.
+ """Handles the next progress made. Overwrite to process.
"""
@@ -625,6 +644,9 @@
_fcp_auto_remove_ = False
def __init__(self, fcpClient, directory, read=True, write=True):
+ if not os.path.isdir(directory):
+ raise ValueError('No such directory: %r' % directory)
+
message = Message(
Messages.TestDDARequest,
Directory=directory,
@@ -660,6 +682,7 @@
ReadContent=readContent,
)
+
def error(self, msg):
JobBase.error(self, msg)
saveRemoveFile(self.fcpTmpFile)
@@ -728,7 +751,7 @@
self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
-
+
self.setVerbosity(verbosity)
atexit.register(self.close)
@@ -749,8 +772,9 @@
@param port: (int) port of the node
@param repeat: (int) how many seconds try to connect before giving up
@param timeout: (int) how much time to wait before another attempt to connect
- @return: True if successful, False otherwise
+ @return: (Message) NodeHello if successful,None otherwise
"""
+ self._clientHello = None
self._log.info(self._logMessages.Connecting)
# poll untill freenet responds
@@ -768,7 +792,10 @@
pass
else:
self._log.info(self._logMessages.Connected)
- return True
+ job = JobClientHello(self)
+ self.jobAdd(job, synchron=True)
+ assert job.fcpError is None, 'Error should have been caught by handleMessage()'
+ return job.fcpResult
self._log.info(self._logMessages.ConnectionRetry)
@@ -777,7 +804,7 @@
time.sleep(timeout)
self._log.info(self._logMessages.ConnectingFailed)
- return False
+ return None
def handleMessage(self, msg):
@@ -792,15 +819,17 @@
elif msg.name == Messages.ProtocolError:
code = msg['Code']
- if code == ProtocolErrors.NoLateClientHellos:
- self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
-
+ #if code == ProtocolErrors.NoLateClientHellos:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #elif code == ProtocolErrors.ClientHelloMustBeFirst:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: inform caller
+ raise FcpProtocolError(msg)
else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- pass # raise ???
- else:
- self.jobStop(identifier, msg, error=True)
+ self.jobStop(identifier, msg, error=True)
elif msg.name == Messages.Peer:
self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
@@ -824,9 +853,10 @@
pass
- def jobAdd(self, job):
+ def jobAdd(self, job, synchron=False):
"""Adds a job to the client
@param job: (Job*) job to add
+ @param synchron: if True, wait untill the job is completed, if False return emidiately
"""
self._lock.acquire(True)
try:
@@ -839,6 +869,10 @@
self._log.info(self._logMessages.JobStart + job.displayName())
job.start()
+ if synchron:
+ while not job.fcpStopped:
+ self.next()
+
def jobNotify(self, identifier, handler, msg):
@@ -857,6 +891,7 @@
getattr(job, handler)(msg)
+ #TODO: quite unclear when to remove a job
def jobStop(self, identifier, msg, error=False):
"""Stops a job
@param identifier: identifier of the job to stop
@@ -891,6 +926,7 @@
"""
msg = self.readMessage()
self.handleMessage(msg)
+ return msg
def readMessage(self):
"""Reads the next message directly from the socket and dispatches it
@@ -1026,32 +1062,37 @@
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
if c.connect():
- job1 = JobClientHello(c)
- c.jobAdd(job1)
+ def foo():
+ job1 = JobClientHello(c)
+ c.jobAdd(job1)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job1.fcpResult
- print job1.fcpTime
- print '---------------------------'
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job1.fcpResult
+ print job1.fcpTime
+ print '---------------------------'
+ #foo()
+
+ def foo():
+ d = os.path.dirname(os.path.abspath(__file__))
+ job2 = JobTestDDA(c, d)
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job2.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
- job2 = JobTestDDA(c, os.path.dirname(__file__))
- c.jobAdd(job2)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
-
+ def foo():
job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job1.fcpError
+ print job2.fcpError
print job2.fcpResult
print job2.fcpTime
print '---------------------------'
-
+ #foo()
|
|
From: <ju...@us...> - 2007-10-16 17:54:04
|
Revision: 6
http://fclient.svn.sourceforge.net/fclient/?rev=6&view=rev
Author: jurner
Date: 2007-10-16 10:54:01 -0700 (Tue, 16 Oct 2007)
Log Message:
-----------
added a bit of documentation and a class to deal with freenet uris
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Modified: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 17:54:01 UTC (rev 6)
@@ -2,6 +2,7 @@
import atexit
import logging
import os
+import re
import socket
import subprocess
import sys
@@ -13,6 +14,7 @@
#**************************************************************
# consts
#**************************************************************
+NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
DefaultFcpPort = 9481
try:
@@ -21,12 +23,14 @@
SocketTimeout = 0.1
class JobIdentifiers:
- # fixed job identifiers
- # note that the client can only handle one job of these at a time
+ """Fixed job identifiers
+ @note: he client can only handle one job of these at a time
+ """
ClientHello = 'ClientHello'
ListPeers = 'ListPeers'
class Messages:
+ """All messages supported by the client"""
# client messages
ClientHello = 'ClientHello'
@@ -92,6 +96,8 @@
class Priorities:
+ """All priorities supported by the client"""
+
Maximum = 0
Interactive = 1
SemiInteractive = 2
@@ -107,6 +113,8 @@
# errors
class FetchErrors:
+ """All fetch errors supported by the client"""
+
MaxArchiveRecursionExceeded = '1'
UnknownSplitfileMetadata = '2'
UnknownMetadata = '3'
@@ -138,6 +146,8 @@
class InsertErrors:
+ """All insert errors supported by the client"""
+
InvalidUri = '1'
BucketError = '2'
InternalError = '3'
@@ -151,6 +161,8 @@
class ProtocolErrors:
+ """All protocol errors supported by the client"""
+
ClientHelloMustBeFirst = '1'
NoLateClientHellos = '2'
MessageParseError = '3'
@@ -187,6 +199,9 @@
# functions
#**********************************************************************
def newIdentifier():
+ """Returns a new unique identifier
+ @return: (str) uuid
+ """
return str(uuid.uuid4())
def saveReadFile(fpath):
@@ -206,6 +221,10 @@
return read
def saveRemoveFile(fpath):
+ """Savely removes a file
+ @param fpath: filepath of the file to remove or None
+ @return: True if the file was removed, False otherwise
+ """
if fpath is not None:
if os.path.isfile(fpath):
os.remove(fpath)
@@ -265,99 +284,101 @@
"""
return fcpBool == 'true'
-
-def uriToList(uri):
- """Splits a freenet uri into its components
- @param uri: (str) uri to split
- @return: (list) or components
- @note: additional dexoration like 'freenet:' or 'http://blah' are ignored
+#**********************************************************************
+# classes
+#**********************************************************************
+class FcpSocketError(Exception): pass
+class FcpUri(object):
+ """Wrapper class for freenet uris"""
- >>> uriToList('SSK@foo')
- ['SSK@foo']
- >>> uriToList('SSK@foo/bar/baz')
- ['SSK@foo', 'bar', 'baz']
+ KeySSK = 'SSK@'
+ KeyKSK = 'KSK@'
+ KeyCHK = 'CHK@'
+ KeyUSK = 'USK@'
+ KeySVK = 'SVK@'
+ KeyUnknown = ''
+ KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
+
+ ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
+ ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
- >>> uriToList('freenet:SSK@foo')
- ['SSK@foo']
- >>> uriToList('http://foo/SSK@foo')
- ['SSK@foo']
+ def __init__(self, uri):
+ """
+ @param uri: uri to wrap
+ @param cvar ReUriPattern: pattern matching a freenet uri
+ @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
+
+ @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible
+
+
+ >>> uri = FcpUri('freenet:SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+ >>> uri.keyType() == FcpUri.KeySSK
+ True
+ >>> uri.split()
+ ('SSK@foo', 'bar')
+ >>> uri.fileName()
+ 'bar'
+
+ >>> uri = FcpUri('http://SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+
+ # uris not containing freenet keys are left unchanged
+ >>> uri = FcpUri('http://foo/bar')
+ >>> str(uri)
+ 'http://foo/bar'
+ >>> uri.keyType() == FcpUri.KeyUnknown
+ True
+ >>> uri.split()
+ ('http://foo/bar', '')
+ >>> uri.fileName()
+ 'http://foo/bar'
+
+ """
+ self._uri = uri
+
+ result = self.ReUriPattern.search(uri)
+ if result is not None:
+ self._uri = result.group(0)
+
+ def __str__(self):
+ return str(self._uri)
- >>> uriToList('http://foo')
- []
+ def __unicode__(self):
+ return unicode(self._uri)
- """
- if uri.startswith('freenet:'):
- uri = uri[len('freenet:'): ]
-
- components = []
- head = uri
- while head:
- head, tail = posixpath.split(head)
- components.append(tail)
+ def keyType(self):
+ """Retuns the key type of the uri
+ @return: one of the Key* consts
+ """
+ result = self.ReKeyPattern.search(self._uri)
+ if result is not None:
+ return result.group(0).upper()
+ return self.KeyUnknown
- components.reverse()
+ def split(self):
+ """Splits the uri
+ @return: tuple(freenet-key, file-name)
+ """
+ if self.keyType() != self.KeyUnknown:
+ head, sep, tail = self._uri.partition('/')
+ return head, tail
+ return self._uri, ''
+
+ def fileName(self):
+ """Returns the filename part of the uri
+ @return: str
+ """
+ head, tail = self.split()
+ if tail:
+ return tail
+ return self._uri
- while components:
- if components[0][:4] in FcpKeys.KeysAll:
- break
- else:
- components.pop(0)
-
- return components
-def uriKeyType(uri):
- pass
-
-
-def isValidUri(uri):
- """Checks if an uri is a valid freenet uri
- @param uri: (str) uri to check
- @reuturn: (bool)
- """
- return bool(UriToList(uri))
-
-
-def splitUri(uri):
- """Splits an uri into uri and filename
- @param uri: uri to split
- @return: tuple(uri, filename)
-
- >>> splitUri('SSK@foo/bar/baz')
- ('SSK@foo', 'bar/baz')
-
- >>> splitUri('SSK@foo')
- ('SSK@foo', '')
-
- >>> splitUri('NoUri')
- ('NoUri', '')
-
- """
- L = uriToList(uri)
- if not L:
- return (uri, '')
- names = L[1:]
- if not names:
- name = ''
- else:
- name = '/'.join(names)
- return (L[0], name)
-
-
-def fileNameFromUri(uri):
- """Returns the filename part of an uri
- @return: (str) filename. If no filename is found the uri is returned unchanged
- """
- tmp_uri, name = splitUri(uri)
- if name:
- return name
- return uri
-
-#**********************************************************************
-# classes
-#**********************************************************************
-class FcpSocketError(Exception): pass
class Message(object):
"""Class wrapping a freenet message"""
@@ -417,6 +438,8 @@
def __init__(self):
Message.__init__(self, self.Name)
+
+
#**************************************************************************
# jobs
#**************************************************************************
@@ -426,6 +449,18 @@
_fcp_auto_remove_ = True
def __init__(self, fcpClient, identifier, message):
+ """
+ @param fcpClient: FcpClient() instance
+ @param identifier: (str) identifier of the job
+ @param message: (Message) to send to the node whne the job ist started
+ @ivar fcpClient: FcpClient() instance
+ @ivar fcpError: holding the error message if an error was encountered while running
+ the job, None otherwise
+ @ivar fcpIdentifier: identifier of the job
+ @ivar fcpMessage: initial message send to the node
+ @ivar fcpResult: if no error was encountered, holding the result of the job when complete
+ @ivar fcpTime: when the job is complete, holding the time the job took to complete
+ """
self.fcpClient = fcpClient # FcpClient() instance the job belongs to
self.fcpError = None # last error (either this is set or dcpResult)
@@ -435,31 +470,50 @@
self.fcpTime = 0 # start time (will hld duration whern the job is complte)
def displayName(self):
+ """Returns the display name of the job
+ @return: (str) display name
+ """
return 'JobBase'
def start(self):
+ """Starts the job"""
self.fcpTime = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
def error(self, msg):
+ """Called on job completion if an error was encounterd while runnng the job
+ @param msg: (Message) to pass to the job
+ """
self.fcpTime = time.time() - self.fcpTime
self.fcpError = msg
self.fcpResult = None
- def stop(self, result):
+ def stop(self, msg):
+ """Called on job completion to stop the job
+ @param msg: (Message) to pass to the job
+ """
self.fcpTime = time.time() - self.fcpTime
self.fcpError = None
- self.fcpResult = result
+ self.fcpResult = msg
-class JobNodeHello(JobBase):
+class JobClientHello(JobBase):
+ """Sed a ClientHello message to the node
+ @note: this must be the first message passed to the node. If everything
+ goes well, you will get a NodeHello in response.
+ """
+
_fcp_auto_remove_ = True
- def __init__(self, fcpClient, expectedVersion='2.0'):
+ def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
+ """
+ @param name: (str) connection name or None, to use an arbitrary name
+ @param expectedVersion: (str) node version expected
+ """
message = Message(
Messages.ClientHello,
- Name=newIdentifier(),
+ Name=name if name is not None else newIdentifier(),
ExpectedVersion=expectedVersion,
)
JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
@@ -469,6 +523,8 @@
class JobListPeers(JobBase):
+ """Lists all known peers of the node
+ """
_fcp_auto_remove_ = True
@@ -485,13 +541,20 @@
return 'ListPeers'
def handlePeer(self,msg):
- pass
+ """Handles the next peer send by the node in form of a 'Peer' message
+ while the job is running. Overwrite to process.
+ """
+
-
class JobGetFileInfo(JobBase):
- """"""
+ """Tries to retieve information about a file. If everything goes well
+ On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
+ Note, that both members may be '' (empty string)
+
+ """
+
_fcp_auto_remove_ = False
@@ -524,9 +587,11 @@
return 'GetFileInfo'
def handleProgress(self, msg):
- pass
+ """Handles the next progress made of a 'SimpleProgress' message
+ while the job is running. Overwrite to process.
+ """
+
-
def error(self, msg):
JobBase.error(self, msg)
if msg.name == Messages.GetFailed:
@@ -554,7 +619,9 @@
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
-
+ """Tests a directory for read / write accesss
+ """
+
_fcp_auto_remove_ = False
def __init__(self, fcpClient, directory, read=True, write=True):
@@ -608,6 +675,7 @@
# fcp client
#**************************************************************************
class LogMessages:
+ """Message strings used for log infos"""
Connecting = 'Connecting to node...'
Connected = 'Connected to node'
ConnectionRetry = 'Connecting to node failed... retrying'
@@ -628,8 +696,11 @@
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
+#TODO: no idea if to add support for pending jobs and queue management here
class FcpClient(object):
+ """Fcp client implementation"""
+
def __init__(self,
name='',
errorHandler=None,
@@ -648,12 +719,12 @@
self._isConnected = False
self._jobs = {
'all': {},
- 'pending': [],
+ 'pending': [], # ???
'running': [],
- 'complete': [],
+ 'complete': [], # ???
}
- self._errrorHandler = errorHandler
- self._log = logging.getLogger('FcpClient20:%s' % name)
+ self._errorHandler = errorHandler
+ self._log = logging.getLogger(NameClient + ':' + name)
self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
@@ -662,6 +733,9 @@
atexit.register(self.close)
def close(self):
+ """Closes the client
+ @note: make shure to call close() when done with the client
+ """
self._log.info(self._logMessages.ClientClose)
if self._socket is not None:
self._socket.close()
@@ -670,7 +744,13 @@
#TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
-
+ """Establishes the connection to a freenet node
+ @param host: (str) host of th node
+ @param port: (int) port of the node
+ @param repeat: (int) how many seconds try to connect before giving up
+ @param timeout: (int) how much time to wait before another attempt to connect
+ @return: True if successful, False otherwise
+ """
self._log.info(self._logMessages.Connecting)
# poll untill freenet responds
@@ -699,8 +779,55 @@
self._log.info(self._logMessages.ConnectingFailed)
return False
+
+ def handleMessage(self, msg):
+ """Handles the next message from the freenet node
+ @param msg: Message() to handle
+ """
+ self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+
+ if msg.name == Messages.NodeHello:
+ #connectionIdentifier = msg['ConnectionIdentifier']
+ self.jobStop(JobIdentifiers.ClientHello, msg)
+
+ elif msg.name == Messages.ProtocolError:
+ code = msg['Code']
+ if code == ProtocolErrors.NoLateClientHellos:
+ self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+
+ else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ pass # raise ???
+ else:
+ self.jobStop(identifier, msg, error=True)
+
+ elif msg.name == Messages.Peer:
+ self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ elif msg.name == Messages.EndListPeers:
+ self.jobStop(IdentifierListPeers, msg)
+
+ elif msg.name == Messages.GetFailed:
+ self.jobStop(msg['Identifier'], msg, error=True)
+
+ elif msg.name == Messages.SimpleProgress:
+ self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+
+ elif msg.name == Messages.TestDDAReply:
+ self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+
+ elif msg.name == Messages.TestDDAComplete:
+ self.jobStop(msg['Directory'], msg)
+
+ elif msg.name == Messages.IdentifierCollision:
+ pass
- def addJob(self, job):
+
+ def jobAdd(self, job):
+ """Adds a job to the client
+ @param job: (Job*) job to add
+ """
self._lock.acquire(True)
try:
if job.fcpIdentifier in self._jobs['all']:
@@ -712,12 +839,33 @@
self._log.info(self._logMessages.JobStart + job.displayName())
job.start()
-
- def stopJob(self, identifier, msg, error=False):
+
+ def jobNotify(self, identifier, handler, msg):
+ """Notifies a job about an event while it is running
+ @param identifier: identifier of the job to notify
+ @param handler: (str) method of the job to call to handle the notification
+ @param msg: Message() to pass to the job
+ """
self._lock.acquire(True)
try:
job = self._jobs['all'].get(identifier, None)
+ finally:
+ self._lock.release()
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ getattr(job, handler)(msg)
+
+
+ def jobStop(self, identifier, msg, error=False):
+ """Stops a job
+ @param identifier: identifier of the job to stop
+ @param msg: Message() to pass to the job as result
+ @param error: set to True to indicate unsuccessful completion of the job, True otherwisse
+ """
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
if job is not None:
self._jobs['running'].remove(job)
if job._fcp_auto_remove_:
@@ -734,92 +882,22 @@
job.error(msg)
else:
job.stop(msg)
-
- def notifyJob(self, identifier, handler, msg):
- self._lock.acquire(True)
- try:
- job = self._jobs['all'].get(identifier, None)
- finally:
- self._lock.release()
- if job is None:
- raise ValueError('No such job: %r' % identifier)
- getattr(job, handler)(msg)
-
- def run(self):
-
- # TODO:
- # x. push pending jobs
- try:
- while True:
- if not self._lock.acquire(False):
- continue
-
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- self._log.info(self._logMessages.JobsCompleted)
- break
- finally:
- self._lock.release()
-
- self.next()
- except KeyboardInterrupt:
- self._log(self._logMessages.KeyboardInterrupt)
- self.close()
-
-
#TODO: some info when all jobs are completed
def next(self):
+ """Pumps the next message waiting
+ @note: use this method instead of run() to run the client step by step
+ """
msg = self.readMessage()
self.handleMessage(msg)
-
- def handleMessage(self, msg):
-
- self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
-
- if msg.name == Messages.NodeHello:
- #connectionIdentifier = msg['ConnectionIdentifier']
- self.stopJob(JobIdentifiers.ClientHello, msg)
-
- elif msg.name == Messages.ProtocolError:
- code = msg['Code']
- if code == ProtocolErrors.NoLateClientHellos:
- self.stopJob(JobIdentifiers.ClientHello, msg, error=True)
-
- else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- pass # raise ???
- else:
- self.stopJob(identifier, msg, error=True)
-
- elif msg.name == Messages.Peer:
- self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg)
-
- elif msg.name == Messages.EndListPeers:
- self.stopJob(IdentifierListPeers, msg)
-
- elif msg.name == Messages.GetFailed:
- self.stopJob(msg['Identifier'], msg, error=True)
-
- elif msg.name == Messages.SimpleProgress:
- self.notifyJob(msg['Identifier'], 'handleProgress', msg)
-
- elif msg.name == Messages.TestDDAReply:
- self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg)
-
- elif msg.name == Messages.TestDDAComplete:
- self.stopJob(msg['Directory'], msg)
-
- elif msg.name == Messages.IdentifierCollision:
- pass
-
-
def readMessage(self):
"""Reads the next message directly from the socket and dispatches it
- @return: valid or invalid Message()
+ @return: (Message) the next message read from the socket
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
"""
msg = Message(None)
buf = []
@@ -872,20 +950,42 @@
pass
return msg
+
+
+ def run(self):
+ """Runs the client untill all jobs passed to it are completed
+ @note: use KeyboardInterrupt to stop prematurely
+ """
+ # TODO:
+ # x. push pending jobs
+ try:
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ self._log.info(self._logMessages.JobsCompleted)
+ break
+ finally:
+ self._lock.release()
+
+ self.next()
+ except KeyboardInterrupt:
+ self._log(self._logMessages.KeyboardInterrupt)
+ self.close()
- def setLogMessages(self, logMessages):
- self._logMessages = logMessages
- def setVerbosity(self, verbosity):
- self._log.setLevel(verbosity)
-
def sendMessage(self, name, data=None, **params):
"""Sends a message to freenet
@param name: name of the message to send
@param data: data to atatch to the message
@param params: {para-name: param-calue, ...} of parameters to pass along
with the message (see freenet protocol)
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
"""
return self.sendMessageEx(Message(name, data=data, **params))
@@ -894,6 +994,9 @@
"""Sends a message to freenet
@param msg: (Message) message to send
@return: Message
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
"""
rawMsg = msg.toString()
self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
@@ -907,14 +1010,24 @@
raise FcpSocketError(d)
return msg
+
+ def setLogMessages(self, logMessages):
+ """"""
+ self._logMessages = logMessages
+
+
+ def setVerbosity(self, verbosity):
+ """"""
+ self._log.setLevel(verbosity)
+
#*****************************************************************************
#
#*****************************************************************************
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
if c.connect():
- job1 = JobNodeHello(c)
- c.addJob(job1)
+ job1 = JobClientHello(c)
+ c.jobAdd(job1)
c.run()
print '---------------------------'
@@ -924,21 +1037,21 @@
print '---------------------------'
- job2 = JobTestDDA(c, os.path.dirname(__file__))
- c.addJob(job2)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
-
- job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- c.addJob(job2)
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
+ job2 = JobTestDDA(c, os.path.dirname(__file__))
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
+
+ job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
|
|
From: <ju...@us...> - 2007-10-16 10:19:40
|
Revision: 5
http://fclient.svn.sourceforge.net/fclient/?rev=5&view=rev
Author: jurner
Date: 2007-10-16 03:19:24 -0700 (Tue, 16 Oct 2007)
Log Message:
-----------
continued working on FcpClient
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Modified: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-15 17:42:24 UTC (rev 4)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5)
@@ -1,9 +1,15 @@
-
+
+import atexit
+import logging
import os
import socket
+import subprocess
+import sys
import time
import thread
import uuid
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
#**************************************************************
# consts
#**************************************************************
@@ -13,7 +19,6 @@
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
except: pass
SocketTimeout = 0.1
-KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@')
class JobIdentifiers:
# fixed job identifiers
@@ -200,6 +205,13 @@
fp.close()
return read
+def saveRemoveFile(fpath):
+ if fpath is not None:
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ return True
+ return False
+
def saveWriteFile(fpath, data):
"""Writes data to a file i the savest manner possible
@@ -221,6 +233,22 @@
fp.close()
return written
+def startFreenet(cmdline):
+ """Starts freenet
+ @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
+ @return: (string) whatever freenet returns
+ """
+ #TODO: on windows it may be necessary to hide the comand window
+ p = subprocess.Popen(
+ args=cmdline,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ stdout, stderr = p.communicate()
+ return stdout
+
+
def fcpBool(pythonBool):
"""Converts a python bool to a fcp bool
@param pythonBool: (bool)
@@ -237,6 +265,95 @@
"""
return fcpBool == 'true'
+
+def uriToList(uri):
+ """Splits a freenet uri into its components
+ @param uri: (str) uri to split
+ @return: (list) or components
+ @note: additional dexoration like 'freenet:' or 'http://blah' are ignored
+
+ >>> uriToList('SSK@foo')
+ ['SSK@foo']
+
+ >>> uriToList('SSK@foo/bar/baz')
+ ['SSK@foo', 'bar', 'baz']
+
+ >>> uriToList('freenet:SSK@foo')
+ ['SSK@foo']
+
+ >>> uriToList('http://foo/SSK@foo')
+ ['SSK@foo']
+
+ >>> uriToList('http://foo')
+ []
+
+ """
+ if uri.startswith('freenet:'):
+ uri = uri[len('freenet:'): ]
+
+ components = []
+ head = uri
+ while head:
+ head, tail = posixpath.split(head)
+ components.append(tail)
+
+ components.reverse()
+
+ while components:
+ if components[0][:4] in FcpKeys.KeysAll:
+ break
+ else:
+ components.pop(0)
+
+ return components
+
+def uriKeyType(uri):
+ pass
+
+
+def isValidUri(uri):
+ """Checks if an uri is a valid freenet uri
+ @param uri: (str) uri to check
+ @reuturn: (bool)
+ """
+ return bool(UriToList(uri))
+
+
+def splitUri(uri):
+ """Splits an uri into uri and filename
+ @param uri: uri to split
+ @return: tuple(uri, filename)
+
+ >>> splitUri('SSK@foo/bar/baz')
+ ('SSK@foo', 'bar/baz')
+
+ >>> splitUri('SSK@foo')
+ ('SSK@foo', '')
+
+ >>> splitUri('NoUri')
+ ('NoUri', '')
+
+ """
+ L = uriToList(uri)
+ if not L:
+ return (uri, '')
+ names = L[1:]
+ if not names:
+ name = ''
+ else:
+ name = '/'.join(names)
+ return (L[0], name)
+
+
+def fileNameFromUri(uri):
+ """Returns the filename part of an uri
+ @return: (str) filename. If no filename is found the uri is returned unchanged
+ """
+ tmp_uri, name = splitUri(uri)
+ if name:
+ return name
+ return uri
+
#**********************************************************************
# classes
#**********************************************************************
@@ -303,7 +420,6 @@
#**************************************************************************
# jobs
#**************************************************************************
-#TODO: do somrthing that this class does not lock the queue
class JobBase(object):
"""Base class for jobs"""
@@ -311,18 +427,28 @@
def __init__(self, fcpClient, identifier, message):
- self.fcpClient = fcpClient
- self.fcpIdentifier = identifier
- self.fcpMessage = message
- self.fcpResult = None
- self.fcpTime = 0
+ self.fcpClient = fcpClient # FcpClient() instance the job belongs to
+ self.fcpError = None # last error (either this is set or dcpResult)
+ self.fcpIdentifier = identifier #
+ self.fcpMessage = message # message send to node
+ self.fcpResult = None # job result
+ self.fcpTime = 0 # start time (will hld duration whern the job is complte)
+ def displayName(self):
+ return 'JobBase'
+
def start(self):
self.fcpTime = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
+ def error(self, msg):
+ self.fcpTime = time.time() - self.fcpTime
+ self.fcpError = msg
+ self.fcpResult = None
+
def stop(self, result):
self.fcpTime = time.time() - self.fcpTime
+ self.fcpError = None
self.fcpResult = result
@@ -338,8 +464,10 @@
)
JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+ def displayName(self):
+ return 'NodeHello'
+
-
class JobListPeers(JobBase):
_fcp_auto_remove_ = True
@@ -353,15 +481,21 @@
JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+ def displayName(self):
+ return 'ListPeers'
+
def handlePeer(self,msg):
pass
-class JobFileInfo(JobBase):
+class JobGetFileInfo(JobBase):
+ """"""
_fcp_auto_remove_ = False
+
+ # idea is to provoke a GetFailed message and take mimetype and size from it
def __init__(self, fcpClient, uri, **params):
"""
@param fcpClient: FcpClient() instance
@@ -378,7 +512,7 @@
Messages.ClientGet,
Identifier=identifier,
URI=uri,
- MaxSize='ase0',
+ MaxSize='0',
ReturnType='none',
Verbosity='1',
**params
@@ -386,36 +520,37 @@
JobBase.__init__(self, fcpClient, identifier, message)
+ def displayName(self):
+ return 'GetFileInfo'
+
def handleProgress(self, msg):
pass
- def stop(self, msg):
- JobBase.stop(self, msg)
- error = result = None
+ def error(self, msg):
+ JobBase.error(self, msg)
if msg.name == Messages.GetFailed:
if msg['Code'] == FetchErrors.TooBig:
- result = (
+ self.fcpError = None
+ self.fcpResult = (
msg.get('ExpectedMetadata.ContentType', ''),
msg.get('ExpectedDataLength', '')
)
- else:
- error, result = msg['Code'], msg
-
- elif msg.name == Messages.DataFound:
- result = (
+ #else:
+ # raise ValueError('Unhandled message: %s' % msg.name)
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ if msg.name == Messages.DataFound:
+ self.fcpResult = (
msg.get('Metadata.ContentType', ''),
msg.get('DataLength', '')
)
-
- elif msg.name == Messages.ProtocolError:
- error, result = msg['Code'], msg
-
else:
raise ValueError('Unhandled message: %s' % msg.name)
-
- self.fcpResult = error, result
+
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
@@ -432,7 +567,9 @@
JobBase.__init__(self, fcpClient, directory, message)
self.fcpTmpFile = None
-
+ def displayName(self):
+ return 'TestDDA'
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -456,19 +593,58 @@
ReadContent=readContent,
)
+ def error(self, msg):
+ JobBase.error(self, msg)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
+
+
def stop(self, msg):
JobBase.stop(self, msg)
- if self.fcpTmpFile is not None:
- if os.path.isfile(self.fcpTmpFile):
- os.remove(self.fcpTmpFile)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
#**************************************************************************
# fcp client
#**************************************************************************
+class LogMessages:
+ Connecting = 'Connecting to node...'
+ Connected = 'Connected to node'
+ ConnectionRetry = 'Connecting to node failed... retrying'
+ ConnectingFailed = 'Connecting to node failed'
+
+ ClientClose = 'Closing client'
+
+ MessageSend = 'Message send'
+ MessageReceived = 'Message received'
+
+ JobStart = 'Starting job: '
+ JobStop = 'Stopping job: '
+ JobsCompleted = 'All jobs completed'
+
+ KeyboardInterrupt = 'Keyboard interrupt'
+ SocketDead = 'Socket is dead'
+
+
+#TODO: no idea what happens on reconnect if socket died. What about running jobs?
+#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
class FcpClient(object):
- def __init__(self):
-
+ def __init__(self,
+ name='',
+ errorHandler=None,
+ verbosity=logging.WARNING,
+ logMessages=LogMessages
+ ):
+ """
+ @param name: name of the client instance or '' (for debugging)
+ @param errorHandler: will be called if the socket conncetion to the node is dead
+ with two params: FcpSocketError + details. When the handler is called the client
+ is already closed.
+ @param verbosity: verbosity level for debugging
+ @param logMessages: LogMessages class containing messages
+ """
+
self._isConnected = False
self._jobs = {
'all': {},
@@ -476,24 +652,34 @@
'running': [],
'complete': [],
}
+ self._errrorHandler = errorHandler
+ self._log = logging.getLogger('FcpClient20:%s' % name)
+ self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
-
+ self.setVerbosity(verbosity)
+ atexit.register(self.close)
def close(self):
+ self._log.info(self._logMessages.ClientClose)
if self._socket is not None:
self._socket.close()
self._socket = None
+ #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
+
+ self._log.info(self._logMessages.Connecting)
+
# poll untill freenet responds
time_elapsed = 0
while time_elapsed <= repeat:
# try to Connect socket
- self.close()
+ if self._socket is not None:
+ self.close()
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(SocketTimeout)
try:
@@ -501,19 +687,19 @@
except Exception, d:
pass
else:
- #self._isConnected = True
+ self._log.info(self._logMessages.Connected)
return True
+ self._log.info(self._logMessages.ConnectionRetry)
+
# continue polling
time_elapsed += timeout
time.sleep(timeout)
+ self._log.info(self._logMessages.ConnectingFailed)
return False
- #def __nonzero__(self):
- # return self._isConnected
-
def addJob(self, job):
self._lock.acquire(True)
try:
@@ -523,9 +709,12 @@
self._jobs['running'].append(job)
finally:
self._lock.release()
+
+ self._log.info(self._logMessages.JobStart + job.displayName())
job.start()
- def finishJob(self, identifier, msg):
+
+ def stopJob(self, identifier, msg, error=False):
self._lock.acquire(True)
try:
job = self._jobs['all'].get(identifier, None)
@@ -540,7 +729,11 @@
if job is None:
raise ValueError('No such job: %r' % identifier)
- job.stop(msg)
+ self._log.info(self._logMessages.JobStop + job.displayName())
+ if error:
+ job.error(msg)
+ else:
+ job.stop(msg)
def notifyJob(self, identifier, handler, msg):
@@ -558,27 +751,25 @@
# TODO:
# x. push pending jobs
- # x. on error stop this thingy
-
- n = 0
- while True:
- if not self._lock.acquire(False):
- continue
+ try:
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ self._log.info(self._logMessages.JobsCompleted)
+ break
+ finally:
+ self._lock.release()
+
+ self.next()
+ except KeyboardInterrupt:
+ self._log(self._logMessages.KeyboardInterrupt)
+ self.close()
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- break
- finally:
- self._lock.release()
-
- msg = self.readMessage()
- self.handleMessage(msg)
-
-
- n += 1
- if n > 50: break
-
+ #TODO: some info when all jobs are completed
def next(self):
msg = self.readMessage()
self.handleMessage(msg)
@@ -586,33 +777,32 @@
def handleMessage(self, msg):
- print msg.pprint()
-
+ self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+
if msg.name == Messages.NodeHello:
#connectionIdentifier = msg['ConnectionIdentifier']
- self.finishJob(JobIdentifiers.ClientHello, msg)
+ self.stopJob(JobIdentifiers.ClientHello, msg)
elif msg.name == Messages.ProtocolError:
code = msg['Code']
-
if code == ProtocolErrors.NoLateClientHellos:
- self.finishJob(JobIdentifiers.ClientHello, msg)
+ self.stopJob(JobIdentifiers.ClientHello, msg, error=True)
else:
identifier = msg.get('Identifier', None)
if identifier is None:
pass # raise ???
else:
- self.finishJob(identifier, msg)
+ self.stopJob(identifier, msg, error=True)
elif msg.name == Messages.Peer:
self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg)
elif msg.name == Messages.EndListPeers:
- self.finishJob(IdentifierListPeers, msg)
+ self.stopJob(IdentifierListPeers, msg)
elif msg.name == Messages.GetFailed:
- self.finishJob(msg['Identifier'], msg)
+ self.stopJob(msg['Identifier'], msg, error=True)
elif msg.name == Messages.SimpleProgress:
self.notifyJob(msg['Identifier'], 'handleProgress', msg)
@@ -621,7 +811,7 @@
self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg)
elif msg.name == Messages.TestDDAComplete:
- self.finishJob(msg['Directory'], msg)
+ self.stopJob(msg['Directory'], msg)
elif msg.name == Messages.IdentifierCollision:
pass
@@ -642,6 +832,10 @@
msg = MessageSocketTimeout()
break
except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
raise FcpSocketError(d) #!!
if p == '\r': # ignore
@@ -662,7 +856,12 @@
n = int(msg.params['DataLength'])
try:
msg.data = self._socket.recv(n)
+ if not msg.data: raise ValueError('Socket is dead')
except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
raise FcpSocketError(d) #!!
else:
@@ -674,8 +873,13 @@
return msg
-
+ def setLogMessages(self, logMessages):
+ self._logMessages = logMessages
+
+ def setVerbosity(self, verbosity):
+ self._log.setLevel(verbosity)
+
def sendMessage(self, name, data=None, **params):
"""Sends a message to freenet
@param name: name of the message to send
@@ -691,25 +895,50 @@
@param msg: (Message) message to send
@return: Message
"""
- #self.log.info('SendMessage\n' + msg.pprint())
rawMsg = msg.toString()
+ self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
try:
self._socket.sendall(rawMsg)
except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
raise FcpSocketError(d)
- #TODO: allow for an error handler to handle
return msg
#*****************************************************************************
#
#*****************************************************************************
if __name__ == '__main__':
- c = FcpClient()
+ c = FcpClient(name='test', verbosity=logging.DEBUG)
if c.connect():
job1 = JobNodeHello(c)
c.addJob(job1)
c.run()
print '---------------------------'
- print job1.fcpResult.pprint()
+ print job1.fcpError
+ print job1.fcpResult
+ print job1.fcpTime
+ print '---------------------------'
+
+
+ job2 = JobTestDDA(c, os.path.dirname(__file__))
+ c.addJob(job2)
+ c.run()
+ print '---------------------------'
+ print job2.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
+
+ job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ c.addJob(job2)
+ c.run()
+ print '---------------------------'
+ print job2.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
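The TODO at the top of connect() asks for an iterator-style API so GUIs stay responsive; much the same effect can already be had by pumping messages one at a time through next() instead of blocking in run(). A minimal sketch, assuming the module imports as fcp2_0 and that a job's fcpResult stays None until the job has completed (neither is confirmed by this diff):

    # Sketch only, not part of the commit. Poll the client from a GUI timer or
    # idle callback so the toolkit's event loop never blocks for long; next()
    # reads and dispatches at most one message (the socket timeout is short).
    import logging
    from fcp2_0 import FcpClient, JobNodeHello   # assumed import path

    client = FcpClient(name='gui-test', verbosity=logging.INFO)
    if client.connect():
        job = JobNodeHello(client)
        client.addJob(job)

        def onIdle():
            """Periodic callback registered with the GUI toolkit."""
            client.next()
            if job.fcpResult is not None:        # assumed completion signal
                print job.fcpResult
                return False                     # stop polling
            return True                          # keep polling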
|
|
From: <ju...@us...> - 2007-10-15 17:42:21
|
Revision: 4
http://fclient.svn.sourceforge.net/fclient/?rev=4&view=rev
Author: jurner
Date: 2007-10-15 10:42:24 -0700 (Mon, 15 Oct 2007)
Log Message:
-----------
started implementing fcp20 client
Added Paths:
-----------
trunk/fclient/
trunk/fclient/__init__.py
trunk/fclient/fclient_lib/
trunk/fclient/fclient_lib/__init__.py
trunk/fclient/fclient_lib/fcp/
trunk/fclient/fclient_lib/fcp/__init__.py
trunk/fclient/fclient_lib/fcp/fcp20.py
Added: trunk/fclient/__init__.py
===================================================================
--- trunk/fclient/__init__.py (rev 0)
+++ trunk/fclient/__init__.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1 @@
+
Added: trunk/fclient/fclient_lib/__init__.py
===================================================================
--- trunk/fclient/fclient_lib/__init__.py (rev 0)
+++ trunk/fclient/fclient_lib/__init__.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1 @@
+
Added: trunk/fclient/fclient_lib/fcp/__init__.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/__init__.py (rev 0)
+++ trunk/fclient/fclient_lib/fcp/__init__.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1 @@
+
Added: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py (rev 0)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1,715 @@
+
+import os
+import socket
+import time
+import thread
+import uuid
+#**************************************************************
+# consts
+#**************************************************************
+DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
+DefaultFcpPort = 9481
+try:
+ DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
+except: pass
+SocketTimeout = 0.1
+KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@')
+
+class JobIdentifiers:
+ # fixed job identifiers
+ # note that the client can only handle one job of these at a time
+ ClientHello = 'ClientHello'
+ ListPeers = 'ListPeers'
+
+class Messages:
+
+ # client messages
+ ClientHello = 'ClientHello'
+ ListPeer = 'ListPeer' # (since 1045)
+ ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ AddPeer = 'AddPeer'
+ ModifyPeer = 'ModifyPeer'
+ ModifyPeerNote = 'ModifyPeerNote'
+ RemovePeer = 'RemovePeer'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig' # (since 1027)
+ ModifyConfig = 'ModifyConfig' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ GenerateSSK = 'GenerateSSK'
+ ClientPut = 'ClientPut'
+ ClientPutDiskDir = 'ClientPutDiskDir'
+ ClientPutComplexDir = 'ClientPutComplexDir'
+ ClientGet = 'ClientGet'
+ SubscribeUSK = 'SubscribeUSK'
+ WatchGlobal = 'WatchGlobal'
+ GetRequestStatus = 'GetRequestStatus'
+ ListPersistentRequests = 'ListPersistentRequests'
+ RemovePersistentRequest = 'RemovePersistentRequest'
+ ModifyPersistentRequest = 'ModifyPersistentRequest'
+ Shutdown = 'Shutdown'
+
+ # node messages
+ NodeHello = 'NodeHello'
+ CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
+ Peer = 'Peer'
+ PeerNote = 'PeerNote'
+ EndListPeers = 'EndListPeers'
+ EndListPeerNotes = 'EndListPeerNotes'
+ PeerRemoved = 'PeerRemoved'
+ NodeData = 'NodeData'
+ ConfigData = 'ConfigData' # (since 1027)
+ TestDDAReply = 'TestDDAReply' # (since 1027)
+ TestDDAComplete = 'TestDDAComplete' # (since 1027)
+ SSKKeypair = 'SSKKeypair'
+ PersistentGet = 'PersistentGet'
+ PersistentPut = 'PersistentPut'
+ PersistentPutDir = 'PersistentPutDir'
+ URIGenerated = 'URIGenerated'
+ PutSuccessful = 'PutSuccessful'
+ PutFetchable = 'PutFetchable'
+ DataFound = 'DataFound'
+ AllData = 'AllData'
+ StartedCompression = 'StartedCompression'
+ FinishedCompression = 'FinishedCompression'
+ SimpleProgress = 'SimpleProgress'
+ EndListPersistentRequests = 'EndListPersistentRequests'
+ PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
+ PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
+ PutFailed = 'PutFailed'
+ GetFailed = 'GetFailed'
+ ProtocolError = 'ProtocolError'
+ IdentifierCollision = 'IdentifierCollision'
+ UnknownNodeIdentifier = 'UnknownNodeIdentifier'
+ UnknownPeerNoteType = 'UnknownPeerNoteType'
+ SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+
+
+class Priorities:
+ Maximum = 0
+ Interactive = 1
+ SemiInteractive = 2
+ Updatable = 3
+ Bulk = 4
+ Prefetch = 5
+ Minimum = 6
+
+ PriorityMin = Minimum
+ PriorityDefault = Bulk
+
+
+# errors
+
+class FetchErrors:
+ MaxArchiveRecursionExceeded = '1'
+ UnknownSplitfileMetadata = '2'
+ UnknownMetadata = '3'
+ InvalidMetadata = '4'
+ ArchiveFailure = '5'
+ BlockDecodeError = '6'
+ MaxMetadataLevelsExceeded = '7'
+ MaxArchiveRestartsExceeded = '8'
+ MaxRecursionLevelExceeded = '9'
+ NotAnArchve = '10'
+ TooManyMetastrings = '11'
+ BucketError = '12'
+ DataNotFound = '13'
+ RouteNotFound = '14'
+ RejectedOverload = '15'
+ TooManyRedirects = '16'
+ InternalError = '17'
+ TransferFailed = '18'
+ SplitfileError = '19'
+ InvalidUri = '20'
+ TooBig = '21'
+ MetadataTooBig = '22'
+ TooManyBlocks = '23'
+ NotEnoughMetastrings = '24'
+ Canceled = '25'
+ ArchiveRestart = '26'
+ PermanentRedirect = '27'
+ NotAllDataFound = '28'
+
+
+class InsertErrors:
+ InvalidUri = '1'
+ BucketError = '2'
+ InternalError = '3'
+ RejectedOverload = '4'
+ RouteNotFound = '5'
+ FatalErrorInBlocks = '6'
+ TooManyRetriesInBlock = '7'
+ RouteReallyNotFound = '8'
+ Collision = '9'
+ Canceled = '10'
+
+
+class ProtocolErrors:
+ ClientHelloMustBeFirst = '1'
+ NoLateClientHellos = '2'
+ MessageParseError = '3'
+ UriParseError = '4'
+ MissingField = '5'
+ ErrorParsingNumber = '6'
+ InvalidMessage = '7'
+ InvalidField = '8'
+ FileNotFound = '9'
+ DiskTargetExists = '10'
+ SameDirectoryExpected = '11'
+ CouldNotCreateFile = '12'
+ CouldNotWriteFile = '13'
+ CouldNotRenameFile = '14'
+ NoSuchIdentifier = '15'
+ NotSupported = '16'
+ InternalError = '17'
+ ShuttingDown = '18'
+ NoSuchNodeIdentifier = '19' # Unused since 995
+ UrlParseError = '20'
+ ReferenceParseError = '21'
+ FileParseError = '22'
+ NotAFile = '23'
+ AccessDenied = '24'
+ DDADenied = '25'
+ CouldNotReadFile = '26'
+ ReferenceSignature = '27'
+ CanNotPeerWithSelf = '28'
+ PeerExists = '29'
+ OpennetDisabled = '30'
+ DarknetOnly = '31'
+
+#**********************************************************************
+# functions
+#**********************************************************************
+def newIdentifier():
+ return str(uuid.uuid4())
+
+def saveReadFile(fpath):
+ """Reads contents of a file in the savest manner possible
+ @param fpath: file to write
+ @return: contents of the file if successful, None otherwise
+ """
+ read = None
+ try:
+ fp = open(fpath, 'rb')
+ except: pass
+ else:
+ try:
+ read = fp.read()
+ except: pass
+ fp.close()
+ return read
+
+
+def saveWriteFile(fpath, data):
+ """Writes data to a file i the savest manner possible
+ @param fpath: file to write
+ @param data: data to write to file
+ @return: True if successful, False otherwise
+ """
+ written = False
+ try:
+ fp = open(fpath, 'wb')
+ except: pass
+ else:
+ try:
+ fp.write(data)
+ written = True
+ except:
+ fp.close()
+ else:
+ fp.close()
+ return written
+
+def fcpBool(pythonBool):
+ """Converts a python bool to a fcp bool
+ @param pythonBool: (bool)
+ @return: (str) 'true' or 'false'
+ """
+ if pythonBool:
+ return 'true'
+ return 'false'
+
+def pythonBool(fcpBool):
+ """Converts a fcp bool to a python bool
+ @param fcpBool: 'true' or 'false'
+ @return: (bool) True or False
+ """
+ return fcpBool == 'true'
+
+#**********************************************************************
+# classes
+#**********************************************************************
+class FcpSocketError(Exception): pass
+class Message(object):
+ """Class wrapping a freenet message"""
+
+ Name = 'UMessage'
+
+
+ def __init__(self, name, data=None, **params):
+ """
+ @param name: message name
+ @param data: data associated with the message (not yet implemented)
+ @param params: {field-name: value, ...} of parameters of the message
+ @note: all params can be accessed as attributes of the class
+ """
+ self.data = data
+ self.name = name
+ self.params = params
+
+
+ def toString(self):
+ """Returns a string with the formated message, ready to be send"""
+
+ # TODO: "Data" not yet implemented
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append('%s=%s' % (param, value))
+ out.append('EndMessage\n')
+ return '\n'.join(out)
+
+
+ def pprint(self):
+ """Returns the message as nicely formated human readable string"""
+
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append(' %s=%s' % (param, value))
+ out.append('EndMessage')
+ out.append('')
+ return '\n'.join(out)
+
+ def __getitem__(self, name):
+ """Returns the message parameter 'name' """
+ return self.params[name]
+
+ def get(self, name, default=None):
+ """Returns the message parameter 'name' or 'default' """
+ return self.params.get(name, default)
+
+ def __setitem__(self, name, value):
+ """Sets the message parameter 'name' to 'value' """
+ self.params[name] = value
+
+
+class MessageSocketTimeout(Message):
+
+ Name = 'USocketTimeout'
+
+ def __init__(self):
+ Message.__init__(self, self.Name)
+
+#**************************************************************************
+# jobs
+#**************************************************************************
+#TODO: do something so that this class does not lock the queue
+class JobBase(object):
+ """Base class for jobs"""
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, identifier, message):
+
+ self.fcpClient = fcpClient
+ self.fcpIdentifier = identifier
+ self.fcpMessage = message
+ self.fcpResult = None
+ self.fcpTime = 0
+
+ def start(self):
+ self.fcpTime = time.time()
+ self.fcpClient.sendMessageEx(self.fcpMessage)
+
+ def stop(self, result):
+ self.fcpTime = time.time() - self.fcpTime
+ self.fcpResult = result
+
+
+class JobNodeHello(JobBase):
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, expectedVersion='2.0'):
+ message = Message(
+ Messages.ClientHello,
+ Name=newIdentifier(),
+ ExpectedVersion=expectedVersion,
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+
+
+
+class JobListPeers(JobBase):
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ message = Message(
+ Messages.ListPeers,
+ WithMetadata='true' if withMetaData else 'false',
+ WithVolatile='true' if withVolantile else 'false',
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+
+
+ def handlePeer(self,msg):
+ pass
+
+
+
+class JobFileInfo(JobBase):
+
+ _fcp_auto_remove_ = False
+
+ def __init__(self, fcpClient, uri, **params):
+ """
+ @param fcpClient: FcpClient() instance
+ @param uri: uri of the file to retrieve info for
+ @param params: additional parameters:
+ IgnoreDS='true' / 'false'
+ DSOnly='true' / 'false'
+ MaxRetries=-1 ...N
+ PriorityClass=Priority*
+
+ """
+ identifier = newIdentifier()
+ message = Message(
+ Messages.ClientGet,
+ Identifier=identifier,
+ URI=uri,
+ MaxSize='0',
+ ReturnType='none',
+ Verbosity='1',
+ **params
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleProgress(self, msg):
+ pass
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ error = result = None
+ if msg.name == Messages.GetFailed:
+ if msg['Code'] == FetchErrors.TooBig:
+ result = (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
+ else:
+ error, result = msg['Code'], msg
+
+ elif msg.name == Messages.DataFound:
+ result = (
+ msg.get('Metadata.ContentType', ''),
+ msg.get('DataLength', '')
+ )
+
+ elif msg.name == Messages.ProtocolError:
+ error, result = msg['Code'], msg
+
+ else:
+ raise ValueError('Unhandled message: %s' % msg.name)
+
+ self.fcpResult = error, result
+
+
+#TODO: handle case where directories are registered multiple times
+class JobTestDDA(JobBase):
+
+ _fcp_auto_remove_ = False
+
+ def __init__(self, fcpClient, directory, read=True, write=True):
+ message = Message(
+ Messages.TestDDARequest,
+ Directory=directory,
+ WantReadDirectory=fcpBool(read),
+ WantWriteDirectory=fcpBool(write),
+ )
+ JobBase.__init__(self, fcpClient, directory, message)
+ self.fcpTmpFile = None
+
+
+ def handleTestDDAReply(self, msg):
+ fpathWrite = msg.params.get('WriteFilename', None)
+ fpathRead = msg.params.get('ReadFilename', None)
+ readContent = ''
+ if fpathWrite is not None:
+ written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
+ if not written:
+ if os.path.isfile(fpathWrite):
+ os.remove(fpathWrite)
+ else:
+ self.fcpTmpFile = fpathWrite
+
+ if fpathRead is not None:
+ readContent = saveReadFile(fpathRead)
+ if readContent is None:
+ readContent = ''
+
+ self.fcpClient.sendMessage(
+ Messages.TestDDAResponse,
+ Directory=msg['Directory'],
+ ReadContent=readContent,
+ )
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ if self.fcpTmpFile is not None:
+ if os.path.isfile(self.fcpTmpFile):
+ os.remove(self.fcpTmpFile)
+
+#**************************************************************************
+# fcp client
+#**************************************************************************
+class FcpClient(object):
+
+ def __init__(self):
+
+ self._isConnected = False
+ self._jobs = {
+ 'all': {},
+ 'pending': [],
+ 'running': [],
+ 'complete': [],
+ }
+ self._lock = thread.allocate_lock()
+ self._socket = None
+
+
+
+ def close(self):
+ if self._socket is not None:
+ self._socket.close()
+ self._socket = None
+
+
+ def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
+ # poll until freenet responds
+ time_elapsed = 0
+ while time_elapsed <= repeat:
+
+ # try to Connect socket
+ self.close()
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.settimeout(SocketTimeout)
+ try:
+ self._socket.connect((host, port))
+ except Exception, d:
+ pass
+ else:
+ #self._isConnected = True
+ return True
+
+ # continue polling
+ time_elapsed += timeout
+ time.sleep(timeout)
+
+ return False
+
+
+ #def __nonzero__(self):
+ # return self._isConnected
+
+ def addJob(self, job):
+ self._lock.acquire(True)
+ try:
+ if job.fcpIdentifier in self._jobs['all']:
+ raise ValueError('Duplicate job: %r' % job.fcpIdentifier)
+ self._jobs['all'][job.fcpIdentifier] = job
+ self._jobs['running'].append(job)
+ finally:
+ self._lock.release()
+ job.start()
+
+ def finishJob(self, identifier, msg):
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ if job is not None:
+ self._jobs['running'].remove(job)
+ if job._fcp_auto_remove_:
+ del self._jobs['all'][identifier]
+ else:
+ self._jobs['complete'].append(job)
+ finally:
+ self._lock.release()
+
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ job.stop(msg)
+
+
+ def notifyJob(self, identifier, handler, msg):
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ finally:
+ self._lock.release()
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ getattr(job, handler)(msg)
+
+
+ def run(self):
+
+ # TODO:
+ # x. push pending jobs
+ # x. on error stop this thingy
+
+ n = 0
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ break
+ finally:
+ self._lock.release()
+
+ msg = self.readMessage()
+ self.handleMessage(msg)
+
+
+ n += 1
+ if n > 50: break
+
+
+ def next(self):
+ msg = self.readMessage()
+ self.handleMessage(msg)
+
+
+ def handleMessage(self, msg):
+
+ print msg.pprint()
+
+ if msg.name == Messages.NodeHello:
+ #connectionIdentifier = msg['ConnectionIdentifier']
+ self.finishJob(JobIdentifiers.ClientHello, msg)
+
+ elif msg.name == Messages.ProtocolError:
+ code = msg['Code']
+
+ if code == ProtocolErrors.NoLateClientHellos:
+ self.finishJob(JobIdentifiers.ClientHello, msg)
+
+ else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ pass # raise ???
+ else:
+ self.finishJob(identifier, msg)
+
+ elif msg.name == Messages.Peer:
+ self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ elif msg.name == Messages.EndListPeers:
+ self.finishJob(JobIdentifiers.ListPeers, msg)
+
+ elif msg.name == Messages.GetFailed:
+ self.finishJob(msg['Identifier'], msg)
+
+ elif msg.name == Messages.SimpleProgress:
+ self.notifyJob(msg['Identifier'], 'handleProgress', msg)
+
+ elif msg.name == Messages.TestDDAReply:
+ self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg)
+
+ elif msg.name == Messages.TestDDAComplete:
+ self.finishJob(msg['Directory'], msg)
+
+ elif msg.name == Messages.IdentifierCollision:
+ pass
+
+
+ def readMessage(self):
+ """Reads the next message directly from the socket and dispatches it
+ @return: valid or invalid Message()
+ """
+ msg = Message(None)
+ buf = []
+ while True:
+
+ try:
+ p = self._socket.recv(1)
+ if not p: raise ValueError('Socket is dead')
+ except socket.timeout, d: # no new messages in queue
+ msg = MessageSocketTimeout()
+ break
+ except Exception, d:
+ raise FcpSocketError(d) #!!
+
+ if p == '\r': # ignore
+ continue
+
+ if p != '\n':
+ buf.append(p)
+ continue
+
+ line = ''.join(buf)
+ if line in ('End', "EndMessage"):
+ break
+ buf = []
+
+ if msg.name is None:
+ msg.name = line
+ elif line == 'Data':
+ n = int(msg.params['DataLength'])
+ try:
+ msg.data = self._socket.recv(n)
+ except Exception, d:
+ raise FcpSocketError(d) #!!
+
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ if not sep:
+ # TODO: check for invalid messages or not
+ pass
+
+ return msg
+
+
+
+ def sendMessage(self, name, data=None, **params):
+ """Sends a message to freenet
+ @param name: name of the message to send
+ @param data: data to attach to the message
+ @param params: {param-name: param-value, ...} of parameters to pass along
+ with the message (see freenet protocol)
+ """
+ return self.sendMessageEx(Message(name, data=data, **params))
+
+
+ def sendMessageEx(self, msg):
+ """Sends a message to freenet
+ @param msg: (Message) message to send
+ @return: Message
+ """
+ #self.log.info('SendMessage\n' + msg.pprint())
+ rawMsg = msg.toString()
+ try:
+ self._socket.sendall(rawMsg)
+ except Exception, d:
+ raise FcpSocketError(d)
+ #TODO: allow for an error handler to handle
+ return msg
+
+#*****************************************************************************
+#
+#*****************************************************************************
+if __name__ == '__main__':
+ c = FcpClient()
+ if c.connect():
+ job1 = JobNodeHello(c)
+ c.addJob(job1)
+
+ c.run()
+ print '---------------------------'
+ print job1.fcpResult.pprint()
+
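Behind Message.toString() and readMessage() the FCP 2.0 framing is plain text: a message name on the first line, key=value pairs, and an EndMessage terminator (plus an optional Data payload announced by DataLength). A quick illustration of what a ClientHello looks like on the wire, assuming this fcp20.py is importable; the field order may vary because params is an ordinary dict:

    # Illustration only, not part of the commit.
    from fcp20 import Message, Messages, newIdentifier   # assumed import path

    hello = Message(Messages.ClientHello,
                    Name=newIdentifier(),
                    ExpectedVersion='2.0')
    print hello.toString()
    # ClientHello
    # ExpectedVersion=2.0
    # Name=<uuid4 from newIdentifier()>
    # EndMessage
    #
    # The node replies with a NodeHello block in the same framing, which
    # readMessage() reassembles into a Message and handleMessage() routes to
    # the pending ClientHello job.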
|
|
From: <ju...@us...> - 2007-10-15 13:12:59
|
Revision: 3
http://fclient.svn.sourceforge.net/fclient/?rev=3&view=rev
Author: jurner
Date: 2007-10-15 06:12:58 -0700 (Mon, 15 Oct 2007)
Log Message:
-----------
init
Added Paths:
-----------
trunk/
|
|
From: <ju...@us...> - 2007-10-15 13:04:34
|
Revision: 2
http://fclient.svn.sourceforge.net/fclient/?rev=2&view=rev
Author: jurner
Date: 2007-10-15 06:04:38 -0700 (Mon, 15 Oct 2007)
Log Message:
-----------
init
Added Paths:
-----------
tags/
|