SF.net SVN: fclient: [22] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-30 15:11:01
|
Revision: 22
http://fclient.svn.sourceforge.net/fclient/?rev=22&view=rev
Author: jUrner
Date: 2007-10-30 08:11:02 -0700 (Tue, 30 Oct 2007)
Log Message:
-----------
Another major rewrite. Cut all down to a plain message handler and events.
For now there is no way to wrap the protocol on a higher level without getting
into deep trouble.
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 22:44:22 UTC (rev 21)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-30 15:11:02 UTC (rev 22)
@@ -93,6 +93,11 @@
SocketTimeout = 0.1
+class IdentifierPrefix:
+ """Special purpose identifier prefixes"""
+
+ FileInfo = 'FileInfo::'
+
class Verbosity:
Debug = logging.DEBUG
@@ -100,22 +105,6 @@
Warning = logging.WARNING
-class FixedJobIdentifiers:
- """Fixed job identifiers
- @note: he client can only handle one job of these at a time
- """
- ClientHello = 'ClientHello'
- ListPeers = 'ListPeers'
- ListPeerNotes = 'ListPeerNotes'
- GetNode = 'GetNode'
- GetConfig = 'GetConfig'
- ModifyConfig = 'ModifyConfig'
- WatchGlobal = 'WatchGlobal'
- Shutdown = 'Shutdown'
-
-
-
-
class Priorities:
"""All priorities supported by the client"""
@@ -132,8 +121,8 @@
#TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED>
-# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED
-# in --> freenet/node/PeerManager.java
+# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED
+# in --> freenet/node/PeerManager.java
class PeerNodeStatus:
Connected = 1
RoutingBackedOff = 2
@@ -150,16 +139,10 @@
Disconnecting = 13
-#TODO: see if we can get away with these to avoid collisions. TestDDA uses no prefix
-# cos ProtocolError 7 passes the directory as identifier and there is no other hint
-# that the error is related to TestDDA.
-class IdentifierPrefix:
- """Identifier prefixes"""
-
- ClientGet = 'ClientGet::'
- #TestDDA = ''
- PeerNote = 'PeerNote::'
+class PeerNoteType:
+ """All known peer note types"""
+ Private = '1'
#************************************************************************************
@@ -170,7 +153,7 @@
def __init__(self, msg):
"""
- @param msg: (Message) GetFailed message
+ @param msg: (Message) GetFailed message or its parameters dict
"""
self.value = '%s (%s, %s)' % (
msg.get('CodeDescription', 'Unknown error') ,
@@ -214,7 +197,7 @@
def __init__(self, msg):
"""
- @param msg: (Message) PutFailed message
+ @param msg: (Message) PutFailed message or its parameters dict
"""
self.value = '%s (%s, %s)' % (
msg.get('CodeDescription', 'Unknown error') ,
@@ -240,7 +223,7 @@
def __init__(self, msg):
"""
- @param msg: (Message) ProtocolError message
+ @param msg: (Message) ProtocolError message or its parameters dict
"""
self.value = '%s (%s, %s)' % (
msg.get('CodeDescription', 'Unknown error') ,
@@ -279,22 +262,13 @@
CanNotPeerWithSelf = '28'
PeerExists = '29'
OpennetDisabled = '30'
- DarknetOnly = '31'
+ DarknetPeerOnly = '31'
class SocketError(Exception): pass
+class FcpError(Exception): pass
#**********************************************************************
# functions
#**********************************************************************
-def fcpBool(pythonBool):
- """Converts a python bool to a fcp bool
- @param pythonBool: (bool)
- @return: (str) 'true' or 'false'
- """
- if pythonBool:
- return 'true'
- return 'false'
-
-
def newIdentifier(prefix=None):
"""Returns a new unique identifier
@return: (str) uuid
@@ -304,14 +278,6 @@
return str(uuid.uuid4())
-def pythonBool(fcpBool):
- """Converts a fcp bool to a python bool
- @param pythonBool: 'true' or 'false'
- @return: (bool) True or False
- """
- return fcpBool == 'true'
-
-
def saveReadFile(fpath):
"""Reads contents of a file in the savest manner possible
@param fpath: file to write
@@ -692,463 +658,7 @@
out.append('EndMessage\n')
return '\n'.join(out)
-
#**************************************************************************
-# jobs
-#**************************************************************************
-#TODO: maybe remove syncron functionality and rely only on signals
-# ...if so, remove timeStart, timeStop aswell.. leave up to caller
-class JobBase(object):
- """Base class for jobs"""
-
-
- def __init__(self, fcpClient, identifier, message):
- """
- @param fcpClient: FcpClient() instance
- @param identifier: (str) identifier of the job
- @param message: (Message) to send to the node whne the job ist started
- @ivar jobClient: FcpClient() instance of the job
- @ivar jobIdentifier: identifier of the job
- @ivar jobMessage: message to be send to the node
- @ivar jobResult: if no error was encountered, holding the result of the job when complete
- @ivar jobTimeStart: time the job was started
- @ivar jobTimeStop: time the job was stopped
- """
- self.jobClient = fcpClient
- self.jobIdentifier = identifier
- self.jobMessage = message
- self.jobResult = None
- self.jobTimeStart = 0
- self.jobTimeStop = 0
-
-
- def handleMessage(self, msg):
- return False
-
-
- def handleStart(self):
- """Starts the job"""
- self.jobResult = None
- self.jobTimeStart = time.time()
- self.jobClient.sendMessageEx(self.jobMessage)
-
-
- # XXX
- def handleStop(self, flagError, msg):
- """Called on job completion to stop the job
- @param flagError: True if an error was encountered, False otherwise
- @param msg: (Message) to pass to the job
- """
- self.jobTimeStop = time.time()
- self.jobResult = (flagError, msg)
-
-
-class JobClientHello(JobBase):
- """Sed a ClientHello message to the node
-
- @note: this must be the first message passed to the node. If everything
- goes well, you will get a NodeHello in response.
- """
-
- def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
- """
- @param name: (str) connection name or None, to use an arbitrary name
- @param expectedVersion: (str) node version expected
- """
- message = Message(
- Message.ClientHello,
- Name=name if name is not None else newIdentifier(),
- ExpectedVersion=expectedVersion,
- )
- JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message)
-
- def displayName(self):
- return 'ClientHello'
-
-
- def handleMessage(self, msg):
- if msg.name == Message.NodeHello:
- return self.handleNodeHello(msg)
- elif msg.name == Message.ProtocolError:
- return self.handleProtocolError(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleNodeHello(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = msg
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
- def handleProtocolError(self, msg):
- raise ProtocolError(msg)
-
-
-
-
-
-
-class JobListPeers(JobBase):
- """Lists all known peers of the node
- """
-
- def __init__(self, fcpClient, withMetaData=True, withVolantile=True):
- """
- @param withMetaData: include meta data for each peer?
- @param withVolantile: include volantile data for each peer?
- @ivar jobResult: on job completion, will be a list containing all perrs as one 'Peer' message for each peer
- """
- message = Message(
- Message.ListPeers,
- WithMetadata=fcpBool(withMetaData),
- WithVolatile=fcpBool(withVolantile),
- )
- JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
-
-
- def handleStart(self):
- JobBase.handleStart(self)
- self.jobClient.EventListPeers()
-
-
- def handleMessage(self,msg):
- if msg.name == Message.EndListPeers:
- return self.handleEndListPeers(msg)
- elif msg.name == Message.Peer:
- return self.handlePeer(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handlePeer(self, msg):
- self.jobClient.EventListNextPeer(msg.params)
- if self.jobResult is None:
- self.jobResult = [msg.params, ]
- else:
- self.jobResult.append(msg.params)
- return True
-
-
- def handleEndListPeers(self, msg):
- self.jobClient.EventEndListPeers()
- self.jobTimeStop = time.time()
- if self.jobResult is None:
- self.jobResult = []
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-
-class JobListPeerNotes(JobBase):
- """Lists all notes associated to a peer of the node
- """
-
- def __init__(self, fcpClient, identifier):
- """
- @param identifier: identifier of the peer to list notes for (peer identity)
- @ivar jobResult: on job completion will be a list containing all notes associated to the peer
-
- @note: notes are only available for darknet peers (opennet == false)
- """
-
- message = Message(
- Message.ListPeerNotes,
- NodeIdentifier=identifier
- )
- JobBase.__init__(self, fcpClient, IdentifierPrefix.PeerNote + identifier, message)
-
-
- def handleStart(self):
- JobBase.handleStart(self)
- self.jobClient.EventListPeerNotes()
-
-
- def handleMessage(self,msg):
- if msg.name == Message.EndListPeerNotes:
- return self.handleEndListPeerNotes(msg)
- elif msg.name == Message.PeerNote:
- return self.handlePeerNote(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
- def handlePeerNote(self, msg):
- note = msg.get('NoteText', '')
- self.jobClient.EventListNextPeerNote(note)
- if note:
- note = base64.decodestring(note)
- if self.jobResult is None:
- self.jobResult = [note, ]
- else:
- self.jobResult.append(note)
- return True
-
-
- def handleEndListPeerNotes(self, msg):
- self.jobClient.EventEndListPeerNotes()
- self.jobTimeStop = time.time()
- if self.jobResult is None:
- self.jobResult = []
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-
-
-#TODO: identifier collisions are not yet handled
-class JobGetFileInfo(JobBase):
- """Tries to retieve information about a file. If everything goes well
-
- On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
- Note, that both members may be '' (empty string)
- """
-
-
- # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed'
- def __init__(self, fcpClient, uri, **params):
- """
- @param fcpClient: FcpClient() instance
- @param uri: uri of the file to retrieve info for
- @param params: additional parameters:
- IgnoreDS='true' / 'false'
- DSOnly='true' / 'false'
- MaxRetries=-1 ...N
- PriorityClass=Priority*
-
- @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be
- retrieved and data will be a GetFailed message containing details. If error is False
- data will be a tuple(str metadataContentType, str size). Note that both may be empty
- string and size may not be accurate.
-
- """
- identifier = IdentifierPrefix.ClientGet + newIdentifier()
- message = Message(
- Message.ClientGet,
- Identifier=identifier,
- URI=uri,
- # suggested by Mathew Toseland to use about 32k for mimeType requests
- # basic sizes of keys are: 1k for SSks and 32k for CHKs
- MaxSize='32000',
- ReturnType='none',
- Verbosity='1',
- **params
- )
- JobBase.__init__(self, fcpClient, identifier, message)
-
-
- def getPrority(self):
- return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault)
-
-
- def setPriority(self, priority):
- if not priority in Priorities:
- raise ValueError('Invalid priority: %r' % priority)
- self.jobClient.sendMessage(
- Message.ModifyPersistentRequest,
- Identifier=self.jobIdentifier,
- Global=fcpBool(False),
- PriorityClass=priority,
- )
- # not shure if the response arrives in any case, so set it here
- self.jobMessage['PriorityClass'] = priority
-
-
- def stopRequest(self):
- self.jobClient.sendMessage(
- Message.RemovePersistentRequest,
- Global=fcpBool(False),
- Identifier=self.jobIdentifier,
- )
-
-
- def handleMessage(self, msg):
- if msg.name == Message.DataFound:
- return self.handleDataFound(msg)
- elif msg.name == Message.GetFailed:
- return self.handleGetFailed(msg)
- elif msg.name == Message.IdentifierCollision:
- return self.handleIdentifierCollision(msg)
- elif msg.name == Message.PersistentRequestModified:
- return self.handlePersistentRequestModified(msg)
- elif msg.name == Message.PersistentRequestRemoved:
- return self.handlePersistentRequestRemoved(msg)
- elif msg.name == Message.SimpleProgress:
- return self.handleSimpleProgress(msg)
-
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleDataFound(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = (
- False,
- (
- msg.get('Metadata.ContentType', ''),
- msg.get('DataLength', '')
- )
- )
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
- def handleGetFailed(self, msg):
- self.jobTimeStop = time.time()
- if msg['Code'] == FetchError.TooBig:
- self.jobResult = (False, msg)
- self.jobResult = (
- False,
- (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- )
- else:
- self.jobResult = (True, msg)
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
- def handleIdentifierCollision(self, msg):
- raise
-
-
- def handleSimpleProgress(self, msg):
- return True
-
-
- def handlePersistentRequestModified(self, msg):
- priorityClass = msg.get('PriorityClass', None)
- if priorityClass is not None:
- self.jobMessage['PriorityClass'] = priorityClass
- return True
-
- def handlePersistentRequestRemoved(self, msg):
- if self.jobClient.jobIsRunning(self.jobIdentifier):
- self.jobClient.jobStop(self.jobIdentifier)
- return True
-
-
-#TODO: handle case where directories are registered multiple times
-class JobTestDDA(JobBase):
- """Tests a directory for read / write accesss
- """
-
-
- def __init__(self, fcpClient, directory, read=False, write=False):
- """
-
- @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed)
- """
- if not os.path.isdir(directory):
- raise ValueError('No such directory: %r' % directory)
-
- message = Message(
- Message.TestDDARequest,
- Directory=directory,
- WantReadDirectory=fcpBool(read),
- WantWriteDirectory=fcpBool(write),
- )
-
- JobBase.__init__(self, fcpClient, directory, message)
- self.jobTmpFile = None
-
-
- def handleMessage(self, msg):
- if msg.name == Message.TestDDAReply:
- return self.handleTestDDAReply(msg)
- elif msg.name == Message.TestDDAComplete:
- return self.handleTestDDAComplete(msg)
- elif msg.name == Message.ProtocolError:
- return self.handleProtocolError(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleProtocolError(self, msg):
- # most likely code 7 here...
- # "Both WantReadDirectory and WantWriteDirectory are set to false: what's the point of sending a message?"
- # ..a stupid response that is ;-)
- self.jobTimeStop = time.time()
- self.jobClient.jobRemove(self.jobIdentifier)
- if msg['Code'] == ProtocolError.InvalidMessage:
- self.jobResult = (False, False)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleTestDDAReply(self, msg):
- fpathWrite = msg.params.get('WriteFilename', None)
- fpathRead = msg.params.get('ReadFilename', None)
- readContent = ''
- if fpathWrite is not None:
- written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
- if not written:
- if os.path.isfile(fpathWrite):
- os.remove(fpathWrite)
- else:
- self.jobTmpFile = fpathWrite
-
- if fpathRead is not None:
- readContent = saveReadFile(fpathRead)
- if readContent is None:
- readContent = ''
-
- self.jobClient.sendMessage(
- Message.TestDDAResponse,
- Directory=msg['Directory'],
- ReadContent=readContent,
- )
- return True
-
-
- def handleTestDDAComplete(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = (
- pythonBool(msg.get('ReadDirectoryAllowed', 'false')),
- pythonBool(msg.get('WriteDirectoryAllowed', 'false')),
- )
- saveRemoveFile(self.jobTmpFile)
- self.jobTmpFile = None
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-
-class JobGenerateSSK(JobBase):
- """Job to generate a SSK key pair
- """
-
-
- def __init__(self, fcpClient):
- """
- @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated
- SSK key
- """
-
- identifier = newIdentifier()
- message = Message(
- Message.GenerateSSK,
- Identifier=identifier,
- )
- JobBase.__init__(self, fcpClient, identifier, message)
-
-
- def handleMessage(self, msg):
- if msg.name == Message.SSKKeypair:
- return self.handleSSKKeypair(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleSSKKeypair(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = (msg['InsertURI'], msg['RequestURI'])
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-#**************************************************************************
# fcp client
#**************************************************************************
class LogMessages:
@@ -1169,7 +679,7 @@
KeyboardInterrupt = 'Keyboard interrupt'
- SocketDead = 'Socket is dead'
+ SocketDied = 'Socket died'
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
@@ -1181,42 +691,62 @@
"""
_events_ = (
+
+ #Peer related events
'EventListPeers',
- 'EventListNextPeer',
'EventEndListPeers',
+ 'EventPeer',
+ 'EventPeerRemoved',
+ 'EventUnknownIdentifier',
'EventListPeerNotes',
- 'EventListNextPeerNote',
'EventEndListPeerNotes',
+ 'EventPeerNote',
+ 'EventShutdown',
+ 'EventSocketDied',
+
+ # get / put related events
+ 'EventIdentifierCollision',
+
+ 'EventFileInfo',
+ 'EventFileInfoProgress',
+
+ 'EventDataFound',
+ 'EventGetFailed',
+ 'EventSimpleProgress',
+ 'EventPersistentRequestModified',
+ 'EventPersistentRequestRemoved',
+
+
+ # others
+ 'EventSSKKeypair',
+
)
+
+ Version = '2.0'
+ FcpTrue = 'true'
+ FcpFalse = 'false'
def __init__(self,
name='',
- errorHandler=None,
+ connectionName=None,
verbosity=Verbosity.Warning,
logMessages=LogMessages
):
"""
@param name: name of the client instance or '' (for debugging)
- @param errorHandler: will be called if the socket conncetion to the node is dead
- with two params: SocketError + details. When the handler is called the client
- is already closed.
+ @param conectionName: name of the connection
@param verbosity: verbosity level for debugging
@param logMessages: LogMessages class containing message strings
"""
- self._isConnected = False
- self._jobs = {
- 'Jobs': {},
- 'PendingJobs': [],
- 'RegisteredDirectories': [],
- }
- self._errorHandler = errorHandler #TODO: check if necessary!
+ self._connectionName = connectionName
+ self._ddaTmpFiles = []
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock() # lock when resources are accessed
+ self._lock = thread.allocate_lock()
self._socket = None
self.setVerbosity(verbosity)
@@ -1230,6 +760,10 @@
if self._socket is not None:
self._socket.close()
self._socket = None
+
+ # clean left over tmp files
+ for fpath in self._ddaTmpFiles:
+ saveRemoveFile(fpath)
#TODO: an iterator would be nice to enshure Guis stay responsitive in the call
@@ -1245,8 +779,8 @@
self._log.info(self._logMessages.Connecting)
# poll untill freenet responds
- time_elapsed = 0
- while time_elapsed <= repeat:
+ timeElapsed = 0
+ while timeElapsed <= repeat:
# try to Connect socket
if self._socket is not None:
@@ -1265,12 +799,15 @@
# but instad of responding with ClientHelloMustBeFirst
# as expected when not doing so, the node disconnects.
# So take it over here.
- job = JobClientHello(self)
- self.jobAdd(job, synchron=False)
- while time_elapsed <= repeat:
+ self.sendMessage(
+ Message.ClientHello,
+ Name=self._connectionName if self._connectionName is not None else newIdentifier(),
+ ExpectedVersion=self.Version,
+ )
+ while timeElapsed <= repeat:
msg = self.next()
if msg.name == Message.ClientSocketTimeout:
- time_elapsed += SocketTimeout
+ timeElapsed += SocketTimeout
elif msg.name == Message.NodeHello:
return msg.params
else:
@@ -1279,7 +816,7 @@
# continue polling
self._log.info(self._logMessages.ConnectionRetry)
- time_elapsed += timeout
+ timeElapsed += timeout
time.sleep(timeout)
self._log.info(self._logMessages.ConnectingFailed)
@@ -1294,190 +831,172 @@
if msg.name == Message.ClientSocketTimeout:
return True
-
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
-
-
+
if msg.name == Message.ProtocolError:
code = msg['Code']
- if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
- return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
-
- elif code == ProtocolError.ShuttingDown:
-
- #TODO: ??? why dispatch to ClientHello.. can't remember
- if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
-
- # ########################################
- #TODO: ???
-
- return True
-
- else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- #TODO: inform caller
- raise ProtocolError(msg)
- else:
- return self.jobDispatchMessage(identifier, msg)
-
- else:
+ if code == ProtocolError.ShuttingDown:
+ self.close()
+ self.EventShutdown(msg.params)
+ return True
- # check if the is something like an identifier in the message
- if msg.name == Message.TestDDAReply:
- identifier = msg['Directory']
- elif msg.name == Message.TestDDAComplete:
- identifier = msg['Directory']
- elif msg.name == Message.PeerNote:
- identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier']
- elif msg.name == Message.EndListPeerNotes:
- identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier']
- else:
- identifier = msg.get('Identifier', None)
-
- # dispatch to jobs with fixed identifiers
- if identifier is None:
- if msg.name == Message.NodeHello:
- return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif msg.name == Message.EndListPeers:
- return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.Peer:
- return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
-
- # more here.....
-
+ raise ProtocolError(msg)
+
+ ####################################################
+ ##
+ ## TestDDA
+ ##
+ ## Note: if both, ReadDirectoryAllowed and WriteDirectoryAllowed are
+ ## set to false, the node sends a ProtocolError (7, 'Invalid message').
+ ## Have to handle this!
+ ##
+ ####################################################
+ elif msg.name == Message.TestDDAReply:
+ fpathWrite = msg.params.get('WriteFilename', None)
+ fpathRead = msg.params.get('ReadFilename', None)
+ readContent = ''
+ if fpathWrite is not None:
+ written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
+ if not written:
+ saveRemoveFile(fpathWrite)
else:
- raise ValueError('Unhandled message: ' + msg.name)
+ self._ddaTmpFiles.append(fpathWrite)
- else:
- return self.jobDispatchMessage(identifier, msg)
+ if fpathRead is not None:
+ readContent = saveReadFile(fpathRead)
+ if readContent is None:
+ readContent = ''
- raise RuntimeError('We should not have gotten here: %s' % msg.name)
-
+ self.sendMessage(
+ Message.TestDDAResponse,
+ Directory=msg['Directory'],
+ ReadContent=readContent,
+ )
+ return True
+
-
- #########################################################
- ## jobs
- #########################################################
- def hasJobsRunning(self):
- """Checks if the client has running jobs
- @return: (bool) True if so, False otherwise
- """
- self._lock.acquire(True)
- try:
- result = self._jobs['Jobs'] or self._jobs['PendingJobs']
- finally:
- self._lock.release()
+ elif msg.name == Message.TestDDAComplete:
+ # clean tmp files
+ for fpath in self._ddaTmpFiles:
+ saveRemoveFile(fpath)
+ self._ddaTmpFiles = []
+ return True
+
+ ####################################################
+ ##
+ ## Peer related messages
+ ##
+ ####################################################
+ elif msg.name == Message.EndListPeers:
+ self.EventEndListPeers(msg.params)
+ return True
+
+ elif msg.name == Message.EndListPeerNotes:
+ self.EventEndListPeerNotes(msg.params)
+ return True
+ elif msg.name == Message.Peer:
+ self.EventPeer(msg.params)
+ return True
+ elif msg.name == Message.PeerNote:
+ note = msg.get('NoteText', '')
+ if note:
+ note = base64.decodestring(note)
+ msg['NoteText'] = note
+ self.EventPeerNote(msg.params, note)
+ return True
+
+ elif msg.name == Message.PeerRemoved:
+ self.EventPeerRemoved(msg.params)
+ return True
+
+ elif msg.name == Message.UnknownNodeIdentifier:
+ self.EventUnknownIdentifier(msg.params)
+ return True
+
+ ####################################################
+ ##
+ ## Get related messages
+ ##
+ ####################################################
+ elif msg.name == Message.DataFound:
+ if msg['Identifier'].startswith(IdentifierPrefix.FileInfo):
+ self.EventFileInfo(msg.params)
+ return True
+
+ #TODO:
+ self.EventDataFound(msg.params)
+ return True
+
- return result
-
-
- #TODO: not quite clear about the consequences of a synchron job. Have to think this over
- def jobAdd(self, job, synchron=False):
- """Adds a job to the client
- @param job: (Job*) job to add
- @param synchron: if True, wait untill the job is completed, if False return emidiately
- """
- self._lock.acquire(True)
- try:
- if job.jobIdentifier in self._jobs['Jobs']:
- raise ValueError('Duplicate job: %r' % job.jobIdentifier)
- self._jobs['Jobs'][job.jobIdentifier] = job
- finally:
- self._lock.release()
-
- self._log.info(self._logMessages.JobStart + job.jobMessage.name)
- job.handleStart()
- if synchron:
- while self.jobGet(job.jobIdentifier):
- self.next()
+ elif msg.name == Message.GetFailed:
+ code = msg['Code']
+ if code == FetchError.TooBig:
+ if msg['Identifier'].startswith(IdentifierPrefix.FileInfo):
+ params = {
+ 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''),
+ 'DataLength': msg.get('ExpectedDataLength', '')
+ }
+ self.EventFileInfo(params)
+ return True
+
+ self.EventGetFailed(msg.params)
+ return True
+
+
+ elif msg.name == Message.SimpleProgress:
+ if msg['Identifier'].startswith(IdentifierPrefix.FileInfo):
+ self.EventFileInfoProgress(msg.params)
+ else:
+ self.EventSimpleProgress(msg.params)
+ return True
+
+
+ elif msg.name == Message.IdentifierCollision:
+ self.EventIdentifierCollision(msg.params)
+ return True
+
+ elif msg.name == Message.PersistentRequestModified:
+ self.EventPersistentRequestModified(msg.params)
+ return True
- def jobDispatchMessage(self, identifier, msg):
- """Dispatches a message to a job
- @param identifier: identifier of the job
- @param msg: (Message) message to dispatch
- @return: True if the message was handled, False otherwise
- """
- job = self.jobGet(identifier)
- if job is not None:
- return job.handleMessage(msg)
- return False
+ elif msg.name == Message.PersistentRequestRemoved:
+ self.EventPersistentRequestRemoved(msg.params)
+ return True
-
- def jobGet(self, identifier):
- """Returns a job given its identifier
- @param identifier: identifier of the job
- @return: (Job*) instance or None, if no corrosponding job was found
- """
- self._lock.acquire(True)
- try:
- result = self._jobs['Jobs'].get(identifier, None)
- finally:
- self._lock.release()
- return result
-
-
- def jobIsRunning(self, identifier):
- """Checks if a job is running
- @param identifier: identifier of the job
- @return: True if so, False otherwise
- """
- self._lock.acquire(True)
- try:
- result = identifier in self._jobs['Jobs']
- finally:
- self._lock.release()
- return result
+ ####################################################
+ ##
+ ## Others
+ ##
+ ####################################################
+ elif msg.name == Message.SSKKeypair:
+ self.EventSSKKeypair(msg.params)
+ return True
- def jobRemove(self, identifier):
- """Removes a job unconditionally
- @param identifier: identifier of the job to remove
- @return: True if the job was found, False otherwise
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['Jobs'].get(identifier, None)
- if job is not None:
- del self._jobs['Jobs'][identifier]
- finally:
- self._lock.release()
- if job is None:
- return False
- self._log.info(self._logMessages.JobStop + job.jobMessage.name)
- return True
+ ## default
+ return False
- #TODO: some info when all jobs are completed?
+ #########################################################
+ ##
+ ##
+ ##
+ #########################################################
def next(self):
"""Pumps the next message waiting
@note: use this method instead of run() to run the client step by step
"""
msg = Message.fromSocket(self._socket)
if msg.name == Message.ClientSocketDied:
+ self.EventSocketDied(msg['Exception'], msg['Details'])
raise SocketError(msg['Details'])
self.handleMessage(msg)
return msg
-
- def run(self):
- """Runs the client untill all jobs passed to it are completed
- @note: use KeyboardInterrupt to stop prematurely
- """
- try:
- #n = 0
- while self.hasJobsRunning():
- #n += 1
- #if n > 40: break
- self.next()
- except KeyboardInterrupt:
- self._log(self._logMessages.KeyboardInterrupt)
- self.close()
-
def sendMessage(self, name, data=None, **params):
"""Sends a message to freenet
@param name: name of the message to send
@@ -1503,10 +1022,9 @@
try:
msg.send(self._socket)
except socket.error, d:
- self._log.info(self._logMessages.SocketDead)
+ self._log.info(self._logMessages.SocketDied)
self.close()
- if self._errorHandler is not None:
- self._errorHandler(SocketError, d)
+ self.EventSocketDied(socket.error, d)
raise SocketError(d)
return msg
@@ -1520,16 +1038,120 @@
""""""
self._log.setLevel(verbosity)
+ #########################################################
+ ##
+ ##
+ ##
+ #########################################################
+ def fcpBool(self, pythonBool):
+ """Converts a python bool to a fcp bool
+ @param pythonBool: (bool)
+ @return: (str) 'true' or 'false'
+ """
+ return self.FcpTrue if pythonBool else self.FcpFalse
+ def pythonBool(self, fcpBool):
+ """Converts a fcp bool to a python bool
+ @param pythonBool: 'true' or 'false'
+ @return: (bool) True or False
+ """
+ return fcpBool == self.FcpTrue
+
########################################################
##
+ ## Peer related methods
+ ##
########################################################
- def getFileInfo(self, job):
- pass
+ def listPeer(self, identifier):
+ self.jobClient.sendMessage(
+ Message.ListPeer,
+ NodeIdentifier=identifier,
+ )
+
+
+ def listPeerNotes(self, identifier):
+ """
+ @param identifier: identifier of the peer to list notes for
+ """
+ self.sendMessage(
+ Message.ListPeerNotes,
+ NodeIdentifier=identifier
+ )
+
+
+ def listPeers(self, withMetaData=True, withVolantile=True):
+ """
+ @param withMetaData: include meta data for each peer?
+ @param withVolantile: include volantile data for each peer?
+ """
+ self.sendMessage(
+ Message.ListPeers,
+ WithMetadata=self.fcpBool(withMetaData),
+ WithVolatile=self.fcpBool(withVolantile),
+ )
+ def modifyPeer(self, identifier, allowLocalAddresses=None, isDisabled=None, isListenOnly=None):
+ msg = Message(
+ Message.ModifyPeer,
+ NodeIdentifier=identifier,
+ )
+ if allowLocalAddresses is not None:
+ msg['AllowLocalAddresses'] = self.fcpBool(allowLocalAddresses)
+ if isDisabled is not None:
+ msg['isDisabled'] = self.fcpBool(isDisabled)
+ if isListenOnly is not None:
+ msg['isListenOnly'] = self.fcpBool(isListenOnly)
+ self.jobClient.sendMessageEx(msg)
+ self.sendMessageEx(msg)
+
+ def modifyPeerNote(self, identifier, note):
+ self.sendMessage(
+ Message.ModifyPeerNote,
+ NodeIdentifier=identifier,
+ #NOTE: currently fcp supports only this one type
+ PeerNoteType=PeerNoteType.Private,
+ NoteText=note
+ )
+
+
+ def removePeer(self, identifier):
+ self.sendMessage(
+ Message.RemovePeer,
+ NodeIdentifier=identifier,
+ )
+
+ ##########################################################
+ ##
+ ## get / put related methods
+ ##
+ ##########################################################
+ def fileInfo(self, uri, **params):
+ """Requests info about a file
+ @param uri: uri of the file to request info about
+ @event: FileInfo(event, params). If success, params will contain a key 'Metadata.ContentType'
+ and 'DataLength'. Both may be '' (empty string)
+ @event: FileInfoProgress(event, params). Triggered instead of EventSimpleProgress
+ @note: for other events see: L{clientGet}
+ @return: (str) identifier of the request
+ """
+ identifier = IdentifierPrefix.FileInfo + newIdentifier()
+ self.sendMessage(
+ Message.ClientGet,
+ Identifier=identifier,
+ URI=uri,
+ # suggested by Mathew Toseland to use about 32k for mimeType requests
+ # basic sizes of keys are: 1k for SSks and 32k for CHKs
+ MaxSize='32000',
+ ReturnType='none',
+ Verbosity='1',
+ **params
+ )
+ return identifier
+
+
#########################################################
## how to tackle TestDDA?
##
@@ -1638,7 +1260,7 @@
@event: EventListNextPeerNote(event, note).
@event: EventEndListPeerNotes(event).
"""
- if pythonBool(peer['opennet']): # opennet peers do not have any notes associated
+ if self.pythonBool(peer['opennet']): # opennet peers do not have any notes associated
return []
job = JobListPeerNotes(self, peer['identity'])
self.jobAdd(job, synchron=synchron)
@@ -1649,9 +1271,9 @@
#
#*****************************************************************************
if __name__ == '__main__':
- c = FcpClient(name='test', verbosity=logging.DEBUG)
+ c = FcpClient(name='test', verbosity=Verbosity.Warning)
nodeHello = c.connect()
- if nodeHello is not None or 1:
+ if nodeHello is not None:
@@ -1666,7 +1288,24 @@
# should raise
#foo()
+
+ #ModifyPeer not ok
+
+ #RemovePeer not ok
+
+ #ModifyPeerNote ok
+
+ #ListPeer not ok
+
+
def foo():
+ job = JobListPeer(c, '123456')
+ c.jobAdd(job, synchron=True)
+ print job.jobResult
+ #foo()
+
+
+ def foo():
job = JobGenerateSSK(c)
c.jobAdd(job, synchron=True)
print job.jobResult
@@ -1686,30 +1325,26 @@
#foo()
def foo():
- peers = c.peerList(synchron=True)
- for peer in peers:
- print c.peerNotes(peer, synchron=True)
+ def cb(event, params):
+ #print params.get('opennet', 'true'), c.pythonBool(params.get('Opennet', 'true')), params['identity']
+ if params['opennet'] == c.FcpFalse:
+ c.listPeerNotes(params['identity'])
+
+ c.EventPeer += cb
+
+
+ c.listPeers()
+ for i in xrange(80):
+ c.next()
#foo()
def foo():
#job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip')
+ identifier = c.fileInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ for i in xrange(20):
+ c.next()
- job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- #job.jobIdentifier = job.jobMessage['Identifier'] = 1
- #job.jobMessage['Identifier'] = 1
- #job.jobIdentifier = 1
- c.jobAdd(job)
-
- #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg')
- #job.jobMessage['Identifier'] = 1
- #job.jobIdentifier = 1
- #c.jobAdd(job)
-
- c.run()
- print '---------------------------'
- print job.jobResult
- print '---------------------------'
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|