SF.net SVN: fclient: [142] trunk/sandbox/fcp/fcp2_0_client.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <ju...@us...> - 2008-02-05 12:09:29
|
Revision: 142 http://fclient.svn.sourceforge.net/fclient/?rev=142&view=rev Author: jurner Date: 2008-02-05 04:09:32 -0800 (Tue, 05 Feb 2008) Log Message: ----------- added iterConnect and run methods ++ some fixes Modified Paths: -------------- trunk/sandbox/fcp/fcp2_0_client.py Modified: trunk/sandbox/fcp/fcp2_0_client.py =================================================================== --- trunk/sandbox/fcp/fcp2_0_client.py 2008-02-05 12:08:45 UTC (rev 141) +++ trunk/sandbox/fcp/fcp2_0_client.py 2008-02-05 12:09:32 UTC (rev 142) @@ -8,7 +8,58 @@ @note: The client implementation never uses or watches the global queue. No implementation should ever do so. Global is evil. + + +Sample code, request data of a freenet key:: + + client = FcpClient() + nodeHello = client.connect() + if nodeHello is None: + pass + # something went wrong ..could not connect to the freenet node + else: + # request data associated to a freenet key + myRequestIdentifier = client.getData('CHK@ABCDE.......') + myRequest = c.getRequest(myIdentifier) + c.run() + print myRequest.data + +Usually you would connect handlers to client events to do processing or handle errors:: + + def handleSuccess(event, request): + print 'Here is the data:', request.data + + def handleFailure(event, request): + print 'Too bad, something went wrong' + + client.events.RequestCompleted += handleSuccess + client.events.RequestFailed += handleFailure + + client.getData('CHK@ABCDE.......') + c.run() + + +Instead of calling run() you may run the client step by step:: + + client.getData('CHK@ABCDE.......') + for i in xrange(50): + client.next() + +You may disconnect event handlers aswell:: + + client.events.RequestCompleted -= handleSuccess + client.events.RequestFailed -= handleFailure + + +Multiple event handlers may be connected / disconnected at once:: + + client.events += ( + (client.events.RequestCompleted, handleSuccess), + (client.events.RequestFailed, handleFailure) + ) + + """ @@ -115,7 +166,6 @@ import fcp2_0_params as FcParams from fcp2_0_requests import Upload from fcp2_0_uri import Uri - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) @@ -199,6 +249,11 @@ ExpectedFcpVersion = 2.0 ExpectedNodeBuild = 1107 + IdentifierPrefixRequest = 'request:' + IdentifierPrefixGenerateSSK = 'ssk:' + IdentifierPrefixGenerateUSK = 'usk:' + IdentifierPrefixPluginMessage = 'plugin:' + consts = consts Config = Config Message = Message @@ -219,7 +274,6 @@ """ self._connectionName = self.setConnectionName(connectionName) self._ddaTests = [] # currently running DDA tests (request0, ... requestN) - self._sskRequests = [] # currently pending ssk requests (identifier1... identiferN) self._requests = {} # currently running requests (requestIdentifier --> request) self._log = logging.getLogger(self.__class__.__name__) @@ -257,7 +311,7 @@ 'FcPersistentUserData': persistentUserData, # any user defined persistent data # non persistent params - 'FcStatus': consts.MessageStatus.Pending, + 'FcStatus': consts.RequestStatus.Pending, 'FcErrorMessage': None, # did an error occur? 'FcUserData': userData, # any user defined runtime data here @@ -305,7 +359,7 @@ @return: (str) uuid @note: the identifier returned is unique to the client but may not be unique to the node """ - identifier = self.FcParams.newUuid(uuids=self._requests) + identifier = self.FcParams.newUuid(prefix=self.IdentifierPrefixRequest, uuids=self._requests) # add additional params to msg msg = self._addFcParamsToRequest( @@ -372,14 +426,29 @@ saveRemoveFile(initialRequest['FcTestDDA']['TmpFile']) self._ddaTests = [] - self._sskRequests = [] self._requests = {} def closeFreenet(self): """Shuts down the freenet node""" self.sendMessage(consts.Message.Shutdown) + def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, duration=20, timeout=0.5): + """Connects to the freenet node + @param host: (str) host of th node + @param port: (int) port of the node + @param duration: (int) how many seconds try to connect before giving up + @param timeout: (int) how much time to wait before another attempt to connect + + @return: (L{fcp2_0_message.Message}) NodeHello or None if no connection could be established + """ + nodeHello = None + for nodeHello in self.iterConnect(host=host, port=port, duration=duration, timeout=timeout): + pass + return nodeHello + + + def iterConnect(self, host=DefaultFcpHost, port=DefaultFcpPort, duration=20, timeout=0.5): """Iterator to stablish a connection to a freenet node @param host: (str) host of th node @param port: (int) port of the node @@ -527,7 +596,7 @@ @return: True if the message was handled, False otherwise """ - CancelPersistentRequests = 0 # for testing... if True, cancels all PersistentRequests + CancelPersistentRequests = 1 # for testing... if True, cancels all PersistentRequests if msg.name == consts.Message.ClientSocketTimeout: return True @@ -650,7 +719,7 @@ # NOTE: Fcp already removed the request del self._requests[requestIdentifier] initialRequest['FcErrorMessage'] = msg - initialRequest['FcStatus'] = consts.MessageStatus.Error | consts.MessageStatus.Removed + initialRequest['FcStatus'] = consts.RequestStatus.Complete | consts.RequestStatus.Error | consts.RequestStatus.Removed self.events.RequestFailed(initialRequest) return True @@ -733,7 +802,7 @@ #TODO: check if errorMsg gives reasonable feedback del self._requests[initialRequest['Identifier']] - initialRequest['FcStatus'] = consts.MessageStatus.Error | consts.MessageStatus.Removed + initialRequest['FcStatus'] = consts.RequestStatus.Error | consts.RequestStatus.Removed initialRequest['FcErrorMessage'] = initialRequest['FcTestDDA']['ErrorMsg'] self.events.ProtocolError(initialRequest) return True @@ -768,10 +837,10 @@ if initialRequest is None: return False - initialRequest['FcStatus'] = consts.MessageStatus.Complete + initialRequest['FcStatus'] = consts.RequestStatus.Complete # Fcp removes requests from queue with Persistence.Connection.. so do we if initialRequest.params.get('Persistence', consts.Persistence.Connection) == consts.Persistence.Connection: - initialRequest['FcStatus'] |= consts.MessageStatus.Removed + initialRequest['FcStatus'] |= consts.RequestStatus.Removed del self._requests[requestIdentifier] initialRequest.data = msg.data @@ -782,11 +851,11 @@ if initialRequest is None: return False - initialRequest['FcStatus'] = consts.MessageStatus.Complete + initialRequest['FcStatus'] = consts.RequestStatus.Complete initialRequest['FcMetadataContentType'] = msg.get('Metadata.ContentType', '') initialRequest['FcDataLength'] = msg.get('DataLength', '') - if initialRequest['FcSubType'] != consts.MessageSubType.GetData: - initialRequest['FcStatus'] |= consts.MessageStatus.Removed + if initialRequest['FcSubType'] != consts.RequestSubType.GetData: + initialRequest['FcStatus'] |= consts.RequestStatus.Removed # Fcp removes requests from queue with Persistence.Connection.. so do we if initialRequest.params.get('Persistence', consts.Persistence.Connection) == consts.Persistence.Connection: del self._requests[requestIdentifier] @@ -798,23 +867,23 @@ if initialRequest is None: return False - initialRequest['FcStatus'] = consts.MessageStatus.Null + initialRequest['FcStatus'] = consts.RequestStatus.Null # Fcp removes requests from queue with Persistence.Connection.. so do we if initialRequest.params.get('Persistence', consts.Persistence.Connection) == consts.Persistence.Connection: - initialRequest['FcStatus'] = consts.MessageStatus.Removed + initialRequest['FcStatus'] = consts.RequestStatus.Removed del self._requests[requestIdentifier] # check if it is one of our requests for key information code = msg['Code'] - if code == consts.FetchError.TooBig and initialRequest['FcSubType'] == consts.MessageSubType.GetKeyInfo: - initialRequest['FcStatus'] |= consts.MessageStatus.Complete + if code == consts.FetchError.TooBig and initialRequest['FcSubType'] == consts.RequestSubType.GetKeyInfo: + initialRequest['FcStatus'] |= consts.RequestStatus.Complete initialRequest['FcMetadataContentType'] = msg.get('ExpectedMetadata.ContentType', '') initialRequest['FcDataLength'] = msg.get('ExpectedDataLength', -1) self.events.RequestCompleted(initialRequest) else: initialRequest['FcErrorMessage'] = msg - initialRequest['FcStatus'] |= consts.MessageStatus.Error + initialRequest['FcStatus'] |= consts.RequestStatus.Complete | consts.RequestStatus.Error self.events.RequestFailed(initialRequest) return True @@ -840,13 +909,13 @@ # restore request self._requests[requestIdentifier] = restoredRequest - restoredRequest['FcStatus'] = consts.MessageStatus.Started + restoredRequest['FcStatus'] = consts.RequestStatus.Started self.events.RequestRestored(restoredRequest) return True # known request... filter out multiple PersistentGets - if initialRequest['FcStatus'] == consts.MessageStatus.Pending: - initialRequest['FcStatus'] = consts.MessageStatus.Started + if initialRequest['FcStatus'] == consts.RequestStatus.Pending: + initialRequest['FcStatus'] = consts.RequestStatus.Started #TODO: update initialRequest with params from PersistentGet? @@ -871,22 +940,22 @@ return True # determine initial message name - if restoredRequest['FcSubType'] == consts.MessageSubType.Put: + if restoredRequest['FcSubType'] == consts.RequestSubType.Put: restoredRequest.name = consts.Message.ClientPut - elif restoredRequest['FcSubType'] == consts.MessageSubType.PutDiskDir: + elif restoredRequest['FcSubType'] == consts.RequestSubType.PutDiskDir: restoredRequest.name = consts.Message.ClientPutDiskDir - elif restoredRequest['FcSubType'] == consts.MessageSubType.PutComplexDir: + elif restoredRequest['FcSubType'] == consts.RequestSubType.PutComplexDir: restoredRequest.name = consts.Message.ClientPutComplexDir # restore request self._requests[requestIdentifier] = restoredRequest - restoredRequest['FcStatus'] = consts.MessageStatus.Started + restoredRequest['FcStatus'] = consts.RequestStatus.Started self.events.RequestRestored(restoredRequest) return True # known request... filter out multiple PersistentGets - if initialRequest['FcStatus'] == consts.MessageStatus.Pending: - initialRequest['FcStatus'] = consts.MessageStatus.Started + if initialRequest['FcStatus'] == consts.RequestStatus.Pending: + initialRequest['FcStatus'] = consts.RequestStatus.Started #TODO: update initialRequest with params from PersistentPut? #TODO: update initialRequest with params from PersistentPut? @@ -961,11 +1030,11 @@ if initialRequest is None: return False - initialRequest['FcStatus'] = consts.MessageStatus.Error + initialRequest['FcStatus'] = consts.RequestStatus.Complete | consts.RequestStatus.Error # Fcp removes requests from queue with Persistence.Connection.. so do we if initialRequest.params.get('Persistence', consts.Persistence.Connection) == consts.Persistence.Connection: - initialRequest['FcStatus'] |= consts.MessageStatus.Removed + initialRequest['FcStatus'] |= consts.RequestStatus.Removed del self._requests[requestIdentifier] initialRequest['FcErrorMessage'] = msg @@ -989,10 +1058,10 @@ # TODO: StartupTime and CompletionTime are passed, but # as long as no corrosponding params are passed in DataFound # we ignore them - initialRequest['FcStatus'] = consts.MessageStatus.Complete + initialRequest['FcStatus'] = consts.RequestStatus.Complete if initialRequest.params.get('Persistence', consts.Persistence.Connection) == consts.Persistence.Connection: - initialRequest['FcStatus'] |= consts.MessageStatus.Removed + initialRequest['FcStatus'] |= consts.RequestStatus.Removed del self._requests[requestIdentifier] initialRequest['URI'] = msg['URI'] @@ -1038,7 +1107,7 @@ if initialRequest is None: return False - self._pluginRequests[requestIdentifier] + del self._pluginRequests[requestIdentifier] self.events.PluginInfo(msg) return True @@ -1066,17 +1135,22 @@ elif msg.name == consts.Message.SSKKeypair: - if requestIdentifier not in self._sskRequests: + if requestIdentifier not in self._requests: return False - self._sskRequests.remove(requestIdentifier) + del self._requests[requestIdentifier] #TODO:no idea if the node may pass uris with prefixes like 'freenet:'... strip it anyways insertURI = self.Uri(msg['InsertURI']).uri requestURI = self.Uri(msg['RequestURI']).uri - keyType = consts.KeyType.USK if requestIdentifier.startswith(consts.KeyType.USK) else consts.KeyType.SSK - if keyType == consts.KeyType.USK: + + + createUSK = True if requestIdentifier.startswith(self.IdentifierPrefixGenerateUSK) else False + if createUSK: insertURI = requestURI.replace(consts.KeyType.SSK, consts.KeyType.USK, 1) requestURI = requestURI.replace(consts.KeyType.SSK, consts.KeyType.USK, 1) + keyType = consts.KeyType.USK + else: + keyType = consts.KeyType.SSK msg['InsertURI'] = insertURI msg['RequestURI'] = requestURI @@ -1099,9 +1173,12 @@ def next(self, dispatch=True): """Pumps the next message waiting @param dispatch: if True the message is dispatched to L{handleMessage} + @note: use this method to run the client step by step. If you want to run the + client unconditionally use L{run} """ msg = self.Message.fromSocket(self._socket) if msg.name == consts.Message.ClientSocketDied: + self._log.info(consts.LogMessages.SocketDied) if dispatch: msg = self.Message( consts.Message.ClientDisconnected, @@ -1121,6 +1198,40 @@ return msg + def run(self): + """Runs the client unconditionally untill all requests have completed + @note: a KeyboardInterrupt will stop the client + """ + + #n = 0 + while True: + #n += 1 + #if n > 50: break + + # check if we have running requests + + # assert all requests have completed + status = consts.RequestStatus.Complete + for request in self._requests.values(): + tmp_status = request.params.get('FcStatus', consts.RequestStatus.Null) + if not tmp_status & consts.RequestStatus.Complete: + status = consts.RequestStatus.Null + break + + if status == consts.RequestStatus.Complete: + self._log.info(consts.LogMessages.AllRequestsCompleted) + break + + try: + msg = self.next() + except KeyboardInterrupt: + self._log.info(consts.LogMessages.KeyboardInterrupt) + break + + if msg.name == consts.Message.ClientSocketDied: + break + + def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @@ -1217,7 +1328,7 @@ ): """Requests a key from the node @param uri: (str) uri of the file to request (may contain prefixes like 'freenet:' or 'http://') - @param messageSubType: (L{consts.MessageSubType}) sub type of the message + @param messageSubType: (L{consts.RequestSubType}) sub type of the message @param userData: any non persistent data to associate to the request or None @param persistentUserData: any string to associate to the request as persistent data or None @param filenameCollision: what to do if the disk target alreaady exists. One of the FilenameCollision.* consts @@ -1290,7 +1401,7 @@ """ return self.clientGet( uri, - consts.MessageSubType.GetData, + consts.RequestSubType.GetData, userData, persistentUserData, consts.FilenameCollision.HandleNever, @@ -1359,7 +1470,7 @@ """ return self.clientGet( uri, - consts.MessageSubType.GetFile, + consts.RequestSubType.GetFile, userData, persistentUserData, filenameCollision, @@ -1420,7 +1531,7 @@ # how to retrieve meta info about a key? ...idea is to provoke a GetFailed (TooBig) return self.clientGet( uri, - consts.MessageSubType.GetKeyInfo, + consts.RequestSubType.GetKeyInfo, userData, persistentUserData, consts.FilenameCollision.HandleNever, @@ -1461,40 +1572,9 @@ ######################################################## ## - ## ClientPut related methods + ## CHK ClientPut related methods ## ######################################################## - def putUpload(self, upload, userData=None, persistentUserData=''): - - msg = upload.getMessage(self.Message) - if msg is None: - raise ValueError('Nothing to upload') - - # determine SubType - if msg.name == consts.Message.ClientPut: - messageSubType = consts.MessageSubType.Put - elif msg.name == consts.Message.ClientPutDiskDir: - messageSubType = consts.MessageSubType.PutDiskDir - else: - messageSubType = consts.MessageSubType.PutComplexDir - - self._registerRequest( - msg, - userData, - messageSubType, - time.time(), - persistentUserData, - #filenameCollision=filenameCollision, - ) - - if upload.keyType in (consts.KeyType.SSK, consts.KeyType.USK): - msg['FcInsertUri'] = upload.privateKey - #NOTE: the caller may use the 'FcInsertUri' member to store the private key - - self.sendMessageEx(msg) - return msg['Identifier'] - - def clientPut(self, uri, messageSubType, @@ -1506,7 +1586,7 @@ msg = self.Message(consts.Message.ClientPut, URI=uri) for paramName, value in messageParams.items(): if value is not None: - if param == 'ContentType': + if paramName == 'ContentType': param = 'Metadata.ContentType' msg[paramName] = value if data is not None: @@ -1529,7 +1609,7 @@ #CHK - def putData(self, + def chkPutData(self, data, contentType=None, @@ -1546,7 +1626,7 @@ return self.clientPut( consts.KeyType.CHK, - consts.MessageSubType.Put, + consts.RequestSubType.Put, userData, persistentUserData, data, @@ -1566,7 +1646,12 @@ Verbosity=consts.Verbosity.ReportProgress | consts.Verbosity.ReportCompression, ) - def putFile(self, + def chkPutDir(self): + + pass + + + def chkPutFile(self, filename, contentType=None, @@ -1581,8 +1666,8 @@ ): return self.clientPut( - concts.KeyType.CHK, - consts.MessageSubType.Put, + consts.KeyType.CHK, + consts.RequestSubType.Put, userData, persistentUserData, None, @@ -1602,9 +1687,61 @@ Verbosity=consts.Verbosity.ReportProgress | consts.Verbosity.ReportCompression, ) + + def chkPutMultiple(self, + items, + defaultName=None, + ): + + msg = self.Message( + consts.PutComplexDir, + URI=consts.KeyType.CHK + + ) + + for n, item in enumerate(items): + + for paramName, value in item.items(): + pass + + ######################################################## + ## + ## SSK ClientPut related methods + ## + ######################################################## + def sskPutData(self): + pass + + def sskPutDir(self): + pass + def sskPutFile(self): + pass + + + def sskPutMultiple(self): + pass + ######################################################## ## + ## USK ClientPut related methods + ## + ######################################################## + def uskPutData(self): + pass + + def uskPutDir(self): + pass + + def uskPutFile(self): + pass + + + def uskPutMultiple(self): + pass + + ######################################################## + ## ## request related methods ## ######################################################## @@ -1662,7 +1799,7 @@ """ initialRequest = self._requests[requestIdentifier] if initialRequest.name in consts.Message.ClientRequestMessages: - initialRequest['FcStatus'] = consts.MessageStatus.Removed + initialRequest['FcStatus'] = consts.RequestStatus.Removed self.sendMessage( consts.Message.RemovePersistentRequest, Global=False, @@ -1813,15 +1950,13 @@ ## plugins ## ########################################################## - #TODO: curently it is just a guess the a plugin may respond with a IdentifierCollision. To make - # shure we register plugin related stuff as request def getPluginInfo(self, pluginName, detailed=False): """Requests information about a plugin @param pluginName: (str) name of the plugin to request info for @param detailed: (bool) If True, detailed information is returned @return: (str) request identifier """ - identifier = self.FcParam.newUuid(uuids=self._requests) + identifier = self.FcParam.newUuid(prefix=self.IdentifierPrefixPluginMessage, uuids=self._requests) msg = self.Message( consts.Message.GetPluginInfo, Identifier=identifier, @@ -1840,7 +1975,7 @@ @param data: (str) data to pass along with the messaage or None @return: (str) request identifier """ - identifier = self.FcParam.newUuid(uuids=self._requests) + identifier = self.FcParam.newUuid(prefix=self.IdentifierPrefixPluginMessage, uuids=self._requests) msg = self.Message( consts.Message.GetPluginInfo, Identifier=identifier, @@ -1855,9 +1990,6 @@ self.sendMessageEx(msg) return identifier - - - ########################################################## ## ## others @@ -1873,12 +2005,14 @@ if keypairType not in (consts.KeyType.SSK, consts.KeyType.USK): raise ValueError('keypairType must be %s or %s' % (consts.KeyType.SSK, consts.KeyType.USK)) - identifier = keypairType + self.FcParams.newUuid(self._sskRequests) - self._sskRequests.append(identifier) - self.sendMessage( - consts.Message.GenerateSSK, - Identifier=identifier, - ) + prefix = self.IdentifierPrefixGenerateSSK if keypairType == consts.KeyType.SSK else self.IdentifierPrefixGenerateUSK + identifier = self.FcParams.newUuid(prefix=prefix, uuids=self._requests) + msg = self.Message( + consts.Message.GenerateSSK, + Identifier=identifier, + ) + self._requests[identifier] = msg + self.sendMessageEx(msg) return identifier @@ -1891,7 +2025,7 @@ debugVerbosity=consts.DebugVerbosity.Debug ) - for nodeHello in c.connect(): pass + nodeHello = c.connect() if nodeHello is not None: @@ -1901,16 +2035,22 @@ def testGetData(): def cb(event, request): - print request['FcData'] - c.events.RequestCompleted += cb + pass + #print request.data + #c.modifyRequest(request['Identifier'], persistentUserData='foo') + + c.events.RequestCompleted += cb + c.events.RequestFailed += cb identifier = c.getData( 'CHK@q4~2soHTd9SOINIoXmg~dn7LNUAOYzN1tHNHT3j4c9E,gcVRtoglEhgqN-DJolXPqJ4yX1f~1gBGh89HNWlFMWQ,AAIC--8/snow_002%20%2810%29.jpg', + #persistence=consts.Persistence.Forever, #binaryBlob=True, ) - for i in xrange(50): - c.next() + c.run() + #for i in xrange(50): + # c.next() #c.removeRequest(identifier) #for i in xrange(5): @@ -1967,17 +2107,12 @@ #testGetKeyInfo() - def testPutData(): - myIdentifier = c.putData( + def testChkPutData(): + myIdentifier = c.chkPutData( 'test123', #persistence=c.Persistence.Reboot, ) - #u = c.Upload(c.consts.KeyType.USK, privateKey='USK@eeqMkAamPTUz983Sfr4Ce-ckPUwFgpuTwB~wce0BK3E,rMfH3jUrLRz23fltO-LGEEjnni9DwNKlPzWzaDqOTe8,AQACAAE/') - #u.addData('foo/0/', 'test1234') - #u.addData('bar', 'test12345678') - #u.addData('baz', 'test12345678') - #c.putUpload(u) for i in xrange(500): c.next() @@ -1985,26 +2120,24 @@ #for i in xrange(5): # c.next() - #testPutData() + #testChkPutData() - def testPutFile(): + def testChkPutFile(): fpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test.jpg') - #identifier = c.putFile( - # fpath, - # ) - u = c.Upload(c.consts.KeyType.CHK) - u.addFile('', fpath) - c.putUpload(u) + identifier = c.chkPutFile( + fpath, + ) + - for i in xrange(1000): + for i in xrange(500): c.next() #c.removeRequest(identifier) #for i in xrange(5): # c.next() - #testPutFile() + #testChkPutFile() @@ -2040,7 +2173,7 @@ for i in xrange(10): c.next() - testConfigData() + #testConfigData() @@ -2067,8 +2200,11 @@ c.events.KeypairGenerated += cb c.generateKeypair('SSK@') c.generateKeypair('USK@') - for i in xrange(10): - c.next() + + + c.run() + #for i in xrange(10): + # c.next() #testGenerateKeypair() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |