SF.net SVN: fclient: [22] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <jU...@us...> - 2007-10-30 15:11:01
|
Revision: 22 http://fclient.svn.sourceforge.net/fclient/?rev=22&view=rev Author: jUrner Date: 2007-10-30 08:11:02 -0700 (Tue, 30 Oct 2007) Log Message: ----------- Another major rewrite. Cut all down to a plain message handler and events. For now no way for now to wrap the protocol on a higher level without getting into deep troubles. Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 22:44:22 UTC (rev 21) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-30 15:11:02 UTC (rev 22) @@ -93,6 +93,11 @@ SocketTimeout = 0.1 +class IdentifierPrefix: + """Special purpose identifier prefixes""" + + FileInfo = 'FileInfo::' + class Verbosity: Debug = logging.DEBUG @@ -100,22 +105,6 @@ Warning = logging.WARNING -class FixedJobIdentifiers: - """Fixed job identifiers - @note: he client can only handle one job of these at a time - """ - ClientHello = 'ClientHello' - ListPeers = 'ListPeers' - ListPeerNotes = 'ListPeerNotes' - GetNode = 'GetNode' - GetConfig = 'GetConfig' - ModifyConfig = 'ModifyConfig' - WatchGlobal = 'WatchGlobal' - Shutdown = 'Shutdown' - - - - class Priorities: """All priorities supported by the client""" @@ -132,8 +121,8 @@ #TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED> -# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED -# in --> freenet/node/PeerManager.java +# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED +# in --> freenet/node/PeerManager.java class PeerNodeStatus: Connected = 1 RoutingBackedOff = 2 @@ -150,16 +139,10 @@ Disconnecting = 13 -#TODO: see if we can get away with these to avoid collisions. TestDDA uses no prefix -# cos ProtocolError 7 passes the directory as identifier and there is no other hint -# that the error is related to TestDDA. -class IdentifierPrefix: - """Identifier prefixes""" - - ClientGet = 'ClientGet::' - #TestDDA = '' - PeerNote = 'PeerNote::' +class PeerNoteType: + """All known peer note types""" + Private = '1' #************************************************************************************ @@ -170,7 +153,7 @@ def __init__(self, msg): """ - @param msg: (Message) GetFailed message + @param msg: (Message) GetFailed message or its parameters dict """ self.value = '%s (%s, %s)' % ( msg.get('CodeDescription', 'Unknown error') , @@ -214,7 +197,7 @@ def __init__(self, msg): """ - @param msg: (Message) PutFailed message + @param msg: (Message) PutFailed message or its parameters dict """ self.value = '%s (%s, %s)' % ( msg.get('CodeDescription', 'Unknown error') , @@ -240,7 +223,7 @@ def __init__(self, msg): """ - @param msg: (Message) ProtocolError message + @param msg: (Message) ProtocolError message or its parameters dict """ self.value = '%s (%s, %s)' % ( msg.get('CodeDescription', 'Unknown error') , @@ -279,22 +262,13 @@ CanNotPeerWithSelf = '28' PeerExists = '29' OpennetDisabled = '30' - DarknetOnly = '31' + DarknetPeerOnly = '31' class SocketError(Exception): pass +class FcpError(Exception): pass #********************************************************************** # functions #********************************************************************** -def fcpBool(pythonBool): - """Converts a python bool to a fcp bool - @param pythonBool: (bool) - @return: (str) 'true' or 'false' - """ - if pythonBool: - return 'true' - return 'false' - - def newIdentifier(prefix=None): """Returns a new unique identifier @return: (str) uuid @@ -304,14 +278,6 @@ return str(uuid.uuid4()) -def pythonBool(fcpBool): - """Converts a fcp bool to a python bool - @param pythonBool: 'true' or 'false' - @return: (bool) True or False - """ - return fcpBool == 'true' - - def saveReadFile(fpath): """Reads contents of a file in the savest manner possible @param fpath: file to write @@ -692,463 +658,7 @@ out.append('EndMessage\n') return '\n'.join(out) - #************************************************************************** -# jobs -#************************************************************************** -#TODO: maybe remove syncron functionality and rely only on signals -# ...if so, remove timeStart, timeStop aswell.. leave up to caller -class JobBase(object): - """Base class for jobs""" - - - def __init__(self, fcpClient, identifier, message): - """ - @param fcpClient: FcpClient() instance - @param identifier: (str) identifier of the job - @param message: (Message) to send to the node whne the job ist started - @ivar jobClient: FcpClient() instance of the job - @ivar jobIdentifier: identifier of the job - @ivar jobMessage: message to be send to the node - @ivar jobResult: if no error was encountered, holding the result of the job when complete - @ivar jobTimeStart: time the job was started - @ivar jobTimeStop: time the job was stopped - """ - self.jobClient = fcpClient - self.jobIdentifier = identifier - self.jobMessage = message - self.jobResult = None - self.jobTimeStart = 0 - self.jobTimeStop = 0 - - - def handleMessage(self, msg): - return False - - - def handleStart(self): - """Starts the job""" - self.jobResult = None - self.jobTimeStart = time.time() - self.jobClient.sendMessageEx(self.jobMessage) - - - # XXX - def handleStop(self, flagError, msg): - """Called on job completion to stop the job - @param flagError: True if an error was encountered, False otherwise - @param msg: (Message) to pass to the job - """ - self.jobTimeStop = time.time() - self.jobResult = (flagError, msg) - - -class JobClientHello(JobBase): - """Sed a ClientHello message to the node - - @note: this must be the first message passed to the node. If everything - goes well, you will get a NodeHello in response. - """ - - def __init__(self, fcpClient, name=None, expectedVersion='2.0'): - """ - @param name: (str) connection name or None, to use an arbitrary name - @param expectedVersion: (str) node version expected - """ - message = Message( - Message.ClientHello, - Name=name if name is not None else newIdentifier(), - ExpectedVersion=expectedVersion, - ) - JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message) - - def displayName(self): - return 'ClientHello' - - - def handleMessage(self, msg): - if msg.name == Message.NodeHello: - return self.handleNodeHello(msg) - elif msg.name == Message.ProtocolError: - return self.handleProtocolError(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleNodeHello(self, msg): - self.jobTimeStop = time.time() - self.jobResult = msg - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - def handleProtocolError(self, msg): - raise ProtocolError(msg) - - - - - - -class JobListPeers(JobBase): - """Lists all known peers of the node - """ - - def __init__(self, fcpClient, withMetaData=True, withVolantile=True): - """ - @param withMetaData: include meta data for each peer? - @param withVolantile: include volantile data for each peer? - @ivar jobResult: on job completion, will be a list containing all perrs as one 'Peer' message for each peer - """ - message = Message( - Message.ListPeers, - WithMetadata=fcpBool(withMetaData), - WithVolatile=fcpBool(withVolantile), - ) - JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - - - def handleStart(self): - JobBase.handleStart(self) - self.jobClient.EventListPeers() - - - def handleMessage(self,msg): - if msg.name == Message.EndListPeers: - return self.handleEndListPeers(msg) - elif msg.name == Message.Peer: - return self.handlePeer(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handlePeer(self, msg): - self.jobClient.EventListNextPeer(msg.params) - if self.jobResult is None: - self.jobResult = [msg.params, ] - else: - self.jobResult.append(msg.params) - return True - - - def handleEndListPeers(self, msg): - self.jobClient.EventEndListPeers() - self.jobTimeStop = time.time() - if self.jobResult is None: - self.jobResult = [] - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - -class JobListPeerNotes(JobBase): - """Lists all notes associated to a peer of the node - """ - - def __init__(self, fcpClient, identifier): - """ - @param identifier: identifier of the peer to list notes for (peer identity) - @ivar jobResult: on job completion will be a list containing all notes associated to the peer - - @note: notes are only available for darknet peers (opennet == false) - """ - - message = Message( - Message.ListPeerNotes, - NodeIdentifier=identifier - ) - JobBase.__init__(self, fcpClient, IdentifierPrefix.PeerNote + identifier, message) - - - def handleStart(self): - JobBase.handleStart(self) - self.jobClient.EventListPeerNotes() - - - def handleMessage(self,msg): - if msg.name == Message.EndListPeerNotes: - return self.handleEndListPeerNotes(msg) - elif msg.name == Message.PeerNote: - return self.handlePeerNote(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - def handlePeerNote(self, msg): - note = msg.get('NoteText', '') - self.jobClient.EventListNextPeerNote(note) - if note: - note = base64.decodestring(note) - if self.jobResult is None: - self.jobResult = [note, ] - else: - self.jobResult.append(note) - return True - - - def handleEndListPeerNotes(self, msg): - self.jobClient.EventEndListPeerNotes() - self.jobTimeStop = time.time() - if self.jobResult is None: - self.jobResult = [] - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - - -#TODO: identifier collisions are not yet handled -class JobGetFileInfo(JobBase): - """Tries to retieve information about a file. If everything goes well - - On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. - Note, that both members may be '' (empty string) - """ - - - # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed' - def __init__(self, fcpClient, uri, **params): - """ - @param fcpClient: FcpClient() instance - @param uri: uri of the file to retrieve info for - @param params: additional parameters: - IgnoreDS='true' / 'false' - DSOnly='true' / 'false' - MaxRetries=-1 ...N - PriorityClass=Priority* - - @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be - retrieved and data will be a GetFailed message containing details. If error is False - data will be a tuple(str metadataContentType, str size). Note that both may be empty - string and size may not be accurate. - - """ - identifier = IdentifierPrefix.ClientGet + newIdentifier() - message = Message( - Message.ClientGet, - Identifier=identifier, - URI=uri, - # suggested by Mathew Toseland to use about 32k for mimeType requests - # basic sizes of keys are: 1k for SSks and 32k for CHKs - MaxSize='32000', - ReturnType='none', - Verbosity='1', - **params - ) - JobBase.__init__(self, fcpClient, identifier, message) - - - def getPrority(self): - return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault) - - - def setPriority(self, priority): - if not priority in Priorities: - raise ValueError('Invalid priority: %r' % priority) - self.jobClient.sendMessage( - Message.ModifyPersistentRequest, - Identifier=self.jobIdentifier, - Global=fcpBool(False), - PriorityClass=priority, - ) - # not shure if the response arrives in any case, so set it here - self.jobMessage['PriorityClass'] = priority - - - def stopRequest(self): - self.jobClient.sendMessage( - Message.RemovePersistentRequest, - Global=fcpBool(False), - Identifier=self.jobIdentifier, - ) - - - def handleMessage(self, msg): - if msg.name == Message.DataFound: - return self.handleDataFound(msg) - elif msg.name == Message.GetFailed: - return self.handleGetFailed(msg) - elif msg.name == Message.IdentifierCollision: - return self.handleIdentifierCollision(msg) - elif msg.name == Message.PersistentRequestModified: - return self.handlePersistentRequestModified(msg) - elif msg.name == Message.PersistentRequestRemoved: - return self.handlePersistentRequestRemoved(msg) - elif msg.name == Message.SimpleProgress: - return self.handleSimpleProgress(msg) - - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleDataFound(self, msg): - self.jobTimeStop = time.time() - self.jobResult = ( - False, - ( - msg.get('Metadata.ContentType', ''), - msg.get('DataLength', '') - ) - ) - self.jobClient.jobRemove(self.jobIdentifier) - return True - - def handleGetFailed(self, msg): - self.jobTimeStop = time.time() - if msg['Code'] == FetchError.TooBig: - self.jobResult = (False, msg) - self.jobResult = ( - False, - ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - ) - else: - self.jobResult = (True, msg) - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - def handleIdentifierCollision(self, msg): - raise - - - def handleSimpleProgress(self, msg): - return True - - - def handlePersistentRequestModified(self, msg): - priorityClass = msg.get('PriorityClass', None) - if priorityClass is not None: - self.jobMessage['PriorityClass'] = priorityClass - return True - - def handlePersistentRequestRemoved(self, msg): - if self.jobClient.jobIsRunning(self.jobIdentifier): - self.jobClient.jobStop(self.jobIdentifier) - return True - - -#TODO: handle case where directories are registered multiple times -class JobTestDDA(JobBase): - """Tests a directory for read / write accesss - """ - - - def __init__(self, fcpClient, directory, read=False, write=False): - """ - - @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed) - """ - if not os.path.isdir(directory): - raise ValueError('No such directory: %r' % directory) - - message = Message( - Message.TestDDARequest, - Directory=directory, - WantReadDirectory=fcpBool(read), - WantWriteDirectory=fcpBool(write), - ) - - JobBase.__init__(self, fcpClient, directory, message) - self.jobTmpFile = None - - - def handleMessage(self, msg): - if msg.name == Message.TestDDAReply: - return self.handleTestDDAReply(msg) - elif msg.name == Message.TestDDAComplete: - return self.handleTestDDAComplete(msg) - elif msg.name == Message.ProtocolError: - return self.handleProtocolError(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleProtocolError(self, msg): - # most likely code 7 here... - # "Both WantReadDirectory and WantWriteDirectory are set to false: what's the point of sending a message?" - # ..a stupid response that is ;-) - self.jobTimeStop = time.time() - self.jobClient.jobRemove(self.jobIdentifier) - if msg['Code'] == ProtocolError.InvalidMessage: - self.jobResult = (False, False) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleTestDDAReply(self, msg): - fpathWrite = msg.params.get('WriteFilename', None) - fpathRead = msg.params.get('ReadFilename', None) - readContent = '' - if fpathWrite is not None: - written = saveWriteFile(fpathWrite, msg['ContentToWrite']) - if not written: - if os.path.isfile(fpathWrite): - os.remove(fpathWrite) - else: - self.jobTmpFile = fpathWrite - - if fpathRead is not None: - readContent = saveReadFile(fpathRead) - if readContent is None: - readContent = '' - - self.jobClient.sendMessage( - Message.TestDDAResponse, - Directory=msg['Directory'], - ReadContent=readContent, - ) - return True - - - def handleTestDDAComplete(self, msg): - self.jobTimeStop = time.time() - self.jobResult = ( - pythonBool(msg.get('ReadDirectoryAllowed', 'false')), - pythonBool(msg.get('WriteDirectoryAllowed', 'false')), - ) - saveRemoveFile(self.jobTmpFile) - self.jobTmpFile = None - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - -class JobGenerateSSK(JobBase): - """Job to generate a SSK key pair - """ - - - def __init__(self, fcpClient): - """ - @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated - SSK key - """ - - identifier = newIdentifier() - message = Message( - Message.GenerateSSK, - Identifier=identifier, - ) - JobBase.__init__(self, fcpClient, identifier, message) - - - def handleMessage(self, msg): - if msg.name == Message.SSKKeypair: - return self.handleSSKKeypair(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleSSKKeypair(self, msg): - self.jobTimeStop = time.time() - self.jobResult = (msg['InsertURI'], msg['RequestURI']) - self.jobClient.jobRemove(self.jobIdentifier) - return True - - -#************************************************************************** # fcp client #************************************************************************** class LogMessages: @@ -1169,7 +679,7 @@ KeyboardInterrupt = 'Keyboard interrupt' - SocketDead = 'Socket is dead' + SocketDied = 'Socket died' #TODO: no idea what happens on reconnect if socket died. What about running jobs? @@ -1181,42 +691,62 @@ """ _events_ = ( + + #Peer related events 'EventListPeers', - 'EventListNextPeer', 'EventEndListPeers', + 'EventPeer', + 'EventPeerRemoved', + 'EventUnknownIdentifier', 'EventListPeerNotes', - 'EventListNextPeerNote', 'EventEndListPeerNotes', + 'EventPeerNote', + 'EventShutdown', + 'EventSocketDied', + + # get / put related events + 'EventIdentifierCollision', + + 'EventFileInfo', + 'EventFileInfoProgress', + + 'EventDataFound', + 'EventGetFailed', + 'EventSimpleProgress', + 'EventPersistentRequestModified', + 'EventPersistentRequestRemoved', + + + # others + 'EventSSKKeypair', + ) + + Version = '2.0' + FcpTrue = 'true' + FcpFalse = 'false' def __init__(self, name='', - errorHandler=None, + connectionName=None, verbosity=Verbosity.Warning, logMessages=LogMessages ): """ @param name: name of the client instance or '' (for debugging) - @param errorHandler: will be called if the socket conncetion to the node is dead - with two params: SocketError + details. When the handler is called the client - is already closed. + @param conectionName: name of the connection @param verbosity: verbosity level for debugging @param logMessages: LogMessages class containing message strings """ - self._isConnected = False - self._jobs = { - 'Jobs': {}, - 'PendingJobs': [], - 'RegisteredDirectories': [], - } - self._errorHandler = errorHandler #TODO: check if necessary! + self._connectionName = connectionName + self._ddaTmpFiles = [] self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() # lock when resources are accessed + self._lock = thread.allocate_lock() self._socket = None self.setVerbosity(verbosity) @@ -1230,6 +760,10 @@ if self._socket is not None: self._socket.close() self._socket = None + + # clean left over tmp files + for fpath in self._ddaTmpFiles: + saveRemoveFile(fpath) #TODO: an iterator would be nice to enshure Guis stay responsitive in the call @@ -1245,8 +779,8 @@ self._log.info(self._logMessages.Connecting) # poll untill freenet responds - time_elapsed = 0 - while time_elapsed <= repeat: + timeElapsed = 0 + while timeElapsed <= repeat: # try to Connect socket if self._socket is not None: @@ -1265,12 +799,15 @@ # but instad of responding with ClientHelloMustBeFirst # as expected when not doing so, the node disconnects. # So take it over here. - job = JobClientHello(self) - self.jobAdd(job, synchron=False) - while time_elapsed <= repeat: + self.sendMessage( + Message.ClientHello, + Name=self._connectionName if self._connectionName is not None else newIdentifier(), + ExpectedVersion=self.Version, + ) + while timeElapsed <= repeat: msg = self.next() if msg.name == Message.ClientSocketTimeout: - time_elapsed += SocketTimeout + timeElapsed += SocketTimeout elif msg.name == Message.NodeHello: return msg.params else: @@ -1279,7 +816,7 @@ # continue polling self._log.info(self._logMessages.ConnectionRetry) - time_elapsed += timeout + timeElapsed += timeout time.sleep(timeout) self._log.info(self._logMessages.ConnectingFailed) @@ -1294,190 +831,172 @@ if msg.name == Message.ClientSocketTimeout: return True - self._log.debug(self._logMessages.MessageReceived + msg.pprint()) - - + if msg.name == Message.ProtocolError: code = msg['Code'] - if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: - return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - - elif code == ProtocolError.ShuttingDown: - - #TODO: ??? why dispatch to ClientHello.. can't remember - if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): - - # ######################################## - #TODO: ??? - - return True - - else: - identifier = msg.get('Identifier', None) - if identifier is None: - #TODO: inform caller - raise ProtocolError(msg) - else: - return self.jobDispatchMessage(identifier, msg) - - else: + if code == ProtocolError.ShuttingDown: + self.close() + self.EventShutdown(msg.params) + return True - # check if the is something like an identifier in the message - if msg.name == Message.TestDDAReply: - identifier = msg['Directory'] - elif msg.name == Message.TestDDAComplete: - identifier = msg['Directory'] - elif msg.name == Message.PeerNote: - identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier'] - elif msg.name == Message.EndListPeerNotes: - identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier'] - else: - identifier = msg.get('Identifier', None) - - # dispatch to jobs with fixed identifiers - if identifier is None: - if msg.name == Message.NodeHello: - return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Message.EndListPeers: - return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.Peer: - return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - - # more here..... - + raise ProtocolError(msg) + + #################################################### + ## + ## TestDDA + ## + ## Note: if both, ReadDirectoryAllowed and WriteDirectoryAllowed are + ## set to false, the node sends a ProtocolError (7, 'Invalid message'). + ## Have to handle this! + ## + #################################################### + elif msg.name == Message.TestDDAReply: + fpathWrite = msg.params.get('WriteFilename', None) + fpathRead = msg.params.get('ReadFilename', None) + readContent = '' + if fpathWrite is not None: + written = saveWriteFile(fpathWrite, msg['ContentToWrite']) + if not written: + saveRemoveFile(fpathWrite) else: - raise ValueError('Unhandled message: ' + msg.name) + self._ddaTmpFiles.append(fpathWrite) - else: - return self.jobDispatchMessage(identifier, msg) + if fpathRead is not None: + readContent = saveReadFile(fpathRead) + if readContent is None: + readContent = '' - raise RuntimeError('We should not have gotten here: %s' % msg.name) - + self.sendMessage( + Message.TestDDAResponse, + Directory=msg['Directory'], + ReadContent=readContent, + ) + return True + - - ######################################################### - ## jobs - ######################################################### - def hasJobsRunning(self): - """Checks if the client has running jobs - @return: (bool) True if so, False otherwise - """ - self._lock.acquire(True) - try: - result = self._jobs['Jobs'] or self._jobs['PendingJobs'] - finally: - self._lock.release() + elif msg.name == Message.TestDDAComplete: + # clean tmp files + for fpath in self._ddaTmpFiles: + saveRemoveFile(fpath) + self._ddaTmpFiles = [] + return True + + #################################################### + ## + ## Peer related messages + ## + #################################################### + elif msg.name == Message.EndListPeers: + self.EventEndListPeers(msg.params) + return True + + elif msg.name == Message.EndListPeerNotes: + self.EventEndListPeerNotes(msg.params) + return True + elif msg.name == Message.Peer: + self.EventPeer(msg.params) + return True + elif msg.name == Message.PeerNote: + note = msg.get('NoteText', '') + if note: + note = base64.decodestring(note) + msg['NoteText'] = note + self.EventPeerNote(msg.params, note) + return True + + elif msg.name == Message.PeerRemoved: + self.EventPeerRemoved(msg.params) + return True + + elif msg.name == Message.UnknownNodeIdentifier: + self.EventUnknownIdentifier(msg.params) + return True + + #################################################### + ## + ## Get related messages + ## + #################################################### + elif msg.name == Message.DataFound: + if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): + self.EventFileInfo(msg.params) + return True + + #TODO: + self.EventDataFound(msg.params) + return True + - return result - - - #TODO: not quite clear about the consequences of a synchron job. Have to think this over - def jobAdd(self, job, synchron=False): - """Adds a job to the client - @param job: (Job*) job to add - @param synchron: if True, wait untill the job is completed, if False return emidiately - """ - self._lock.acquire(True) - try: - if job.jobIdentifier in self._jobs['Jobs']: - raise ValueError('Duplicate job: %r' % job.jobIdentifier) - self._jobs['Jobs'][job.jobIdentifier] = job - finally: - self._lock.release() - - self._log.info(self._logMessages.JobStart + job.jobMessage.name) - job.handleStart() - if synchron: - while self.jobGet(job.jobIdentifier): - self.next() + elif msg.name == Message.GetFailed: + code = msg['Code'] + if code == FetchError.TooBig: + if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): + params = { + 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''), + 'DataLength': msg.get('ExpectedDataLength', '') + } + self.EventFileInfo(params) + return True + + self.EventGetFailed(msg.params) + return True + + + elif msg.name == Message.SimpleProgress: + if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): + self.EventFileInfoProgress(msg.params) + else: + self.EventSimpleProgress(msg.params) + return True + + + elif msg.name == Message.IdentifierCollision: + self.EventIdentifierCollision(msg.params) + return True + + elif msg.name == Message.PersistentRequestModified: + self.EventPersistentRequestModified(msg.params) + return True - def jobDispatchMessage(self, identifier, msg): - """Dispatches a message to a job - @param identifier: identifier of the job - @param msg: (Message) message to dispatch - @return: True if the message was handled, False otherwise - """ - job = self.jobGet(identifier) - if job is not None: - return job.handleMessage(msg) - return False + elif msg.name == Message.PersistentRequestRemoved: + self.EventPersistentRequestRemoved(msg.params) + return True - - def jobGet(self, identifier): - """Returns a job given its identifier - @param identifier: identifier of the job - @return: (Job*) instance or None, if no corrosponding job was found - """ - self._lock.acquire(True) - try: - result = self._jobs['Jobs'].get(identifier, None) - finally: - self._lock.release() - return result - - - def jobIsRunning(self, identifier): - """Checks if a job is running - @param identifier: identifier of the job - @return: True if so, False otherwise - """ - self._lock.acquire(True) - try: - result = identifier in self._jobs['Jobs'] - finally: - self._lock.release() - return result + #################################################### + ## + ## Others + ## + #################################################### + elif msg.name == Message.SSKKeypair: + self.EventSSKKeypair(msg.params) + return True - def jobRemove(self, identifier): - """Removes a job unconditionally - @param identifier: identifier of the job to remove - @return: True if the job was found, False otherwise - """ - self._lock.acquire(True) - try: - job = self._jobs['Jobs'].get(identifier, None) - if job is not None: - del self._jobs['Jobs'][identifier] - finally: - self._lock.release() - if job is None: - return False - self._log.info(self._logMessages.JobStop + job.jobMessage.name) - return True + ## default + return False - #TODO: some info when all jobs are completed? + ######################################################### + ## + ## + ## + ######################################################### def next(self): """Pumps the next message waiting @note: use this method instead of run() to run the client step by step """ msg = Message.fromSocket(self._socket) if msg.name == Message.ClientSocketDied: + self.EventSocketDied(msg['Exception'], msg['Details']) raise SocketError(msg['Details']) self.handleMessage(msg) return msg - - def run(self): - """Runs the client untill all jobs passed to it are completed - @note: use KeyboardInterrupt to stop prematurely - """ - try: - #n = 0 - while self.hasJobsRunning(): - #n += 1 - #if n > 40: break - self.next() - except KeyboardInterrupt: - self._log(self._logMessages.KeyboardInterrupt) - self.close() - def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @@ -1503,10 +1022,9 @@ try: msg.send(self._socket) except socket.error, d: - self._log.info(self._logMessages.SocketDead) + self._log.info(self._logMessages.SocketDied) self.close() - if self._errorHandler is not None: - self._errorHandler(SocketError, d) + self.EventSocketDied(socket.error, d) raise SocketError(d) return msg @@ -1520,16 +1038,120 @@ """""" self._log.setLevel(verbosity) + ######################################################### + ## + ## + ## + ######################################################### + def fcpBool(self, pythonBool): + """Converts a python bool to a fcp bool + @param pythonBool: (bool) + @return: (str) 'true' or 'false' + """ + return self.FcpTrue if pythonBool else self.FcpFalse + def pythonBool(self, fcpBool): + """Converts a fcp bool to a python bool + @param pythonBool: 'true' or 'false' + @return: (bool) True or False + """ + return fcpBool == self.FcpTrue + ######################################################## ## + ## Peer related methods + ## ######################################################## - def getFileInfo(self, job): - pass + def listPeer(self, identifier): + self.jobClient.sendMessage( + Message.ListPeer, + NodeIdentifier=identifier, + ) + + + def listPeerNotes(self, identifier): + """ + @param identifier: identifier of the peer to list notes for + """ + self.sendMessage( + Message.ListPeerNotes, + NodeIdentifier=identifier + ) + + + def listPeers(self, withMetaData=True, withVolantile=True): + """ + @param withMetaData: include meta data for each peer? + @param withVolantile: include volantile data for each peer? + """ + self.sendMessage( + Message.ListPeers, + WithMetadata=self.fcpBool(withMetaData), + WithVolatile=self.fcpBool(withVolantile), + ) + def modifyPeer(self, identifier, allowLocalAddresses=None, isDisabled=None, isListenOnly=None): + msg = Message( + Message.ModifyPeer, + NodeIdentifier=identifier, + ) + if allowLocalAddresses is not None: + msg['AllowLocalAddresses'] = self.fcpBool(allowLocalAddresses) + if isDisabled is not None: + msg['isDisabled'] = self.fcpBool(isDisabled) + if isListenOnly is not None: + msg['isListenOnly'] = self.fcpBool(isListenOnly) + self.jobClient.sendMessageEx(msg) + self.sendMessageEx(msg) + + def modifyPeerNote(self, identifier, note): + self.sendMessage( + Message.ModifyPeerNote, + NodeIdentifier=identifier, + #NOTE: currently fcp supports only this one type + PeerNoteType=PeerNoteType.Private, + NoteText=note + ) + + + def removePeer(self, identifier): + self.sendMessage( + Message.RemovePeer, + NodeIdentifier=identifier, + ) + + ########################################################## + ## + ## get / put related methods + ## + ########################################################## + def fileInfo(self, uri, **params): + """Requests info about a file + @param uri: uri of the file to request info about + @event: FileInfo(event, params). If success, params will contain a key 'Metadata.ContentType' + and 'DataLength'. Both may be '' (empty string) + @event: FileInfoProgress(event, params). Triggered instead of EventSimpleProgress + @note: for other events see: L{clientGet} + @return: (str) identifier of the request + """ + identifier = IdentifierPrefix.FileInfo + newIdentifier() + self.sendMessage( + Message.ClientGet, + Identifier=identifier, + URI=uri, + # suggested by Mathew Toseland to use about 32k for mimeType requests + # basic sizes of keys are: 1k for SSks and 32k for CHKs + MaxSize='32000', + ReturnType='none', + Verbosity='1', + **params + ) + return identifier + + ######################################################### ## how to tackle TestDDA? ## @@ -1638,7 +1260,7 @@ @event: EventListNextPeerNote(event, note). @event: EventEndListPeerNotes(event). """ - if pythonBool(peer['opennet']): # opennet peers do not have any notes associated + if self.pythonBool(peer['opennet']): # opennet peers do not have any notes associated return [] job = JobListPeerNotes(self, peer['identity']) self.jobAdd(job, synchron=synchron) @@ -1649,9 +1271,9 @@ # #***************************************************************************** if __name__ == '__main__': - c = FcpClient(name='test', verbosity=logging.DEBUG) + c = FcpClient(name='test', verbosity=Verbosity.Warning) nodeHello = c.connect() - if nodeHello is not None or 1: + if nodeHello is not None: @@ -1666,7 +1288,24 @@ # should raise #foo() + + #ModifyPeer not ok + + #RemovePeer not ok + + #ModifyPeerNote ok + + #ListPeer not ok + + def foo(): + job = JobListPeer(c, '123456') + c.jobAdd(job, synchron=True) + print job.jobResult + #foo() + + + def foo(): job = JobGenerateSSK(c) c.jobAdd(job, synchron=True) print job.jobResult @@ -1686,30 +1325,26 @@ #foo() def foo(): - peers = c.peerList(synchron=True) - for peer in peers: - print c.peerNotes(peer, synchron=True) + def cb(event, params): + #print params.get('opennet', 'true'), c.pythonBool(params.get('Opennet', 'true')), params['identity'] + if params['opennet'] == c.FcpFalse: + c.listPeerNotes(params['identity']) + + c.EventPeer += cb + + + c.listPeers() + for i in xrange(80): + c.next() #foo() def foo(): #job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') + identifier = c.fileInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + for i in xrange(20): + c.next() - job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - #job.jobIdentifier = job.jobMessage['Identifier'] = 1 - #job.jobMessage['Identifier'] = 1 - #job.jobIdentifier = 1 - c.jobAdd(job) - - #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg') - #job.jobMessage['Identifier'] = 1 - #job.jobIdentifier = 1 - #c.jobAdd(job) - - c.run() - print '---------------------------' - print job.jobResult - print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |