Thread: SF.net SVN: fclient: [9] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <jU...@us...> - 2007-10-20 13:37:47
|
Revision: 9 http://fclient.svn.sourceforge.net/fclient/?rev=9&view=rev Author: jUrner Date: 2007-10-20 06:37:51 -0700 (Sat, 20 Oct 2007) Log Message: ----------- bit of refactoring Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 10:02:28 UTC (rev 8) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9) @@ -22,79 +22,14 @@ except: pass SocketTimeout = 0.1 -class JobIdentifiers: +class FixedJobIdentifiers: """Fixed job identifiers @note: he client can only handle one job of these at a time """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' -class Messages: - """All messages supported by the client""" - - # client messages - ClientHello = 'ClientHello' - ListPeer = 'ListPeer' # (since 1045) - ListPeers = 'ListPeers' - ListPeerNotes = 'ListPeerNotes' - AddPeer = 'AddPeer' - ModifyPeer = 'ModifyPeer' - ModifyPeerNote = 'ModifyPeerNote' - RemovePeer = 'RemovePeer' - GetNode = 'GetNode' - GetConfig = 'GetConfig' # (since 1027) - ModifyConfig = 'ModifyConfig' # (since 1027) - TestDDARequest = 'TestDDARequest' # (since 1027) - TestDDAResponse = 'TestDDAResponse' # (since 1027) - GenerateSSK = 'GenerateSSK' - ClientPut = 'ClientPut' - ClientPutDiskDir = 'ClientPutDiskDir' - ClientPutComplexDir = 'ClientPutComplexDir' - ClientGet = 'ClientGet' - SubscribeUSK = 'SubscribeUSK' - WatchGlobal = 'WatchGlobal' - GetRequestStatus = 'GetRequestStatus' - ListPersistentRequests = 'ListPersistentRequests' - RemovePersistentRequest = 'RemovePersistentRequest' - ModifyPersistentRequest = 'ModifyPersistentRequest' - Shutdown = 'Shutdown' - - # node messages - NodeHello = 'NodeHello' - CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' - Peer = 'Peer' - PeerNote = 'PeerNote' - EndListPeers = 'EndListPeers' - EndListPeerNotes = 'EndListPeerNotes' - 
PeerRemoved = 'PeerRemoved' - NodeData = 'NodeData' - ConfigData = 'ConfigData' # (since 1027) - TestDDAReply = 'TestDDAReply' # (since 1027) - TestDDAComplete = 'TestDDAComplete' # (since 1027) - SSKKeypair = 'SSKKeypair' - PersistentGet = 'PersistentGet' - PersistentPut = 'PersistentPut' - PersistentPutDir = 'PersistentPutDir' - URIGenerated = 'URIGenerated' - PutSuccessful = 'PutSuccessful' - PutFetchable = 'PutFetchable' - DataFound = 'DataFound' - AllData = 'AllData' - StartedCompression = 'StartedCompression' - FinishedCompression = 'FinishedCompression' - SimpleProgress = 'SimpleProgress' - EndListPersistentRequests = 'EndListPersistentRequests' - PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) - PersistentRequestModified = 'PersistentRequestModified' # (since 1016) - PutFailed = 'PutFailed' - GetFailed = 'GetFailed' - ProtocolError = 'ProtocolError' - IdentifierCollision = 'IdentifierCollision' - UnknownNodeIdentifier = 'UnknownNodeIdentifier' - UnknownPeerNoteType = 'UnknownPeerNoteType' - SubscribedUSKUpdate = 'SubscribedUSKUpdate' - class Priorities: """All priorities supported by the client""" @@ -109,12 +44,23 @@ PriorityMin = Minimum PriorityDefault = Bulk - -# errors - -class FetchErrors: +#************************************************************************************ +# exceptions +#************************************************************************************ +class FetchError(Exception): """All fetch errors supported by the client""" + def __init__(self, msg): + """ + @param msg: (Message) GetFailed message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + MaxArchiveRecursionExceeded = '1' UnknownSplitfileMetadata = '2' UnknownMetadata = '3' @@ -145,9 +91,20 @@ NotAllDataFound = '28' -class InsertErrors: +class InsertError(Exception): """All insert errors supported by the client""" 
+ def __init__(self, msg): + """ + @param msg: (Message) PutFailed message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + InvalidUri = '1' BucketError = '2' InternalError = '3' @@ -160,9 +117,20 @@ Canceled = '10' -class ProtocolErrors: +class ProtocolError(Exception): """All protocol errors supported by the client""" + def __init__(self, msg): + """ + @param msg: (Message) ProtocolError message + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + ClientHelloMustBeFirst = '1' NoLateClientHellos = '2' MessageParseError = '3' @@ -195,15 +163,35 @@ OpennetDisabled = '30' DarknetOnly = '31' +class SocketError(Exception): pass #********************************************************************** # functions #********************************************************************** +def fcpBool(pythonBool): + """Converts a python bool to a fcp bool + @param pythonBool: (bool) + @return: (str) 'true' or 'false' + """ + if pythonBool: + return 'true' + return 'false' + + def newIdentifier(): """Returns a new unique identifier @return: (str) uuid """ return str(uuid.uuid4()) + +def pythonBool(fcpBool): + """Converts a fcp bool to a python bool + @param pythonBool: 'true' or 'false' + @return: (bool) True or False + """ + return fcpBool == 'true' + + def saveReadFile(fpath): """Reads contents of a file in the savest manner possible @param fpath: file to write @@ -267,42 +255,9 @@ stdout, stderr = p.communicate() return stdout - -def fcpBool(pythonBool): - """Converts a python bool to a fcp bool - @param pythonBool: (bool) - @return: (str) 'true' or 'false' - """ - if pythonBool: - return 'true' - return 'false' - -def pythonBool(fcpBool): - """Converts a fcp bool to a python bool - @param pythonBool: 'true' or 
'false' - @return: (bool) True or False - """ - return fcpBool == 'true' - #********************************************************************** # classes #********************************************************************** -class FcpSocketError(Exception): pass -class FcpProtocolError(Exception): - def __init__(self, msg): - """ - @param msg: (Message) ProtocolError message - """ - self.value = '%s (%s, %s)' % ( - msg.get('CodeDescription', 'Unknown error') , - msg['Code'], - msg.get('ExtraDescription', '...'), - ) - - def __str__(self): return self.value - - - class FcpUri(object): """Wrapper class for freenet uris""" @@ -397,8 +352,68 @@ class Message(object): """Class wrapping a freenet message""" - Name = 'UMessage' + # client messages + ClientHello = 'ClientHello' + ListPeer = 'ListPeer' # (since 1045) + ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + AddPeer = 'AddPeer' + ModifyPeer = 'ModifyPeer' + ModifyPeerNote = 'ModifyPeerNote' + RemovePeer = 'RemovePeer' + GetNode = 'GetNode' + GetConfig = 'GetConfig' # (since 1027) + ModifyConfig = 'ModifyConfig' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) + GenerateSSK = 'GenerateSSK' + ClientPut = 'ClientPut' + ClientPutDiskDir = 'ClientPutDiskDir' + ClientPutComplexDir = 'ClientPutComplexDir' + ClientGet = 'ClientGet' + SubscribeUSK = 'SubscribeUSK' + WatchGlobal = 'WatchGlobal' + GetRequestStatus = 'GetRequestStatus' + ListPersistentRequests = 'ListPersistentRequests' + RemovePersistentRequest = 'RemovePersistentRequest' + ModifyPersistentRequest = 'ModifyPersistentRequest' + Shutdown = 'Shutdown' + # node messages + NodeHello = 'NodeHello' + CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' + Peer = 'Peer' + PeerNote = 'PeerNote' + EndListPeers = 'EndListPeers' + EndListPeerNotes = 'EndListPeerNotes' + PeerRemoved = 'PeerRemoved' + NodeData = 'NodeData' + ConfigData = 'ConfigData' # (since 1027) + 
TestDDAReply = 'TestDDAReply' # (since 1027) + TestDDAComplete = 'TestDDAComplete' # (since 1027) + SSKKeypair = 'SSKKeypair' + PersistentGet = 'PersistentGet' + PersistentPut = 'PersistentPut' + PersistentPutDir = 'PersistentPutDir' + URIGenerated = 'URIGenerated' + PutSuccessful = 'PutSuccessful' + PutFetchable = 'PutFetchable' + DataFound = 'DataFound' + AllData = 'AllData' + StartedCompression = 'StartedCompression' + FinishedCompression = 'FinishedCompression' + SimpleProgress = 'SimpleProgress' + EndListPersistentRequests = 'EndListPersistentRequests' + PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) + PersistentRequestModified = 'PersistentRequestModified' # (since 1016) + PutFailed = 'PutFailed' + GetFailed = 'GetFailed' + ProtocolError = 'ProtocolError' + IdentifierCollision = 'IdentifierCollision' + UnknownNodeIdentifier = 'UnknownNodeIdentifier' + UnknownPeerNoteType = 'UnknownPeerNoteType' + SubscribedUSKUpdate = 'SubscribedUSKUpdate' + def __init__(self, name, data=None, **params): """ @@ -410,29 +425,25 @@ self.data = data self.name = name self.params = params - - + def toString(self): """Returns a string with the formated message, ready to be send""" - # TODO: "Data" not yet implemented out = [self.name, ] for param, value in self.params.items(): out.append('%s=%s' % (param, value)) out.append('EndMessage\n') return '\n'.join(out) - - + def pprint(self): """Returns the message as nicely formated human readable string""" - - out = [self.name, ] + out = ['', '>>' + self.name, ] for param, value in self.params.items(): - out.append(' %s=%s' % (param, value)) - out.append('EndMessage') + out.append('>> %s=%s' % (param, value)) + out.append('>>EndMessage') out.append('') return '\n'.join(out) - + def __getitem__(self, name): """Returns the message parameter 'name' """ return self.params[name] @@ -444,14 +455,13 @@ def __setitem__(self, name, value): """Sets the message parameter 'name' to 'value' """ self.params[name] = value - + + 
class MessageSocketTimeout(Message): - Name = 'USocketTimeout' - def __init__(self): - Message.__init__(self, self.Name) + Message.__init__(self, 'USocketTimeOut') @@ -474,47 +484,29 @@ @ivar fcpIdentifier: identifier of the job @ivar fcpMessage: initial message send to the node @ivar fcpResult: if no error was encountered, holding the result of the job when complete - @ivar fcpTime: when the job is complete, holding the time the job took to complete + @ivar fcpTimeStart: time the job was started + @ivar fcpTimeStop: time the job was stopped """ - self.fcpClient = fcpClient # FcpClient() instance the job belongs to - self.fcpError = None # last error (either this is set or dcpResult) - self.fcpIdentifier = identifier # + self.fcpIdentifier = identifier # job identifier self.fcpMessage = message # message send to node - self.fcpResult = None # job result + self.fcpResult = None # job result (bool error, Message msg) self.fcpTimeStart = 0 # time the job was started self.fcpTimeStop = 0 # time the job was stopped - self.fcpStopped = False - - def displayName(self): - """Returns the display name of the job - @return: (str) display name - """ - return 'JobBase' - - def start(self): + + def handleStart(self): """Starts the job""" - self.fcpStopped = False + self.fcpResult = None self.fcpTimeStart = time.time() self.fcpClient.sendMessageEx(self.fcpMessage) - - def error(self, msg): - """Called on job completion if an error was encounterd while runnng the job - @param msg: (Message) to pass to the job - """ - self.fcpStopped = True - self.fcpTimeStop = time.time() - self.fcpError = msg - self.fcpResult = None - - def stop(self, msg): + + def handleStop(self, flagError, msg): """Called on job completion to stop the job + @param flagError: True if an error was encountered, False otherwise @param msg: (Message) to pass to the job """ - self.fcpStopped = True self.fcpTimeStop = time.time() - self.fcpError = None - self.fcpResult = msg + self.fcpResult = (flagError, msg) class 
JobClientHello(JobBase): @@ -532,11 +524,11 @@ @param expectedVersion: (str) node version expected """ message = Message( - Messages.ClientHello, + Message.ClientHello, Name=name if name is not None else newIdentifier(), ExpectedVersion=expectedVersion, ) - JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message) + JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message) def displayName(self): return 'ClientHello' @@ -550,16 +542,13 @@ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( - Messages.ListPeers, + Message.ListPeers, WithMetadata=fcpBool(withMetaData), WithVolatile=fcpBool(withVolantile), ) - JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message) + JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - def displayName(self): - return 'ListPeers' - def handlePeer(self,msg): """Handles the next peer send by the node in form of a 'Peer' message while the job is running. Overwrite to process. @@ -592,7 +581,7 @@ """ identifier = newIdentifier() message = Message( - Messages.ClientGet, + Message.ClientGet, Identifier=identifier, URI=uri, MaxSize='0', @@ -602,37 +591,25 @@ ) JobBase.__init__(self, fcpClient, identifier, message) - - def displayName(self): - return 'GetFileInfo' - + def handleProgress(self, msg): """Handles the next progress made. Overwrite to process. 
""" - - - def error(self, msg): - JobBase.error(self, msg) - if msg.name == Messages.GetFailed: - if msg['Code'] == FetchErrors.TooBig: - self.fcpError = None - self.fcpResult = ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - #else: - # raise ValueError('Unhandled message: %s' % msg.name) - - def stop(self, msg): - JobBase.stop(self, msg) - if msg.name == Messages.DataFound: + def handleStop(self, flagError, msg): + JobBase.handleStop(self,flagError, msg) + if msg.name == Message.DataFound: self.fcpResult = ( msg.get('Metadata.ContentType', ''), msg.get('DataLength', '') ) - else: - raise ValueError('Unhandled message: %s' % msg.name) + elif msg.name == Message.GetFailed: + if msg['Code'] == FetchError.TooBig: + self.fcpResult = (False, msg) + self.fcpResult = ( + msg.get('ExpectedMetadata.ContentType', ''), + msg.get('ExpectedDataLength', '') + ) @@ -648,7 +625,7 @@ raise ValueError('No such directory: %r' % directory) message = Message( - Messages.TestDDARequest, + Message.TestDDARequest, Directory=directory, WantReadDirectory=fcpBool(read), WantWriteDirectory=fcpBool(write), @@ -656,9 +633,7 @@ JobBase.__init__(self, fcpClient, directory, message) self.fcpTmpFile = None - def displayName(self): - return 'TestDDA' - + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -677,22 +652,16 @@ readContent = '' self.fcpClient.sendMessage( - Messages.TestDDAResponse, + Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, ) - def error(self, msg): - JobBase.error(self, msg) + def handleStop(self, flagError, msg): + JobBase.handleStop(self, flagError, msg) saveRemoveFile(self.fcpTmpFile) self.fcpTmpFile = None - - - def stop(self, msg): - JobBase.stop(self, msg) - saveRemoveFile(self.fcpTmpFile) - self.fcpTmpFile = None #************************************************************************** # fcp client @@ -706,8 
+675,8 @@ ClientClose = 'Closing client' - MessageSend = 'Message send' - MessageReceived = 'Message received' + MessageSend = 'SendMessage' + MessageReceived = 'ReceivedMessage' JobStart = 'Starting job: ' JobStop = 'Stopping job: ' @@ -733,7 +702,7 @@ """ @param name: name of the client instance or '' (for debugging) @param errorHandler: will be called if the socket conncetion to the node is dead - with two params: FcpSocketError + details. When the handler is called the client + with two params: SocketError + details. When the handler is called the client is already closed. @param verbosity: verbosity level for debugging @param logMessages: LogMessages class containing messages @@ -747,7 +716,7 @@ 'complete': [], # ??? } self._errorHandler = errorHandler - self._log = logging.getLogger(NameClient + ':' + name) + self._log = logging.getLogger(name) self._logMessages = logMessages self._lock = thread.allocate_lock() self._socket = None @@ -792,10 +761,15 @@ pass else: self._log.info(self._logMessages.Connected) + + #NOTE: thought I could leave ClientHelloing up to the caller + # but instad of responding with ClientHelloMustBeFirst + # as expected the socket simply breaks. So take it over. 
job = JobClientHello(self) self.jobAdd(job, synchron=True) - assert job.fcpError is None, 'Error should have been caught by handleMessage()' - return job.fcpResult + error, msg = job.fcpResult + assert not error, 'Error should have been caught by handleMessage()' + return msg self._log.info(self._logMessages.ConnectionRetry) @@ -811,45 +785,45 @@ """Handles the next message from the freenet node @param msg: Message() to handle """ - self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint()) + self._log.debug(self._logMessages.MessageReceived + msg.pprint()) - if msg.name == Messages.NodeHello: + if msg.name == Message.NodeHello: #connectionIdentifier = msg['ConnectionIdentifier'] - self.jobStop(JobIdentifiers.ClientHello, msg) + self.jobStop(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Messages.ProtocolError: + elif msg.name == Message.ProtocolError: code = msg['Code'] - #if code == ProtocolErrors.NoLateClientHellos: - # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) - #elif code == ProtocolErrors.ClientHelloMustBeFirst: - # self.jobStop(JobIdentifiers.ClientHello, msg, error=True) + #if code == ProtocolError.NoLateClientHellos: + # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) + #elif code == ProtocolError.ClientHelloMustBeFirst: + # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) #else: identifier = msg.get('Identifier', None) if identifier is None: #TODO: inform caller - raise FcpProtocolError(msg) + raise ProtocolError(msg) else: - self.jobStop(identifier, msg, error=True) + self.jobStop(identifier, msg, flagError=True) - elif msg.name == Messages.Peer: - self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg) + elif msg.name == Message.Peer: + self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) - elif msg.name == Messages.EndListPeers: - self.jobStop(IdentifierListPeers, msg) + elif msg.name == Message.EndListPeers: + self.jobStop(FixedJobIdentifiers.ListPeers, msg) - elif 
msg.name == Messages.GetFailed: - self.jobStop(msg['Identifier'], msg, error=True) + elif msg.name == Message.GetFailed: + self.jobStop(msg['Identifier'], msg, flagError=True) - elif msg.name == Messages.SimpleProgress: + elif msg.name == Message.SimpleProgress: self.jobNotify(msg['Identifier'], 'handleProgress', msg) - elif msg.name == Messages.TestDDAReply: + elif msg.name == Message.TestDDAReply: self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) - elif msg.name == Messages.TestDDAComplete: + elif msg.name == Message.TestDDAComplete: self.jobStop(msg['Directory'], msg) - elif msg.name == Messages.IdentifierCollision: + elif msg.name == Message.IdentifierCollision: pass @@ -867,10 +841,10 @@ finally: self._lock.release() - self._log.info(self._logMessages.JobStart + job.displayName()) - job.start() + self._log.info(self._logMessages.JobStart + job.fcpMessage.name) + job.handleStart() if synchron: - while not job.fcpStopped: + while job.fcpResult is None: self.next() @@ -892,11 +866,11 @@ #TODO: quite unclear when to remove a job - def jobStop(self, identifier, msg, error=False): + def jobStop(self, identifier, msg, flagError=False): """Stops a job @param identifier: identifier of the job to stop @param msg: Message() to pass to the job as result - @param error: set to True to indicate unsuccessful completion of the job, True otherwisse + @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse """ self._lock.acquire(True) try: @@ -912,11 +886,8 @@ if job is None: raise ValueError('No such job: %r' % identifier) - self._log.info(self._logMessages.JobStop + job.displayName()) - if error: - job.error(msg) - else: - job.stop(msg) + self._log.info(self._logMessages.JobStop + job.fcpMessage.name) + job.handleStop(flagError, msg) #TODO: some info when all jobs are completed @@ -931,7 +902,7 @@ def readMessage(self): """Reads the next message directly from the socket and dispatches it @return: (Message) the next message read 
from the socket - @raise FcpSocketError: if the socket connection to the node dies unexpectedly + @raise SocketError: if the socket connection to the node dies unexpectedly If an error handler is passed to the client it is called emidiately before the error is raised. """ @@ -949,8 +920,8 @@ self._log.info(self._logMessages.SocketDead) self.close() if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) #!! + self._errorHandler(SocketError, d) + raise SocketError(d) #!! if p == '\r': # ignore continue @@ -975,8 +946,8 @@ self._log.info(self._logMessages.SocketDead) self.close() if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) #!! + self._errorHandler(SocketError, d) + raise SocketError(d) #!! else: head, sep, tail = line.partition('=') @@ -1019,7 +990,7 @@ @param data: data to atatch to the message @param params: {para-name: param-calue, ...} of parameters to pass along with the message (see freenet protocol) - @raise FcpSocketError: if the socket connection to the node dies unexpectedly + @raise SocketError: if the socket connection to the node dies unexpectedly If an error handler is passed to the client it is called emidiately before the error is raised. """ @@ -1030,20 +1001,19 @@ """Sends a message to freenet @param msg: (Message) message to send @return: Message - @raise FcpSocketError: if the socket connection to the node dies unexpectedly. + @raise SocketError: if the socket connection to the node dies unexpectedly. If an error handler is passed to the client it is called emidiately before the error is raised. 
""" - rawMsg = msg.toString() - self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint()) + self._log.debug(self._logMessages.MessageSend + msg.pprint()) try: - self._socket.sendall(rawMsg) - except Exception, d: + self._socket.sendall(msg.toString()) + except socket.error, d: self._log.info(self._logMessages.SocketDead) self.close() if self._errorHandler is not None: - self._errorHandler(FcpSocketError, d) - raise FcpSocketError(d) + self._errorHandler(SocketError, d) + raise SocketError(d) return msg @@ -1061,7 +1031,8 @@ #***************************************************************************** if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) - if c.connect(): + nodeHello = c.connect() + if nodeHello is not None: def foo(): job1 = JobClientHello(c) c.jobAdd(job1) @@ -1070,8 +1041,8 @@ print '---------------------------' print job1.fcpError print job1.fcpResult - print job1.fcpTime print '---------------------------' + # should raise #foo() @@ -1081,18 +1052,25 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpError print job2.fcpResult - print job2.fcpTime print '---------------------------' - + #foo() + def foo(): + job2 = JobListPeers(c) + c.jobAdd(job2) + c.run() + print '---------------------------' + print job2.fcpResult + print '---------------------------' + #foo() + + + def foo(): job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpError print job2.fcpResult - print job2.fcpTime print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-22 09:02:02
|
Revision: 10 http://fclient.svn.sourceforge.net/fclient/?rev=10&view=rev Author: jUrner Date: 2007-10-22 02:02:04 -0700 (Mon, 22 Oct 2007) Log Message: ----------- continued working on protocol implementation Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10) @@ -16,10 +16,10 @@ #************************************************************** NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() -DefaultFcpPort = 9481 try: DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) -except: pass +except ValueError: + DefaultFcpPort = 9481 SocketTimeout = 0.1 class FixedJobIdentifiers: @@ -33,13 +33,13 @@ class Priorities: """All priorities supported by the client""" - Maximum = 0 - Interactive = 1 - SemiInteractive = 2 - Updatable = 3 - Bulk = 4 - Prefetch = 5 - Minimum = 6 + Maximum = '0' + Interactive = '1' + SemiInteractive = '2' + Updatable = '3' + Bulk = '4' + Prefetch = '5' + Minimum = '6' PriorityMin = Minimum PriorityDefault = Bulk @@ -471,42 +471,40 @@ class JobBase(object): """Base class for jobs""" - _fcp_auto_remove_ = True - + def __init__(self, fcpClient, identifier, message): """ @param fcpClient: FcpClient() instance @param identifier: (str) identifier of the job @param message: (Message) to send to the node whne the job ist started - @ivar fcpClient: FcpClient() instance - @ivar fcpError: holding the error message if an error was encountered while running - the job, None otherwise - @ivar fcpIdentifier: identifier of the job - @ivar fcpMessage: initial message send to the node - @ivar fcpResult: if no error was encountered, holding the result of the job when complete - @ivar fcpTimeStart: time the job was started - @ivar 
fcpTimeStop: time the job was stopped + @ivar jobClient: FcpClient() instance of the job + @ivar jobIdentifier: identifier of the job + @ivar jobMessage: message to be send to the node + @ivar jobResult: if no error was encountered, holding the result of the job when complete + @ivar jobTimeStart: time the job was started + @ivar jobTimeStop: time the job was stopped """ - self.fcpClient = fcpClient # FcpClient() instance the job belongs to - self.fcpIdentifier = identifier # job identifier - self.fcpMessage = message # message send to node - self.fcpResult = None # job result (bool error, Message msg) - self.fcpTimeStart = 0 # time the job was started - self.fcpTimeStop = 0 # time the job was stopped + self.jobClient = fcpClient + self.jobIdentifier = identifier + self.jobMessage = message + self.jobResult = None + self.jobTimeStart = 0 + self.jobTimeStop = 0 + def handleStart(self): """Starts the job""" - self.fcpResult = None - self.fcpTimeStart = time.time() - self.fcpClient.sendMessageEx(self.fcpMessage) + self.jobResult = None + self.jobTimeStart = time.time() + self.jobClient.sendMessageEx(self.jobMessage) def handleStop(self, flagError, msg): """Called on job completion to stop the job @param flagError: True if an error was encountered, False otherwise @param msg: (Message) to pass to the job """ - self.fcpTimeStop = time.time() - self.fcpResult = (flagError, msg) + self.jobTimeStop = time.time() + self.jobResult = (flagError, msg) class JobClientHello(JobBase): @@ -516,8 +514,7 @@ goes well, you will get a NodeHello in response. 
""" - _fcp_auto_remove_ = True - + def __init__(self, fcpClient, name=None, expectedVersion='2.0'): """ @param name: (str) connection name or None, to use an arbitrary name @@ -538,8 +535,7 @@ """Lists all known peers of the node """ - _fcp_auto_remove_ = True - + def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Message.ListPeers, @@ -561,13 +557,10 @@ On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. Note, that both members may be '' (empty string) - """ - _fcp_auto_remove_ = False - - - # idea is to provoke a GetFailed message and take mimetype and size from it + + # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed' def __init__(self, fcpClient, uri, **params): """ @param fcpClient: FcpClient() instance @@ -584,7 +577,9 @@ Message.ClientGet, Identifier=identifier, URI=uri, - MaxSize='0', + # suggested by Mathew Toseland to use about 32k for mimeType requests + # basic sizes of keys are: 1k for SSks and 32k for CHKs + MaxSize='32000', ReturnType='none', Verbosity='1', **params @@ -592,34 +587,77 @@ JobBase.__init__(self, fcpClient, identifier, message) + def getPrority(self): + return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault) + + + def setPriority(self, priority): + if not priority in Priorities: + raise ValueError('Invalid priority: %r' % priority) + self.jobClient.sendMessage( + Message.ModifyPersistentRequest, + Identifier=self.jobIdentifier, + Global=fcpBool(False), + PriorityClass=priority, + ) + # not shure if the response arrives in any case, so set it here + self.jobMessage['PriorityClass'] = priority + + + def stopRequest(self): + self.jobClient.sendMessage( + Message.RemovePersistentRequest, + Global=fcpBool(False), + Identifier=self.jobIdentifier, + ) + + + def handlePriorityChanged(self): + self.jobMessage['PriorityClass'] = priority + def handleProgress(self, msg): """Handles the next progress made. 
Overwrite to process. """ + + def handleRequestRemoved(self, msg): + if self.jobClient.jobIsRunning(self.jobIdentifier): + self.jobClient.jobStop(self.jobIdentifier) + def handleStop(self, flagError, msg): + """called when a job is finished + @note: jobResult may contain an 'IdentifierCollision' message. Make shure + to handle this appropriately + """ JobBase.handleStop(self,flagError, msg) if msg.name == Message.DataFound: - self.fcpResult = ( + self.jobResult = ( msg.get('Metadata.ContentType', ''), msg.get('DataLength', '') ) elif msg.name == Message.GetFailed: if msg['Code'] == FetchError.TooBig: - self.fcpResult = (False, msg) - self.fcpResult = ( + self.jobResult = (False, msg) + self.jobResult = ( msg.get('ExpectedMetadata.ContentType', ''), msg.get('ExpectedDataLength', '') ) - + # not shure when removing the request is actually required + # so send it anyway + self.jobClient.sendMessage( + Message.RemovePersistentRequest, + Global=fcpBool(False), + Identifier=self.jobIdentifier, + ) + - + #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): """Tests a directory for read / write accesss """ - _fcp_auto_remove_ = False - + def __init__(self, fcpClient, directory, read=True, write=True): if not os.path.isdir(directory): raise ValueError('No such directory: %r' % directory) @@ -631,7 +669,7 @@ WantWriteDirectory=fcpBool(write), ) JobBase.__init__(self, fcpClient, directory, message) - self.fcpTmpFile = None + self.jobTmpFile = None def handleTestDDAReply(self, msg): @@ -644,14 +682,14 @@ if os.path.isfile(fpathWrite): os.remove(fpathWrite) else: - self.fcpTmpFile = fpathWrite + self.jobTmpFile = fpathWrite if fpathRead is not None: readContent = saveReadFile(fpathRead) if readContent is None: readContent = '' - self.fcpClient.sendMessage( + self.jobClient.sendMessage( Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, @@ -660,8 +698,8 @@ def handleStop(self, flagError, msg): 
JobBase.handleStop(self, flagError, msg) - saveRemoveFile(self.fcpTmpFile) - self.fcpTmpFile = None + saveRemoveFile(self.jobTmpFile) + self.jobTmpFile = None #************************************************************************** # fcp client @@ -682,6 +720,7 @@ JobStop = 'Stopping job: ' JobsCompleted = 'All jobs completed' + KeyboardInterrupt = 'Keyboard interrupt' SocketDead = 'Socket is dead' @@ -710,10 +749,10 @@ self._isConnected = False self._jobs = { - 'all': {}, - 'pending': [], # ??? - 'running': [], - 'complete': [], # ??? + #TODO: check if JobList is still required + 'JobMapping': {}, + 'JobList': [], + 'RegisteredDirectories': [], } self._errorHandler = errorHandler self._log = logging.getLogger(name) @@ -734,7 +773,7 @@ self._socket = None - #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call + #TODO: an iterator would be nice to enshure Guis stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): """Establishes the connection to a freenet node @param host: (str) host of th node @@ -767,7 +806,7 @@ # as expected the socket simply breaks. So take it over. 
job = JobClientHello(self) self.jobAdd(job, synchron=True) - error, msg = job.fcpResult + error, msg = job.jobResult assert not error, 'Error should have been caught by handleMessage()' return msg @@ -782,9 +821,13 @@ def handleMessage(self, msg): - """Handles the next message from the freenet node - @param msg: Message() to handle + """Handles a message from the freenet node + @param msg: (Message) to handle + @return: True if the message was handled, False otherwise """ + + + #if msg.name != 'USocketTimeOut': self._log.debug(self._logMessages.MessageReceived + msg.pprint()) if msg.name == Message.NodeHello: @@ -803,30 +846,57 @@ #TODO: inform caller raise ProtocolError(msg) else: - self.jobStop(identifier, msg, flagError=True) + return self.jobStop(identifier, msg, flagError=True) - elif msg.name == Message.Peer: - self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) + if msg.name == Message.DataFound: + return self.jobStop(msg['Identifier'], msg, flagError=True) elif msg.name == Message.EndListPeers: - self.jobStop(FixedJobIdentifiers.ListPeers, msg) + return self.jobStop(FixedJobIdentifiers.ListPeers, msg) elif msg.name == Message.GetFailed: - self.jobStop(msg['Identifier'], msg, flagError=True) + return self.jobStop(msg['Identifier'], msg, flagError=True) + elif msg.name == Message.PersistentRequestRemoved: + return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg) + elif msg.name == Message.SimpleProgress: - self.jobNotify(msg['Identifier'], 'handleProgress', msg) + return self.jobNotify(msg['Identifier'], 'handleProgress', msg) + elif msg.name == Message.TestDDAComplete: + return self.jobStop(msg['Directory'], msg) + elif msg.name == Message.TestDDAReply: - self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) - elif msg.name == Message.TestDDAComplete: - self.jobStop(msg['Directory'], msg) + elif msg.name == Message.IdentifierCollision: + # identifier 
is unique to us, but for some reason not unique to the node + #TODO: maybe for future implementations find a way to automise + # reinsertion and a notification via a handleJobChanged() for the + # caller to overwrite + return self.jobStop(msg['Identifier'], msg, flagError=True) - elif msg.name == Message.IdentifierCollision: - pass + elif msg.name == Message.Peer: + return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) + + return False + ######################################################### + ## jobs + ######################################################### + def hasJobsRunning(self): + """Checks if the client has running jobs + @return: (bool) True if so, False otherwise + """ + self._lock.acquire(True) + try: + result = bool(self._jobs['JobList']) + finally: + self._lock.release() + return result + + def jobAdd(self, job, synchron=False): """Adds a job to the client @param job: (Job*) job to add @@ -834,35 +904,62 @@ """ self._lock.acquire(True) try: - if job.fcpIdentifier in self._jobs['all']: - raise ValueError('Duplicate job: %r' % job.identifier) - self._jobs['all'][job.fcpIdentifier] = job - self._jobs['running'].append(job) + if job.jobIdentifier in self._jobs['JobMapping']: + raise ValueError('Duplicate job: %r' % job.jobIdentifier) + self._jobs['JobMapping'][job.jobIdentifier] = job + self._jobs['JobList'].append(job) finally: self._lock.release() - self._log.info(self._logMessages.JobStart + job.fcpMessage.name) + self._log.info(self._logMessages.JobStart + job.jobMessage.name) job.handleStart() if synchron: - while job.fcpResult is None: + while job.jobResult is None: self.next() + def jobGet(self, identifier): + """Returns a job given its identifier + @param identifier: identifier of the job + @return: (Job*) instance or None, if no corrosponding job was found + """ + self._lock.acquire(True) + try: + result = self._jobs['JobMapping'].get(identifier, None) + finally: + self._lock.release() + return result + + def 
jobIsRunning(self, identifier): + """Checks if a job is running + @param identifier: identifier of the job + @return: True if so, False otherwise + """ + self._lock.acquire(True) + try: + result = identifier in self._jobs['JobMapping'] + finally: + self._lock.release() + return result + + def jobNotify(self, identifier, handler, msg): """Notifies a job about an event while it is running @param identifier: identifier of the job to notify @param handler: (str) method of the job to call to handle the notification @param msg: Message() to pass to the job + @return: True if a job was found to be notified, False otherwise """ self._lock.acquire(True) try: - job = self._jobs['all'].get(identifier, None) + job = self._jobs['JobMapping'].get(identifier, None) finally: self._lock.release() if job is None: - raise ValueError('No such job: %r' % identifier) + return False getattr(job, handler)(msg) + return True #TODO: quite unclear when to remove a job @@ -871,25 +968,23 @@ @param identifier: identifier of the job to stop @param msg: Message() to pass to the job as result @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse + @return: True if a job was found to be stopped, False otherwise """ self._lock.acquire(True) try: - job = self._jobs['all'].get(identifier, None) + job = self._jobs['JobMapping'].get(identifier, None) if job is not None: - self._jobs['running'].remove(job) - if job._fcp_auto_remove_: - del self._jobs['all'][identifier] - else: - self._jobs['complete'].append(job) + self._jobs['JobList'].remove(job) + del self._jobs['JobMapping'][identifier] finally: self._lock.release() - if job is None: - raise ValueError('No such job: %r' % identifier) - self._log.info(self._logMessages.JobStop + job.fcpMessage.name) + return False + self._log.info(self._logMessages.JobStop + job.jobMessage.name) job.handleStop(flagError, msg) + return True + - #TODO: some info when all jobs are completed def next(self): """Pumps the next 
message waiting @@ -963,21 +1058,8 @@ """Runs the client untill all jobs passed to it are completed @note: use KeyboardInterrupt to stop prematurely """ - - # TODO: - # x. push pending jobs try: - while True: - if not self._lock.acquire(False): - continue - - try: - if not self._jobs['pending'] and not self._jobs['running']: - self._log.info(self._logMessages.JobsCompleted) - break - finally: - self._lock.release() - + while self.hasJobsRunning(): self.next() except KeyboardInterrupt: self._log(self._logMessages.KeyboardInterrupt) @@ -1026,6 +1108,16 @@ """""" self._log.setLevel(verbosity) + + ######################################################## + ## + ######################################################## + def getFileInfo(self, job): + pass + + def getFile(self, job): + pass + #***************************************************************************** # #***************************************************************************** @@ -1039,8 +1131,7 @@ c.run() print '---------------------------' - print job1.fcpError - print job1.fcpResult + print job1.jobResult print '---------------------------' # should raise #foo() @@ -1052,7 +1143,7 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpResult + print job2.jobResult print '---------------------------' #foo() @@ -1061,7 +1152,7 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpResult + print job2.jobResult print '---------------------------' #foo() @@ -1071,6 +1162,6 @@ c.jobAdd(job2) c.run() print '---------------------------' - print job2.fcpResult + print job2.jobResult print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-23 09:19:10
|
Revision: 11 http://fclient.svn.sourceforge.net/fclient/?rev=11&view=rev Author: jUrner Date: 2007-10-23 02:19:12 -0700 (Tue, 23 Oct 2007) Log Message: ----------- combed over message dispatch Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11) @@ -14,7 +14,6 @@ #************************************************************** # consts #************************************************************** -NameClient = 'Fcp20Client' DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip() try: DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip()) @@ -28,6 +27,13 @@ """ ClientHello = 'ClientHello' ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + GetNode = 'GetNode' + GetConfig = 'GetConfig' + ModifyConfig = 'ModifyConfig' + WatchGlobal = 'WatchGlobal' + Shutdown = 'Shutdown' + class Priorities: @@ -492,6 +498,10 @@ self.jobTimeStop = 0 + def handleMessage(self, msg): + return False + + def handleStart(self): """Starts the job""" self.jobResult = None @@ -514,7 +524,6 @@ goes well, you will get a NodeHello in response. 
""" - def __init__(self, fcpClient, name=None, expectedVersion='2.0'): """ @param name: (str) connection name or None, to use an arbitrary name @@ -529,13 +538,36 @@ def displayName(self): return 'ClientHello' + + + def handleMessage(self, msg): + if msg.name == Message.NodeHello: + return self.handleNodeHello(msg) + elif msg.name == Message.ProtocolError: + return self.handleProtocolError(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handleNodeHello(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + def handleProtocolError(self, msg): + raise ProtocolError(msg) + + + + + class JobListPeers(JobBase): """Lists all known peers of the node """ - - + def __init__(self, fcpClient, withMetaData=False, withVolantile=False): message = Message( Message.ListPeers, @@ -545,13 +577,26 @@ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - def handlePeer(self,msg): - """Handles the next peer send by the node in form of a 'Peer' message - while the job is running. Overwrite to process. - """ + def handleMessage(self,msg): + if msg.name == Message.EndListPeers: + return self.handleEndListPeers(msg) + elif msg.name == Message.Peer: + return self.handlePeer(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handlePeer(self, msg): + return True + + def handleEndListPeers(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg + self.jobClient.jobRemove(self.jobIdentifier) + return True + - +#TODO: identifier collisions are not yet handled class JobGetFileInfo(JobBase): """Tries to retieve information about a file. If everything goes well @@ -571,6 +616,11 @@ MaxRetries=-1 ...N PriorityClass=Priority* + @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be + retrieved and data will be a GetFailed message containing details. 
If error is False + data will be a tuple(str metadataContentType, str size). Note that both may be empty + string and size may not be accurate. + """ identifier = newIdentifier() message = Message( @@ -612,46 +662,73 @@ ) - def handlePriorityChanged(self): - self.jobMessage['PriorityClass'] = priority + def handleMessage(self, msg): + if msg.name == Message.DataFound: + return self.handleDataFound(msg) + elif msg.name == Message.GetFailed: + return self.handleGetFailed(msg) + elif msg.name == Message.IdentifierCollision: + return self.handleIdentifierCollision(msg) + elif msg.name == Message.PersistentRequestModified: + return self.handlePersistentRequestModified(msg) + elif msg.name == Message.PersistentRequestRemoved: + return self.handlePersistentRequestRemoved(msg) + elif msg.name == Message.SimpleProgress: + return self.handleSimpleProgress(msg) + + else: + raise ValueError('Unexpected message: %s' % msg.name) + - def handleProgress(self, msg): - """Handles the next progress made. Overwrite to process. 
- """ + def handleDataFound(self, msg): + self.jobTimeStop = time.time() + self.jobResult = ( + False, + ( + msg.get('Metadata.ContentType', ''), + msg.get('DataLength', '') + ) + ) + self.jobClient.jobRemove(self.jobIdentifier) + return True - def handleRequestRemoved(self, msg): + def handleGetFailed(self, msg): + self.jobTimeStop = time.time() + if msg['Code'] == FetchError.TooBig: + self.jobResult = (False, msg) + self.jobResult = ( + False, + ( + msg.get('ExpectedMetadata.ContentType', ''), + msg.get('ExpectedDataLength', '') + ) + ) + else: + self.jobResult = (True, msg) + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + def handleIdentifierCollision(self, msg): + raise + + + def handleSimpleProgress(self, msg): + return True + + + def handlePersistentRequestModified(self, msg): + priorityClass = msg.get('PriorityClass', None) + if priorityClass is not None: + self.jobMessage['PriorityClass'] = priorityClass + return True + + def handlePersistentRequestRemoved(self, msg): if self.jobClient.jobIsRunning(self.jobIdentifier): self.jobClient.jobStop(self.jobIdentifier) + return True + - - def handleStop(self, flagError, msg): - """called when a job is finished - @note: jobResult may contain an 'IdentifierCollision' message. 
Make shure - to handle this appropriately - """ - JobBase.handleStop(self,flagError, msg) - if msg.name == Message.DataFound: - self.jobResult = ( - msg.get('Metadata.ContentType', ''), - msg.get('DataLength', '') - ) - elif msg.name == Message.GetFailed: - if msg['Code'] == FetchError.TooBig: - self.jobResult = (False, msg) - self.jobResult = ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - # not shure when removing the request is actually required - # so send it anyway - self.jobClient.sendMessage( - Message.RemovePersistentRequest, - Global=fcpBool(False), - Identifier=self.jobIdentifier, - ) - - - #TODO: handle case where directories are registered multiple times class JobTestDDA(JobBase): """Tests a directory for read / write accesss @@ -672,6 +749,16 @@ self.jobTmpFile = None + + def handleMessage(self, msg): + if msg.name == Message.TestDDAReply: + return self.handleTestDDAReply(msg) + elif msg.name == Message.TestDDAComplete: + return self.handleTestDDAComplete(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -693,13 +780,18 @@ Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, - ) + ) + return True + - - def handleStop(self, flagError, msg): - JobBase.handleStop(self, flagError, msg) + def handleTestDDAComplete(self, msg): + self.jobTimeStop = time.time() + self.jobResult = msg saveRemoveFile(self.jobTmpFile) self.jobTmpFile = None + self.jobClient.jobRemove(self.jobIdentifier) + return True + #************************************************************************** # fcp client @@ -728,6 +820,10 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. 
#TODO: no idea if to add support for pending jobs and queue management here + +#TODO: do not mix directories as identifiers with identifiers (might lead to collisions) +#TODO: how to handle (ProtocolError code 18: Shutting down)? + class FcpClient(object): """Fcp client implementation""" @@ -757,7 +853,7 @@ self._errorHandler = errorHandler self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() + self._lock = thread.allocate_lock() # lovk when resources are accessed self._socket = None self.setVerbosity(verbosity) @@ -806,9 +902,8 @@ # as expected the socket simply breaks. So take it over. job = JobClientHello(self) self.jobAdd(job, synchron=True) - error, msg = job.jobResult - assert not error, 'Error should have been caught by handleMessage()' - return msg + assert job.jobResult is not None, 'ClientHello is not working as expected' + return job.jobResult self._log.info(self._logMessages.ConnectionRetry) @@ -827,59 +922,66 @@ """ - #if msg.name != 'USocketTimeOut': + if msg.name == 'USocketTimeOut': + return True + self._log.debug(self._logMessages.MessageReceived + msg.pprint()) - if msg.name == Message.NodeHello: - #connectionIdentifier = msg['ConnectionIdentifier'] - self.jobStop(FixedJobIdentifiers.ClientHello, msg) - - elif msg.name == Message.ProtocolError: + + if msg.name == Message.ProtocolError: code = msg['Code'] - #if code == ProtocolError.NoLateClientHellos: - # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) - #elif code == ProtocolError.ClientHelloMustBeFirst: - # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True) - #else: - identifier = msg.get('Identifier', None) - if identifier is None: - #TODO: inform caller - raise ProtocolError(msg) + if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: + return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) + + elif code == ProtocolError.Shutdown: + if not 
self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): + + # ######################################## + #TODO: ??? + + return True + else: - return self.jobStop(identifier, msg, flagError=True) + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: inform caller + raise ProtocolError(msg) + else: + return self.jobDispatchMessage(identifier, msg) + + else: + + # check if the is something like an identifier in the message + if msg.name == Message.TestDDAReply: + identifier = msg['Directory'] + elif msg.name == Message.TestDDAComplete: + identifier = msg['Directory'] + else: + identifier = msg.get('Identifier', None) - if msg.name == Message.DataFound: - return self.jobStop(msg['Identifier'], msg, flagError=True) - - elif msg.name == Message.EndListPeers: - return self.jobStop(FixedJobIdentifiers.ListPeers, msg) - - elif msg.name == Message.GetFailed: - return self.jobStop(msg['Identifier'], msg, flagError=True) + # dispatch to jobs with fixed identifiers + if identifier is None: + + if msg.name == Message.NodeHello: + return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Message.PersistentRequestRemoved: - return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg) - - elif msg.name == Message.SimpleProgress: - return self.jobNotify(msg['Identifier'], 'handleProgress', msg) + elif msg.name == Message.EndListPeers: + return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.TestDDAComplete: - return self.jobStop(msg['Directory'], msg) + elif msg.name == Message.Peer: + return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.TestDDAReply: - return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg) + # more here..... 
+ + else: + raise ValueError('Unhandled message: ' + msg.name) + + else: + return self.jobDispatchMessage(identifier, msg) + + raise RuntimeError('Should not have endet here: %s' % msg.name) - elif msg.name == Message.IdentifierCollision: - # identifier is unique to us, but for some reason not unique to the node - #TODO: maybe for future implementations find a way to automise - # reinsertion and a notification via a handleJobChanged() for the - # caller to overwrite - return self.jobStop(msg['Identifier'], msg, flagError=True) - - elif msg.name == Message.Peer: - return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg) - - return False + ######################################################### @@ -905,6 +1007,9 @@ self._lock.acquire(True) try: if job.jobIdentifier in self._jobs['JobMapping']: + raise ValueError('Job with that identifier already present') + + if job.jobIdentifier in self._jobs['JobMapping']: raise ValueError('Duplicate job: %r' % job.jobIdentifier) self._jobs['JobMapping'][job.jobIdentifier] = job self._jobs['JobList'].append(job) @@ -916,7 +1021,19 @@ if synchron: while job.jobResult is None: self.next() + + def jobDispatchMessage(self, identifier, msg): + """Dispatches a message to a job + @param identifier: identifier of the job + @param msg: (Message) message to dispatch + @return: True if the message was handled, False otherwise + """ + job = self.jobGet(identifier) + if job is not None: + return job.handleMessage(msg) + return False + def jobGet(self, identifier): """Returns a job given its identifier @@ -943,36 +1060,15 @@ self._lock.release() return result - - def jobNotify(self, identifier, handler, msg): - """Notifies a job about an event while it is running - @param identifier: identifier of the job to notify - @param handler: (str) method of the job to call to handle the notification - @param msg: Message() to pass to the job - @return: True if a job was found to be notified, False otherwise + + def jobRemove(self, 
identifier): + """Removes a job unconditionally + @param identifier: identifier of the job to remove + @return: True if the job was found, False otherwise """ self._lock.acquire(True) try: job = self._jobs['JobMapping'].get(identifier, None) - finally: - self._lock.release() - if job is None: - return False - getattr(job, handler)(msg) - return True - - - #TODO: quite unclear when to remove a job - def jobStop(self, identifier, msg, flagError=False): - """Stops a job - @param identifier: identifier of the job to stop - @param msg: Message() to pass to the job as result - @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse - @return: True if a job was found to be stopped, False otherwise - """ - self._lock.acquire(True) - try: - job = self._jobs['JobMapping'].get(identifier, None) if job is not None: self._jobs['JobList'].remove(job) del self._jobs['JobMapping'][identifier] @@ -981,10 +1077,9 @@ if job is None: return False self._log.info(self._logMessages.JobStop + job.jobMessage.name) - job.handleStop(flagError, msg) return True - + #TODO: some info when all jobs are completed def next(self): """Pumps the next message waiting @@ -1059,8 +1154,13 @@ @note: use KeyboardInterrupt to stop prematurely """ try: + #n = 0 + while self.hasJobsRunning(): + #n += 1 + #if n > 40: break self.next() + except KeyboardInterrupt: self._log(self._logMessages.KeyboardInterrupt) self.close() @@ -1117,7 +1217,9 @@ def getFile(self, job): pass + + #***************************************************************************** # #***************************************************************************** @@ -1158,10 +1260,22 @@ def foo(): - job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - c.jobAdd(job2) + job = JobGetFileInfo(c, 
'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') + + + #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + #job.jobIdentifier = job.jobMessage['Identifier'] = 1 + #job.jobMessage['Identifier'] = 1 + #job.jobIdentifier = 1 + c.jobAdd(job) + + #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg') + #job.jobMessage['Identifier'] = 1 + #job.jobIdentifier = 1 + #c.jobAdd(job) + c.run() print '---------------------------' - print job2.jobResult + print job.jobResult print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-25 14:04:41
|
Revision: 12 http://fclient.svn.sourceforge.net/fclient/?rev=12&view=rev Author: jUrner Date: 2007-10-25 05:59:16 -0700 (Thu, 25 Oct 2007) Log Message: ----------- bit more work on protocol Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-25 12:59:16 UTC (rev 12) @@ -1,5 +1,16 @@ +'''Freenet client protocol 2.0 implementation +''' +#NOTE: +# +# downloading data to disk is not supported st the moment. TestDDA code is quite unwritable +# and as far as I can see there are plans to get rid of it. So wait... +# +# + + import atexit +import base64 import logging import os import re @@ -187,7 +198,7 @@ """Returns a new unique identifier @return: (str) uuid """ - return str(uuid.uuid4()) + return 'fclient::' + str(uuid.uuid4()) def pythonBool(fcpBool): @@ -251,7 +262,7 @@ @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') @return: (string) whatever freenet returns """ - #TODO: on windows it may be necessary to hide the comand window + #TODO: on windows it may be necessary to hide the command window p = subprocess.Popen( args=cmdline, shell=True, @@ -368,10 +379,10 @@ ModifyPeerNote = 'ModifyPeerNote' RemovePeer = 'RemovePeer' GetNode = 'GetNode' - GetConfig = 'GetConfig' # (since 1027) + GetConfig = 'GetConfig' # (since 1027) ModifyConfig = 'ModifyConfig' # (since 1027) - TestDDARequest = 'TestDDARequest' # (since 1027) - TestDDAResponse = 'TestDDAResponse' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) GenerateSSK = 'GenerateSSK' ClientPut = 'ClientPut' ClientPutDiskDir = 'ClientPutDiskDir' @@ -433,7 +444,7 @@ self.params = params def toString(self): - """Returns a string with the 
formated message, ready to be send""" + """Returns the message as formated string ready to be send""" # TODO: "Data" not yet implemented out = [self.name, ] for param, value in self.params.items(): @@ -447,7 +458,6 @@ for param, value in self.params.items(): out.append('>> %s=%s' % (param, value)) out.append('>>EndMessage') - out.append('') return '\n'.join(out) def __getitem__(self, name): @@ -508,6 +518,8 @@ self.jobTimeStart = time.time() self.jobClient.sendMessageEx(self.jobMessage) + + # XXX def handleStop(self, flagError, msg): """Called on job completion to stop the job @param flagError: True if an error was encountered, False otherwise @@ -569,13 +581,18 @@ """ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): + """ + @param withMetaData: include meta data for each peer? + @param withVolantile: include volantile data for each peer? + @ivar jobResult: on job completion, will be a list containing all perrs as one 'Peer' message for each peer + """ message = Message( Message.ListPeers, WithMetadata=fcpBool(withMetaData), WithVolatile=fcpBool(withVolantile), ) JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - + def handleMessage(self,msg): if msg.name == Message.EndListPeers: @@ -585,17 +602,72 @@ else: raise ValueError('Unexpected message: %s' % msg.name) + def handlePeer(self, msg): + if self.jobResult is None: + self.jobResult = [msg, ] + else: + self.jobResult.append(msg) return True def handleEndListPeers(self, msg): self.jobTimeStop = time.time() - self.jobResult = msg + if self.jobResult is None: + self.jobResult = [] self.jobClient.jobRemove(self.jobIdentifier) return True + +class JobListPeerNotes(JobBase): + """Lists all notes associated to a peer of the node + """ + + def __init__(self, fcpClient, identifier): + """ + @param identifier: identifier of the peer to list notes for (peer identity) + @ivar jobResult: on job completion will be a list containing all notes associated to the peer + + @note: notes 
are only available for darknet peers (opennet == false) + """ + + message = Message( + Message.ListPeerNotes, + NodeIdentifier=identifier + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def handleMessage(self,msg): + if msg.name == Message.EndListPeerNotes: + return self.handleEndListPeerNotes(msg) + elif msg.name == Message.PeerNote: + return self.handlePeerNote(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handlePeerNote(self, msg): + note = msg.get('NoteText', '') + if note: + note = base64.decodestring(note) + if self.jobResult is None: + self.jobResult = [note, ] + else: + self.jobResult.append(note) + return True + + + def handleEndListPeerNotes(self, msg): + self.jobTimeStop = time.time() + if self.jobResult is None: + self.jobResult = [] + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + + #TODO: identifier collisions are not yet handled class JobGetFileInfo(JobBase): """Tries to retieve information about a file. If everything goes well @@ -735,7 +807,13 @@ """ - def __init__(self, fcpClient, directory, read=True, write=True): + def __init__(self, fcpClient, directory, read=False, write=False): + """ + + @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed) + """ + + if not os.path.isdir(directory): raise ValueError('No such directory: %r' % directory) @@ -747,8 +825,7 @@ ) JobBase.__init__(self, fcpClient, directory, message) self.jobTmpFile = None - - + def handleMessage(self, msg): if msg.name == Message.TestDDAReply: @@ -786,13 +863,50 @@ def handleTestDDAComplete(self, msg): self.jobTimeStop = time.time() - self.jobResult = msg + self.jobResult = ( + pythonBool(msg.get('ReadDirectoryAllowed', 'false')), + pythonBool(msg.get('WriteDirectoryAllowed', 'false')), + ) saveRemoveFile(self.jobTmpFile) self.jobTmpFile = None self.jobClient.jobRemove(self.jobIdentifier) return True + +class JobGenerateSSK(JobBase): + """Job to generate a SSK 
key pair + """ + + + def __init__(self, fcpClient): + """ + @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated + SSK key + """ + + identifier = newIdentifier() + message = Message( + Message.GenerateSSK, + Identifier=identifier, + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def handleMessage(self, msg): + if msg.name == Message.SSKKeypair: + return self.handleSSKKeypair(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + + def handleSSKKeypair(self, msg): + self.jobTimeStop = time.time() + self.jobResult = (msg['InsertURI'], msg['RequestURI']) + self.jobClient.jobRemove(self.jobIdentifier) + return True + + #************************************************************************** # fcp client #************************************************************************** @@ -840,20 +954,19 @@ with two params: SocketError + details. When the handler is called the client is already closed. @param verbosity: verbosity level for debugging - @param logMessages: LogMessages class containing messages + @param logMessages: LogMessages class containing message strings """ self._isConnected = False self._jobs = { - #TODO: check if JobList is still required - 'JobMapping': {}, - 'JobList': [], + 'Jobs': {}, + 'PendingJobs': [], 'RegisteredDirectories': [], } - self._errorHandler = errorHandler + self._errorHandler = errorHandler #TODO: check! 
self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() # lovk when resources are accessed + self._lock = thread.allocate_lock() # lock when resources are accessed self._socket = None self.setVerbosity(verbosity) @@ -933,7 +1046,7 @@ if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif code == ProtocolError.Shutdown: + elif code == ProtocolError.ShuttingDown: if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): # ######################################## @@ -956,6 +1069,11 @@ identifier = msg['Directory'] elif msg.name == Message.TestDDAComplete: identifier = msg['Directory'] + elif msg.name == Message.PeerNote: + identifier = msg['NodeIdentifier'] + elif msg.name == Message.EndListPeerNotes: + identifier = msg['NodeIdentifier'] + else: identifier = msg.get('Identifier', None) @@ -971,6 +1089,13 @@ elif msg.name == Message.Peer: return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) + #elif msg.name == Message.PeerNote: + # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) + + #elif msg.name == Message.EndListPeerNotes: + # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) + + # more here..... 
else: @@ -993,9 +1118,12 @@ """ self._lock.acquire(True) try: - result = bool(self._jobs['JobList']) + result = self._jobs['Jobs'] or self._jobs['PendingJobs'] finally: self._lock.release() + + + return result @@ -1006,20 +1134,16 @@ """ self._lock.acquire(True) try: - if job.jobIdentifier in self._jobs['JobMapping']: - raise ValueError('Job with that identifier already present') - - if job.jobIdentifier in self._jobs['JobMapping']: + if job.jobIdentifier in self._jobs['Jobs']: raise ValueError('Duplicate job: %r' % job.jobIdentifier) - self._jobs['JobMapping'][job.jobIdentifier] = job - self._jobs['JobList'].append(job) + self._jobs['Jobs'][job.jobIdentifier] = job finally: self._lock.release() self._log.info(self._logMessages.JobStart + job.jobMessage.name) job.handleStart() if synchron: - while job.jobResult is None: + while self.jobGet(job.jobIdentifier): self.next() @@ -1042,7 +1166,7 @@ """ self._lock.acquire(True) try: - result = self._jobs['JobMapping'].get(identifier, None) + result = self._jobs['Jobs'].get(identifier, None) finally: self._lock.release() return result @@ -1055,7 +1179,7 @@ """ self._lock.acquire(True) try: - result = identifier in self._jobs['JobMapping'] + result = identifier in self._jobs['Jobs'] finally: self._lock.release() return result @@ -1068,10 +1192,9 @@ """ self._lock.acquire(True) try: - job = self._jobs['JobMapping'].get(identifier, None) + job = self._jobs['Jobs'].get(identifier, None) if job is not None: - self._jobs['JobList'].remove(job) - del self._jobs['JobMapping'][identifier] + del self._jobs['Jobs'][identifier] finally: self._lock.release() if job is None: @@ -1209,17 +1332,86 @@ self._log.setLevel(verbosity) + ######################################################## ## ######################################################## def getFileInfo(self, job): pass - def getFile(self, job): - pass + + + ######################################################### + ## boilerplate code to tackle TestDDA + ## + ## ...but I don't 
trust it ;-) I was not yet alble to wrap my head around + ## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not. + ## + ## Another problem is that there is no way to know when a directory is no longer + ## needed. And I fon't want to write code in a Gui to tackle a problem that will hopefully + ## go away in the near future. + ## + ## see: https://bugs.freenetproject.org/view.php?id=1753 + ## + ######################################################### + def testWriteAccess(self, directory): + canRead, canWrite = False, False + result = self._jobs.get('RegisteredDirectories', None) + if result is not None: + canRead, canWrite = result + if not canWrite: + job = JobTestDDA(directory, read=canRead, write=True) + self.addJob(job, synchron=True) + canWrite = job.jobResult[1] + self._jobs['RegisteredDirectories'] = (canRead, canWrite) + return canWrite + + def testReadAccess(self, directory): + canRead, canWrite = False, False + result = self._jobs.get('RegisteredDirectories', None) + if result is not None: + canRead, canWrite = result + if not canRead: + job = JobTestDDA(directory, read=True, write=canWrite) + self.addJob(job, synchron=True) + canRead = job.jobResult[0] + self._jobs['RegisteredDirectories'] = (canRead, canWrite) + return canRead + + + def downloadFile(self, directory, job): + if not os.path.isdir(directory): + raise IOError('No such directory') + + self._jobs['PendingJobs'].append(job) + try: + result = self.testWriteAccess(directory) + if result: + self.addJob(job) + finally: + self._jobs['PendingJobs'].remove(job) + return result + + def uploadFile(self, directory, job): + if not os.path.isdir(directory): + raise IOError('No such directory') + + self._jobs['PendingJobs'].append(job) + try: + result = self.testReadAccess(directory) + if result: + self.addJob(job) + finally: + self._jobs['PendingJobs'].remove(job) + return result + + + + + #***************************************************************************** # 
#***************************************************************************** @@ -1239,7 +1431,16 @@ #foo() + def foo(): + job = JobGenerateSSK(c) + c.jobAdd(job, synchron=True) + print job.jobResult + foo() + + + + def foo(): d = os.path.dirname(os.path.abspath(__file__)) job2 = JobTestDDA(c, d) c.jobAdd(job2) @@ -1250,12 +1451,21 @@ #foo() def foo(): - job2 = JobListPeers(c) - c.jobAdd(job2) + job = JobListPeers(c) + c.jobAdd(job) c.run() print '---------------------------' - print job2.jobResult + print job.jobResult print '---------------------------' + + + for peer in job.jobResult: + if not pythonBool(peer['opennet']): + job = JobListPeerNotes(c, peer['identity']) + c.jobAdd(job, synchron=True) + print '>>', job.jobResult + #.get('NoteText') + #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-27 17:00:16
|
Revision: 15 http://fclient.svn.sourceforge.net/fclient/?rev=15&view=rev Author: jUrner Date: 2007-10-27 10:00:21 -0700 (Sat, 27 Oct 2007) Log Message: ----------- started implementing public methods and events Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 16:59:07 UTC (rev 14) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 17:00:21 UTC (rev 15) @@ -21,6 +21,24 @@ import thread import uuid + +#--> rel import hack +def parentdir(n, fpath): + fpath = os.path.abspath(fpath) + for i in xrange(n): + fpath = os.path.dirname(fpath) + return fpath +sys.path.insert(0, parentdir(3, __file__)) + + +from fclient_lib.pyex import events + + +sys.path.pop(0) +del parentdir +#<-- rel import hack + + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) #************************************************************** # consts @@ -32,6 +50,14 @@ DefaultFcpPort = 9481 SocketTimeout = 0.1 + + +class Verbosity: + Debug = logging.DEBUG + Info = logging.INFO + Warning = logging.WARNING + + class FixedJobIdentifiers: """Fixed job identifiers @note: he client can only handle one job of these at a time @@ -44,6 +70,7 @@ ModifyConfig = 'ModifyConfig' WatchGlobal = 'WatchGlobal' Shutdown = 'Shutdown' + @@ -61,6 +88,23 @@ PriorityMin = Minimum PriorityDefault = Bulk + +class PeerNodeStatus: + Connected = 1 + RoutingBackedOff = 2 + TooNew = 3 + TooOld = 4 + Disconnected = 5 + NeverConnected = 6 + Disabled = 7 + Bursting = 8 + Listening = 9 + ListenOnly = 10 + ClockProblem = 11 + ConnError = 12 + Disconnecting = 13 + + #************************************************************************************ # exceptions #************************************************************************************ @@ -194,11 +238,13 @@ return 'false' -def newIdentifier(): +def 
newIdentifier(prefix=None): """Returns a new unique identifier @return: (str) uuid """ - return 'fclient::' + str(uuid.uuid4()) + if prefix: + return prefix + str(uuid.uuid4()) + return str(uuid.uuid4()) def pythonBool(fcpBool): @@ -580,7 +626,7 @@ """Lists all known peers of the node """ - def __init__(self, fcpClient, withMetaData=False, withVolantile=False): + def __init__(self, fcpClient, withMetaData=True, withVolantile=True): """ @param withMetaData: include meta data for each peer? @param withVolantile: include volantile data for each peer? @@ -594,6 +640,11 @@ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) + def handleStart(self): + JobBase.handleStart(self) + self.jobClient.EventListPeers() + + def handleMessage(self,msg): if msg.name == Message.EndListPeers: return self.handleEndListPeers(msg) @@ -604,14 +655,16 @@ def handlePeer(self, msg): + self.jobClient.EventListNextPeer(msg.params) if self.jobResult is None: - self.jobResult = [msg, ] + self.jobResult = [msg.params, ] else: - self.jobResult.append(msg) + self.jobResult.append(msg.params) return True def handleEndListPeers(self, msg): + self.jobClient.EventEndListPeers() self.jobTimeStop = time.time() if self.jobResult is None: self.jobResult = [] @@ -639,6 +692,11 @@ JobBase.__init__(self, fcpClient, identifier, message) + def handleStart(self): + JobBase.handleStart(self) + self.jobClient.EventListPeerNotes() + + def handleMessage(self,msg): if msg.name == Message.EndListPeerNotes: return self.handleEndListPeerNotes(msg) @@ -649,6 +707,7 @@ def handlePeerNote(self, msg): note = msg.get('NoteText', '') + self.jobClient.EventListNextPeerNote(note) if note: note = base64.decodestring(note) if self.jobResult is None: @@ -659,6 +718,7 @@ def handleEndListPeerNotes(self, msg): + self.jobClient.EventEndListPeerNotes() self.jobTimeStop = time.time() if self.jobResult is None: self.jobResult = [] @@ -812,8 +872,6 @@ @ivar jobResult: when the job is complete this will be set to a 
tuple(bool readAllowed, bool writeAllowed) """ - - if not os.path.isdir(directory): raise ValueError('No such directory: %r' % directory) @@ -823,6 +881,7 @@ WantReadDirectory=fcpBool(read), WantWriteDirectory=fcpBool(write), ) + JobBase.__init__(self, fcpClient, directory, message) self.jobTmpFile = None @@ -832,10 +891,24 @@ return self.handleTestDDAReply(msg) elif msg.name == Message.TestDDAComplete: return self.handleTestDDAComplete(msg) + elif msg.name == Message.ProtocolError: + return self.handleProtocolError(msg) else: raise ValueError('Unexpected message: %s' % msg.name) + def handleProtocolError(self, msg): + # most likely code 7 here... + # "Both WantReadDirectory and WantWriteDirectory are set to false: what's the point of sending a message?" + # ..a stupid response that is ;-) + self.jobTimeStop = time.time() + self.jobClient.jobRemove(self.jobIdentifier) + if msg['Code'] == ProtocolError.InvalidMessage: + self.jobResult = (False, False) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handleTestDDAReply(self, msg): fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) @@ -938,14 +1011,25 @@ #TODO: do not mix directories as identifiers with identifiers (might lead to collisions) #TODO: how to handle (ProtocolError code 18: Shutting down)? -class FcpClient(object): +class FcpClient(events.Events): """Fcp client implementation""" + _events_ = ( + 'EventListPeers', + 'EventListNextPeer', + 'EventEndListPeers', + + 'EventListPeerNotes', + 'EventListNextPeerNote', + 'EventEndListPeerNotes', + + ) + def __init__(self, name='', errorHandler=None, - verbosity=logging.WARNING, + verbosity=Verbosity.Warning, logMessages=LogMessages ): """ @@ -963,7 +1047,7 @@ 'PendingJobs': [], 'RegisteredDirectories': [], } - self._errorHandler = errorHandler #TODO: check! + self._errorHandler = errorHandler #TODO: check if necessary! 
self._log = logging.getLogger(name) self._logMessages = logMessages self._lock = thread.allocate_lock() # lock when resources are accessed @@ -1065,6 +1149,7 @@ else: # check if the is something like an identifier in the message + #TODO: we run into troubles when using directories and NodeIdentifiers as identifiers if msg.name == Message.TestDDAReply: identifier = msg['Directory'] elif msg.name == Message.TestDDAComplete: @@ -1405,13 +1490,27 @@ finally: self._jobs['PendingJobs'].remove(job) return result - - - + + ################################################# + ## + ## public methods + ## + ################################################# + def peerList(self, synchron=False): + job = JobListPeers(self) + self.jobAdd(job, synchron=synchron) + return job.jobResult + + def peerNotes(self, peer, synchron=False): + if pythonBool(peer['opennet']): + return [] + job = JobListPeerNotes(self, peer['identity']) + self.jobAdd(job, synchron=synchron) + return job.jobResult + - #***************************************************************************** # #***************************************************************************** @@ -1436,7 +1535,7 @@ job = JobGenerateSSK(c) c.jobAdd(job, synchron=True) print job.jobResult - foo() + #foo() @@ -1451,29 +1550,18 @@ #foo() def foo(): - job = JobListPeers(c) - c.jobAdd(job) - c.run() - print '---------------------------' - print job.jobResult - print '---------------------------' + peers = c.peerList(synchron=True) + for peer in peers: + print c.peerNotes(peer, synchron=True) + + foo() - for peer in job.jobResult: - if not pythonBool(peer['opennet']): - job = JobListPeerNotes(c, peer['identity']) - c.jobAdd(job, synchron=True) - print '>>', job.jobResult - #.get('NoteText') - - #foo() - - def foo(): - job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') + #job = JobGetFileInfo(c, 
'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') - #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') #job.jobIdentifier = job.jobMessage['Identifier'] = 1 #job.jobMessage['Identifier'] = 1 #job.jobIdentifier = 1 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-27 20:59:41
|
Revision: 16 http://fclient.svn.sourceforge.net/fclient/?rev=16&view=rev Author: jUrner Date: 2007-10-27 13:59:45 -0700 (Sat, 27 Oct 2007) Log Message: ----------- bit of refactoring + play ClientHello as save as possible Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 17:00:21 UTC (rev 15) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 20:59:45 UTC (rev 16) @@ -1,4 +1,54 @@ '''Freenet client protocol 2.0 implementation + + +@newfield event, events + + + +Sample code:: + + client = FcpClient() + nodeHello = client.connect() + if nodeHello is not None: + # do whatever + + + +Most method calls can be made either synchron or asynchron:: + + peers = client.peerList(synchron=True) + for peer in peers: + # do whatever + + +To get informed about asynchron events you should connect the relevant events the client provides:: + + # connect to one single event + client.EventListNextPeer += MyCallback + + # connect to multiple events at once + client += ( + (client.EventListPeers, MyCallback1), + (client.EventEndListPeers, MyCallback2), + ) + + # each callback is called with the event as first parameter, followed by additional parameters, + # depending on the event triggered. 
+ def MyListNextPeerCallback(event, peer): + print peer + + client.peerList(synchron=False) + + + # when event notifications are no longer required, you should always make shure to disconnect from them + client.EventListNextPeer -= MyCallback + client -= ( + (client.EventListPeers, MyCallback1), + (client.EventEndListPeers, MyCallback2), + ) + + + ''' #NOTE: @@ -478,6 +528,11 @@ SubscribedUSKUpdate = 'SubscribedUSKUpdate' + # client messages (internal use only) + ClientSocketTimeout = 0 + ClientSocketDied = 1 + + def __init__(self, name, data=None, **params): """ @param name: messge name @@ -489,15 +544,71 @@ self.name = name self.params = params - def toString(self): - """Returns the message as formated string ready to be send""" - # TODO: "Data" not yet implemented - out = [self.name, ] - for param, value in self.params.items(): - out.append('%s=%s' % (param, value)) - out.append('EndMessage\n') - return '\n'.join(out) + def get(self, name, default=None): + """Returns the message parameter 'name' or 'default' """ + return self.params.get(name, default) + + def __getitem__(self, name): + """Returns the message parameter 'name' """ + return self.params[name] + + def __setitem__(self, name, value): + """Sets the message parameter 'name' to 'value' """ + self.params[name] = value + + @classmethod + def fromSocket(clss, socketObj): + msg = clss(None) + buf = [] + while True: + + try: + p = socketObj.recv(1) + if not p: raise ValueError('Socket is dead') + except socket.timeout, d: # no new messages in queue + msg.name = clss.ClientSocketTimeOut + return msg + except Exception, d: + msg.name = clss.ClientSocketDied + msg['Exception'] = Exception + msg['Details'] = d + return msg + + if p == '\r': # ignore + continue + + if p != '\n': + buf.append(p) + continue + + line = ''.join(buf) + if line in ('End', "EndMessage"): + break + buf = [] + + if msg.name is None: + msg.name = line + elif line == 'Data': + n = int(msg.params['DataLength']) + try: + msg.data = 
socketObj.recv(n) + if not msg.data: raise ValueError('Socket is dead') + except Exception, d: + msg.name = clss.ClientSocketDied + msg['Exception'] = Exception + msg['Details'] = d + return msg + + else: + head, sep, tail = line.partition('=') + msg.params[head] = tail + if not sep: + # TODO: chek for invalid messages or not + pass + + return msg + def pprint(self): """Returns the message as nicely formated human readable string""" out = ['', '>>' + self.name, ] @@ -505,28 +616,19 @@ out.append('>> %s=%s' % (param, value)) out.append('>>EndMessage') return '\n'.join(out) - - def __getitem__(self, name): - """Returns the message parameter 'name' """ - return self.params[name] - - def get(self, name, default=None): - """Returns the message parameter 'name' or 'default' """ - return self.params.get(name, default) - - def __setitem__(self, name, value): - """Sets the message parameter 'name' to 'value' """ - self.params[name] = value - + def toString(self): + """Returns the message as formated string ready to be send""" + # TODO: "Data" not yet implemented + if isinstance(self.name, (int, long)): + raise ValueError('You can not send client internal messages to the node') + out = [self.name, ] + for param, value in self.params.items(): + out.append('%s=%s' % (param, value)) + out.append('EndMessage\n') + return '\n'.join(out) -class MessageSocketTimeout(Message): - - def __init__(self): - Message.__init__(self, 'USocketTimeOut') - - #************************************************************************** # jobs #************************************************************************** @@ -1012,7 +1114,8 @@ #TODO: how to handle (ProtocolError code 18: Shutting down)? 
class FcpClient(events.Events): - """Fcp client implementation""" + """Fcp client implementation + """ _events_ = ( 'EventListPeers', @@ -1094,17 +1197,25 @@ else: self._log.info(self._logMessages.Connected) + # send ClientHello and wait for NodeHello #NOTE: thought I could leave ClientHelloing up to the caller # but instad of responding with ClientHelloMustBeFirst - # as expected the socket simply breaks. So take it over. + # as expected when not doing so, the node disconnects. + # So take it over here. job = JobClientHello(self) - self.jobAdd(job, synchron=True) - assert job.jobResult is not None, 'ClientHello is not working as expected' - return job.jobResult - + self.jobAdd(job, synchron=False) + while time_elapsed <= repeat: + msg = self.next() + if msg.name == Message.ClientSocketTimeout: + time_elapsed += SocketTimeout + elif msg.name == Message.NodeHello: + return msg.params + else: + break + break + + # continue polling self._log.info(self._logMessages.ConnectionRetry) - - # continue polling time_elapsed += timeout time.sleep(timeout) @@ -1117,9 +1228,8 @@ @param msg: (Message) to handle @return: True if the message was handled, False otherwise """ - - - if msg.name == 'USocketTimeOut': + + if msg.name == Message.ClientSocketTimeout: return True self._log.debug(self._logMessages.MessageReceived + msg.pprint()) @@ -1190,8 +1300,7 @@ return self.jobDispatchMessage(identifier, msg) raise RuntimeError('Should not have endet here: %s' % msg.name) - - + ######################################################### @@ -1212,6 +1321,7 @@ return result + #TODO: not quite clear about the consequences of a synchron job. Have to think this over def jobAdd(self, job, synchron=False): """Adds a job to the client @param job: (Job*) job to add @@ -1288,87 +1398,28 @@ return True - #TODO: some info when all jobs are completed + #TODO: some info when all jobs are completed? 
def next(self): """Pumps the next message waiting @note: use this method instead of run() to run the client step by step """ - msg = self.readMessage() + msg = Message.fromSocket(self._socket) + if msg.name == Message.ClientSocketDied: + raise SocketError(msg['Details']) self.handleMessage(msg) return msg - def readMessage(self): - """Reads the next message directly from the socket and dispatches it - @return: (Message) the next message read from the socket - @raise SocketError: if the socket connection to the node dies unexpectedly - If an error handler is passed to the client it is called emidiately before the error - is raised. - """ - msg = Message(None) - buf = [] - while True: - - try: - p = self._socket.recv(1) - if not p: raise ValueError('Socket is dead') - except socket.timeout, d: # no new messages in queue - msg = MessageSocketTimeout() - break - except Exception, d: - self._log.info(self._logMessages.SocketDead) - self.close() - if self._errorHandler is not None: - self._errorHandler(SocketError, d) - raise SocketError(d) #!! - - if p == '\r': # ignore - continue - - if p != '\n': - buf.append(p) - continue - - line = ''.join(buf) - if line in ('End', "EndMessage"): - break - buf = [] - - if msg.name is None: - msg.name = line - elif line == 'Data': - n = int(msg.params['DataLength']) - try: - msg.data = self._socket.recv(n) - if not msg.data: raise ValueError('Socket is dead') - except Exception, d: - self._log.info(self._logMessages.SocketDead) - self.close() - if self._errorHandler is not None: - self._errorHandler(SocketError, d) - raise SocketError(d) #!! 
- - else: - head, sep, tail = line.partition('=') - msg.params[head] = tail - if not sep: - # TODO: chek for invalid messages or not - pass - - return msg - - + def run(self): """Runs the client untill all jobs passed to it are completed @note: use KeyboardInterrupt to stop prematurely """ try: #n = 0 - while self.hasJobsRunning(): #n += 1 #if n > 40: break self.next() - except KeyboardInterrupt: self._log(self._logMessages.KeyboardInterrupt) self.close() @@ -1429,7 +1480,7 @@ ######################################################### ## boilerplate code to tackle TestDDA ## - ## ...but I don't trust it ;-) I was not yet alble to wrap my head around + ## ...but I don't trust it ;-) I was not yet able to wrap my head around ## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not. ## ## Another problem is that there is no way to know when a directory is no longer @@ -1498,13 +1549,30 @@ ## ################################################# def peerList(self, synchron=False): + """Lists all peers of the node + @param synchron: if True, waits untill the call is completed, if False returns emidiately + @return: (list) of peers in a synchron, always None in an asynchron call + + @event: EventListPeers(event). + @event: EventListNextPeer(event, peer). + @event: EventEndListPeers(event). + """ job = JobListPeers(self) self.jobAdd(job, synchron=synchron) return job.jobResult def peerNotes(self, peer, synchron=False): - if pythonBool(peer['opennet']): + """Lists all text notes associated to a peer + @param peer: peer as returned in a call to L{peerList} + @param synchron: if True, waits untill the call is completed, if False returns emidiately + @return: (list) of notes in a synchron, always None in an asynchron call + + @event: EventListPeerNotes(event). + @event: EventListNextPeerNote(event, note). + @event: EventEndListPeerNotes(event). 
+ """ + if pythonBool(peer['opennet']): # opennet peers do not have any notes associated return [] job = JobListPeerNotes(self, peer['identity']) self.jobAdd(job, synchron=synchron) @@ -1517,7 +1585,10 @@ if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) nodeHello = c.connect() - if nodeHello is not None: + if nodeHello is not None or 1: + + + def foo(): job1 = JobClientHello(c) c.jobAdd(job1) @@ -1529,8 +1600,13 @@ # should raise #foo() - + def foo(): + job = JobGenerateSSK(c) + c.jobAdd(job, synchron=True) + print job.jobResult + #foo() + def foo(): job = JobGenerateSSK(c) c.jobAdd(job, synchron=True) @@ -1554,7 +1630,7 @@ for peer in peers: print c.peerNotes(peer, synchron=True) - foo() + #foo() def foo(): This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-28 09:43:18
|
Revision: 17 http://fclient.svn.sourceforge.net/fclient/?rev=17&view=rev Author: jUrner Date: 2007-10-28 02:43:20 -0700 (Sun, 28 Oct 2007) Log Message: ----------- combed over message object ++ some bug fixes Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 20:59:45 UTC (rev 16) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 09:43:20 UTC (rev 17) @@ -328,8 +328,12 @@ """ if fpath is not None: if os.path.isfile(fpath): - os.remove(fpath) - return True + try: + os.remove(fpath) + except Exception, d: + pass + else: + return True return False @@ -465,6 +469,8 @@ class Message(object): """Class wrapping a freenet message""" + __slots__ = ('name', 'data', 'params') + # client messages ClientHello = 'ClientHello' ListPeer = 'ListPeer' # (since 1045) @@ -527,12 +533,11 @@ UnknownPeerNoteType = 'UnknownPeerNoteType' SubscribedUSKUpdate = 'SubscribedUSKUpdate' - # client messages (internal use only) ClientSocketTimeout = 0 ClientSocketDied = 1 + - def __init__(self, name, data=None, **params): """ @param name: messge name @@ -545,70 +550,102 @@ self.params = params + @classmethod + def bytesFromSocket(clss, socketObj, n): + """Reads n bytes from socket + @param socketObj: socket to read bytes from + @param n: (int) number of bytes to read + @return: (tuple) error-message or None, bytes read or None if an error occured + or no bytes could be read + """ + error = p = None + try: + p = socketObj.recv(n) + if not p: + p = None + raise socket.error('Socket shut down by node') + except socket.timeout, d: # no new messages in queue + error = clss(clss.ClientSocketTimeout) + except socket.error, d: + error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d) + return error, p + + + @classmethod + def fromSocket(clss, socketObj): + """Reads a message from a socket + 
@param socketObj: socket to read a message from + @return: L{Message} next message from the socket. If the socket dies + unexpectedly a L{ClientSocketDied} message is returned containing the parameters + 'Exception' and 'Details'. If the socket times out a L{MessageClientSocketTimout} + message is returned. + """ + + msg = clss(None) + buf = [] + + #TODO: to buffer or not to buffer? + while True: + + # get next line from socket + error, p = clss.bytesFromSocket(socketObj, 1) + if error: + return error + + if p != '\n': + buf.append(p) + continue + #TODO: check if '\r\n' is allowed in freenet client protocol + else: + if buf[-1] == '\r': + del buf[-1] + + line = ''.join(buf) + buf = [] + if line == 'EndMessage': + break + + # first line == message name + if msg.name is None: + msg.name = line + + # get data member + elif line == 'Data': + remaining = int(msg.params['DataLength']) + msg.data = '' + while remaining > 0: + error, p = clss.bytesFromSocket(socketObj, remaining) + if error: + return error + remaining -= len(p) + msg.data += p + break + + # get next paramater + else: + head, sep, tail = line.partition('=') + msg.params[head] = tail + # TODO: errorchek params? 
+ #if not sep: pass + + return msg + + def get(self, name, default=None): """Returns the message parameter 'name' or 'default' """ return self.params.get(name, default) + def __getitem__(self, name): """Returns the message parameter 'name' """ return self.params[name] + def __setitem__(self, name, value): """Sets the message parameter 'name' to 'value' """ self.params[name] = value - @classmethod - def fromSocket(clss, socketObj): - msg = clss(None) - buf = [] - while True: - - try: - p = socketObj.recv(1) - if not p: raise ValueError('Socket is dead') - except socket.timeout, d: # no new messages in queue - msg.name = clss.ClientSocketTimeOut - return msg - except Exception, d: - msg.name = clss.ClientSocketDied - msg['Exception'] = Exception - msg['Details'] = d - return msg - - if p == '\r': # ignore - continue - - if p != '\n': - buf.append(p) - continue - - line = ''.join(buf) - if line in ('End', "EndMessage"): - break - buf = [] - - if msg.name is None: - msg.name = line - elif line == 'Data': - n = int(msg.params['DataLength']) - try: - msg.data = socketObj.recv(n) - if not msg.data: raise ValueError('Socket is dead') - except Exception, d: - msg.name = clss.ClientSocketDied - msg['Exception'] = Exception - msg['Details'] = d - return msg - - else: - head, sep, tail = line.partition('=') - msg.params[head] = tail - if not sep: - # TODO: chek for invalid messages or not - pass - - return msg - + def pprint(self): """Returns the message as nicely formated human readable string""" out = ['', '>>' + self.name, ] @@ -617,6 +654,14 @@ out.append('>>EndMessage') return '\n'.join(out) + + def send(self, socketObj): + """Dumps the message to a socket + @param socketObj: socket to dump the message to + """ + socketObj.sendall(self.toString()) + + def toString(self): """Returns the message as formated string ready to be send""" # TODO: "Data" not yet implemented @@ -625,7 +670,18 @@ out = [self.name, ] for param, value in self.params.items(): out.append('%s=%s' % 
(param, value)) - out.append('EndMessage\n') + if self.data: + assert 'DataLength' in self.params, 'DataLength member required' + n = None + try: + n = int(self['DataLength']) + except ValueError: pass + assert n is not None, 'DataLength member must be an integer' + assert n == len(self.data), 'DataLength member must corrospond to lenght of data' + out.append('Data') + out.append(self.data) + else: + out.append('EndMessage\n') return '\n'.join(out) @@ -1448,7 +1504,7 @@ """ self._log.debug(self._logMessages.MessageSend + msg.pprint()) try: - self._socket.sendall(msg.toString()) + msg.send(self._socket) except socket.error, d: self._log.info(self._logMessages.SocketDead) self.close() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-28 21:56:17
|
Revision: 20 http://fclient.svn.sourceforge.net/fclient/?rev=20&view=rev Author: jUrner Date: 2007-10-28 14:56:19 -0700 (Sun, 28 Oct 2007) Log Message: ----------- some more cooments Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 09:45:09 UTC (rev 19) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 21:56:19 UTC (rev 20) @@ -51,14 +51,6 @@ ''' -#NOTE: -# -# downloading data to disk is not supported st the moment. TestDDA code is quite unwritable -# and as far as I can see there are plans to get rid of it. So wait... -# -# - - import atexit import base64 import logging @@ -688,6 +680,8 @@ #************************************************************************** # jobs #************************************************************************** +#TODO: maybe remove syncron functionality and rely only on signals +# ...if so, remove timeStart, timeStop aswell.. leave up to caller class JobBase(object): """Base class for jobs""" @@ -1165,10 +1159,9 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. #TODO: no idea if to add support for pending jobs and queue management here - +# #TODO: do not mix directories as identifiers with identifiers (might lead to collisions) #TODO: how to handle (ProtocolError code 18: Shutting down)? - class FcpClient(events.Events): """Fcp client implementation """ @@ -1316,6 +1309,7 @@ # check if the is something like an identifier in the message #TODO: we run into troubles when using directories and NodeIdentifiers as identifiers + # have to maintain extra queues to prevent this. 
jobDispatchMessage(queue='directories') if msg.name == Message.TestDDAReply: identifier = msg['Directory'] elif msg.name == Message.TestDDAComplete: @@ -1534,17 +1528,30 @@ ######################################################### - ## boilerplate code to tackle TestDDA + ## how to tackle TestDDA? ## - ## ...but I don't trust it ;-) I was not yet able to wrap my head around - ## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not. + ## best idea heard so far is to wait for ProtocolError 25 Test DDA denied (or PersistentGet) + ## and reinsert the job if necessary after TestDDA completion. ## - ## Another problem is that there is no way to know when a directory is no longer - ## needed. And I fon't want to write code in a Gui to tackle a problem that will hopefully - ## go away in the near future. + ## Problem is, how to wait for a message without flooding the caller. Basic idea of the client + ## is to ensure a Gui can stay responsive by letting the caller decide when to process the + ## next message. So waiting would require to buffer messages and watch messages carefully + ## as they flood in. + ## + ## If we do not wait, the caller may flood us with download requests, I fear, faster than + ## the node and we are able to get the error and go through the TestDDA drill. Have to + ## do some tests to see how the node reacts. ## - ## see: https://bugs.freenetproject.org/view.php?id=1753 + ## easiest approach would be to let the caller test a directory explicitly when HE thinks + ## it might be necessary. But then this code will hang around forever with an already + ## assigned bug report [https://bugs.freenetproject.org/view.php?id=1753] suggesting + ## much easier processing to test DDA (DDA Challenge) ## + ## + ## so.. maybe best is to lurk around a while and keep an eye on the tracker + ## + ## + ## below is just some old boilerplate code..
to be removed sooner or later ######################################################### def testWriteAccess(self, directory): canRead, canWrite = False, False This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-28 22:44:18
|
Revision: 21 http://fclient.svn.sourceforge.net/fclient/?rev=21&view=rev Author: jUrner Date: 2007-10-28 15:44:22 -0700 (Sun, 28 Oct 2007) Log Message: ----------- added some fixed identifier prefixes to avoid collisions Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 21:56:19 UTC (rev 20) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 22:44:22 UTC (rev 21) @@ -131,6 +131,9 @@ PriorityDefault = Bulk +#TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED> +# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED +# in --> freenet/node/PeerManager.java class PeerNodeStatus: Connected = 1 RoutingBackedOff = 2 @@ -147,6 +150,18 @@ Disconnecting = 13 +#TODO: see if we can get away with these to avoid collisions. TestDDA uses no prefix +# cos ProtocolError 7 passes the directory as identifier and there is no other hint +# that the error is related to TestDDA. 
+class IdentifierPrefix: + """Identifier prefixes""" + + ClientGet = 'ClientGet::' + #TestDDA = '' + PeerNote = 'PeerNote::' + + + #************************************************************************************ # exceptions #************************************************************************************ @@ -656,7 +671,8 @@ def toString(self): """Returns the message as formated string ready to be send""" - # TODO: "Data" not yet implemented + + #TODO: just a guess, so maybe remove this check if isinstance(self.name, (int, long)): raise ValueError('You can not send client internal messages to the node') out = [self.name, ] @@ -841,7 +857,7 @@ Message.ListPeerNotes, NodeIdentifier=identifier ) - JobBase.__init__(self, fcpClient, identifier, message) + JobBase.__init__(self, fcpClient, IdentifierPrefix.PeerNote + identifier, message) def handleStart(self): @@ -906,7 +922,7 @@ string and size may not be accurate. """ - identifier = newIdentifier() + identifier = IdentifierPrefix.ClientGet + newIdentifier() message = Message( Message.ClientGet, Identifier=identifier, @@ -1158,8 +1174,6 @@ #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. -#TODO: no idea if to add support for pending jobs and queue management here -# #TODO: do not mix directories as identifiers with identifiers (might lead to collisions) #TODO: how to handle (ProtocolError code 18: Shutting down)? class FcpClient(events.Events): @@ -1290,6 +1304,8 @@ return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) elif code == ProtocolError.ShuttingDown: + + #TODO: ??? why dispatch to ClientHello.. 
can't remember if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): # ######################################## @@ -1308,39 +1324,26 @@ else: # check if the is something like an identifier in the message - #TODO: we run into troubles when using directories and NodeIdentifiers as identifiers - # have to maintain extra queues to prevent this. jobDispatchMessage(queue='directories') if msg.name == Message.TestDDAReply: identifier = msg['Directory'] elif msg.name == Message.TestDDAComplete: identifier = msg['Directory'] elif msg.name == Message.PeerNote: - identifier = msg['NodeIdentifier'] + identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier'] elif msg.name == Message.EndListPeerNotes: - identifier = msg['NodeIdentifier'] - + identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier'] else: identifier = msg.get('Identifier', None) # dispatch to jobs with fixed identifiers if identifier is None: - if msg.name == Message.NodeHello: return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Message.EndListPeers: return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.Peer: return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - #elif msg.name == Message.PeerNote: - # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) - - #elif msg.name == Message.EndListPeerNotes: - # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) - - # more here..... 
else: @@ -1349,7 +1352,7 @@ else: return self.jobDispatchMessage(identifier, msg) - raise RuntimeError('Should not have endet here: %s' % msg.name) + raise RuntimeError('We should not have gotten here: %s' % msg.name) @@ -1670,14 +1673,8 @@ #foo() - def foo(): - job = JobGenerateSSK(c) - c.jobAdd(job, synchron=True) - print job.jobResult - #foo() - + - def foo(): d = os.path.dirname(os.path.abspath(__file__)) job2 = JobTestDDA(c, d) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-30 15:11:01
|
Revision: 22 http://fclient.svn.sourceforge.net/fclient/?rev=22&view=rev Author: jUrner Date: 2007-10-30 08:11:02 -0700 (Tue, 30 Oct 2007) Log Message: ----------- Another major rewrite. Cut all down to a plain message handler and events. For now no way for now to wrap the protocol on a higher level without getting into deep troubles. Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 22:44:22 UTC (rev 21) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-30 15:11:02 UTC (rev 22) @@ -93,6 +93,11 @@ SocketTimeout = 0.1 +class IdentifierPrefix: + """Special purpose identifier prefixes""" + + FileInfo = 'FileInfo::' + class Verbosity: Debug = logging.DEBUG @@ -100,22 +105,6 @@ Warning = logging.WARNING -class FixedJobIdentifiers: - """Fixed job identifiers - @note: he client can only handle one job of these at a time - """ - ClientHello = 'ClientHello' - ListPeers = 'ListPeers' - ListPeerNotes = 'ListPeerNotes' - GetNode = 'GetNode' - GetConfig = 'GetConfig' - ModifyConfig = 'ModifyConfig' - WatchGlobal = 'WatchGlobal' - Shutdown = 'Shutdown' - - - - class Priorities: """All priorities supported by the client""" @@ -132,8 +121,8 @@ #TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED> -# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED -# in --> freenet/node/PeerManager.java +# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED +# in --> freenet/node/PeerManager.java class PeerNodeStatus: Connected = 1 RoutingBackedOff = 2 @@ -150,16 +139,10 @@ Disconnecting = 13 -#TODO: see if we can get away with these to avoid collisions. TestDDA uses no prefix -# cos ProtocolError 7 passes the directory as identifier and there is no other hint -# that the error is related to TestDDA. 
-class IdentifierPrefix: - """Identifier prefixes""" - - ClientGet = 'ClientGet::' - #TestDDA = '' - PeerNote = 'PeerNote::' +class PeerNoteType: + """All known peer note types""" + Private = '1' #************************************************************************************ @@ -170,7 +153,7 @@ def __init__(self, msg): """ - @param msg: (Message) GetFailed message + @param msg: (Message) GetFailed message or its parameters dict """ self.value = '%s (%s, %s)' % ( msg.get('CodeDescription', 'Unknown error') , @@ -214,7 +197,7 @@ def __init__(self, msg): """ - @param msg: (Message) PutFailed message + @param msg: (Message) PutFailed message or its parameters dict """ self.value = '%s (%s, %s)' % ( msg.get('CodeDescription', 'Unknown error') , @@ -240,7 +223,7 @@ def __init__(self, msg): """ - @param msg: (Message) ProtocolError message + @param msg: (Message) ProtocolError message or its parameters dict """ self.value = '%s (%s, %s)' % ( msg.get('CodeDescription', 'Unknown error') , @@ -279,22 +262,13 @@ CanNotPeerWithSelf = '28' PeerExists = '29' OpennetDisabled = '30' - DarknetOnly = '31' + DarknetPeerOnly = '31' class SocketError(Exception): pass +class FcpError(Exception): pass #********************************************************************** # functions #********************************************************************** -def fcpBool(pythonBool): - """Converts a python bool to a fcp bool - @param pythonBool: (bool) - @return: (str) 'true' or 'false' - """ - if pythonBool: - return 'true' - return 'false' - - def newIdentifier(prefix=None): """Returns a new unique identifier @return: (str) uuid @@ -304,14 +278,6 @@ return str(uuid.uuid4()) -def pythonBool(fcpBool): - """Converts a fcp bool to a python bool - @param pythonBool: 'true' or 'false' - @return: (bool) True or False - """ - return fcpBool == 'true' - - def saveReadFile(fpath): """Reads contents of a file in the savest manner possible @param fpath: file to write @@ -692,463 +658,7 @@ 
out.append('EndMessage\n') return '\n'.join(out) - #************************************************************************** -# jobs -#************************************************************************** -#TODO: maybe remove syncron functionality and rely only on signals -# ...if so, remove timeStart, timeStop aswell.. leave up to caller -class JobBase(object): - """Base class for jobs""" - - - def __init__(self, fcpClient, identifier, message): - """ - @param fcpClient: FcpClient() instance - @param identifier: (str) identifier of the job - @param message: (Message) to send to the node whne the job ist started - @ivar jobClient: FcpClient() instance of the job - @ivar jobIdentifier: identifier of the job - @ivar jobMessage: message to be send to the node - @ivar jobResult: if no error was encountered, holding the result of the job when complete - @ivar jobTimeStart: time the job was started - @ivar jobTimeStop: time the job was stopped - """ - self.jobClient = fcpClient - self.jobIdentifier = identifier - self.jobMessage = message - self.jobResult = None - self.jobTimeStart = 0 - self.jobTimeStop = 0 - - - def handleMessage(self, msg): - return False - - - def handleStart(self): - """Starts the job""" - self.jobResult = None - self.jobTimeStart = time.time() - self.jobClient.sendMessageEx(self.jobMessage) - - - # XXX - def handleStop(self, flagError, msg): - """Called on job completion to stop the job - @param flagError: True if an error was encountered, False otherwise - @param msg: (Message) to pass to the job - """ - self.jobTimeStop = time.time() - self.jobResult = (flagError, msg) - - -class JobClientHello(JobBase): - """Sed a ClientHello message to the node - - @note: this must be the first message passed to the node. If everything - goes well, you will get a NodeHello in response. 
- """ - - def __init__(self, fcpClient, name=None, expectedVersion='2.0'): - """ - @param name: (str) connection name or None, to use an arbitrary name - @param expectedVersion: (str) node version expected - """ - message = Message( - Message.ClientHello, - Name=name if name is not None else newIdentifier(), - ExpectedVersion=expectedVersion, - ) - JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message) - - def displayName(self): - return 'ClientHello' - - - def handleMessage(self, msg): - if msg.name == Message.NodeHello: - return self.handleNodeHello(msg) - elif msg.name == Message.ProtocolError: - return self.handleProtocolError(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleNodeHello(self, msg): - self.jobTimeStop = time.time() - self.jobResult = msg - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - def handleProtocolError(self, msg): - raise ProtocolError(msg) - - - - - - -class JobListPeers(JobBase): - """Lists all known peers of the node - """ - - def __init__(self, fcpClient, withMetaData=True, withVolantile=True): - """ - @param withMetaData: include meta data for each peer? - @param withVolantile: include volantile data for each peer? 
- @ivar jobResult: on job completion, will be a list containing all perrs as one 'Peer' message for each peer - """ - message = Message( - Message.ListPeers, - WithMetadata=fcpBool(withMetaData), - WithVolatile=fcpBool(withVolantile), - ) - JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - - - def handleStart(self): - JobBase.handleStart(self) - self.jobClient.EventListPeers() - - - def handleMessage(self,msg): - if msg.name == Message.EndListPeers: - return self.handleEndListPeers(msg) - elif msg.name == Message.Peer: - return self.handlePeer(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handlePeer(self, msg): - self.jobClient.EventListNextPeer(msg.params) - if self.jobResult is None: - self.jobResult = [msg.params, ] - else: - self.jobResult.append(msg.params) - return True - - - def handleEndListPeers(self, msg): - self.jobClient.EventEndListPeers() - self.jobTimeStop = time.time() - if self.jobResult is None: - self.jobResult = [] - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - -class JobListPeerNotes(JobBase): - """Lists all notes associated to a peer of the node - """ - - def __init__(self, fcpClient, identifier): - """ - @param identifier: identifier of the peer to list notes for (peer identity) - @ivar jobResult: on job completion will be a list containing all notes associated to the peer - - @note: notes are only available for darknet peers (opennet == false) - """ - - message = Message( - Message.ListPeerNotes, - NodeIdentifier=identifier - ) - JobBase.__init__(self, fcpClient, IdentifierPrefix.PeerNote + identifier, message) - - - def handleStart(self): - JobBase.handleStart(self) - self.jobClient.EventListPeerNotes() - - - def handleMessage(self,msg): - if msg.name == Message.EndListPeerNotes: - return self.handleEndListPeerNotes(msg) - elif msg.name == Message.PeerNote: - return self.handlePeerNote(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - def 
handlePeerNote(self, msg): - note = msg.get('NoteText', '') - self.jobClient.EventListNextPeerNote(note) - if note: - note = base64.decodestring(note) - if self.jobResult is None: - self.jobResult = [note, ] - else: - self.jobResult.append(note) - return True - - - def handleEndListPeerNotes(self, msg): - self.jobClient.EventEndListPeerNotes() - self.jobTimeStop = time.time() - if self.jobResult is None: - self.jobResult = [] - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - - -#TODO: identifier collisions are not yet handled -class JobGetFileInfo(JobBase): - """Tries to retieve information about a file. If everything goes well - - On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file. - Note, that both members may be '' (empty string) - """ - - - # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed' - def __init__(self, fcpClient, uri, **params): - """ - @param fcpClient: FcpClient() instance - @param uri: uri of the file to retrieve info for - @param params: additional parameters: - IgnoreDS='true' / 'false' - DSOnly='true' / 'false' - MaxRetries=-1 ...N - PriorityClass=Priority* - - @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be - retrieved and data will be a GetFailed message containing details. If error is False - data will be a tuple(str metadataContentType, str size). Note that both may be empty - string and size may not be accurate. 
- - """ - identifier = IdentifierPrefix.ClientGet + newIdentifier() - message = Message( - Message.ClientGet, - Identifier=identifier, - URI=uri, - # suggested by Mathew Toseland to use about 32k for mimeType requests - # basic sizes of keys are: 1k for SSks and 32k for CHKs - MaxSize='32000', - ReturnType='none', - Verbosity='1', - **params - ) - JobBase.__init__(self, fcpClient, identifier, message) - - - def getPrority(self): - return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault) - - - def setPriority(self, priority): - if not priority in Priorities: - raise ValueError('Invalid priority: %r' % priority) - self.jobClient.sendMessage( - Message.ModifyPersistentRequest, - Identifier=self.jobIdentifier, - Global=fcpBool(False), - PriorityClass=priority, - ) - # not shure if the response arrives in any case, so set it here - self.jobMessage['PriorityClass'] = priority - - - def stopRequest(self): - self.jobClient.sendMessage( - Message.RemovePersistentRequest, - Global=fcpBool(False), - Identifier=self.jobIdentifier, - ) - - - def handleMessage(self, msg): - if msg.name == Message.DataFound: - return self.handleDataFound(msg) - elif msg.name == Message.GetFailed: - return self.handleGetFailed(msg) - elif msg.name == Message.IdentifierCollision: - return self.handleIdentifierCollision(msg) - elif msg.name == Message.PersistentRequestModified: - return self.handlePersistentRequestModified(msg) - elif msg.name == Message.PersistentRequestRemoved: - return self.handlePersistentRequestRemoved(msg) - elif msg.name == Message.SimpleProgress: - return self.handleSimpleProgress(msg) - - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleDataFound(self, msg): - self.jobTimeStop = time.time() - self.jobResult = ( - False, - ( - msg.get('Metadata.ContentType', ''), - msg.get('DataLength', '') - ) - ) - self.jobClient.jobRemove(self.jobIdentifier) - return True - - def handleGetFailed(self, msg): - self.jobTimeStop = time.time() - if 
msg['Code'] == FetchError.TooBig: - self.jobResult = (False, msg) - self.jobResult = ( - False, - ( - msg.get('ExpectedMetadata.ContentType', ''), - msg.get('ExpectedDataLength', '') - ) - ) - else: - self.jobResult = (True, msg) - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - def handleIdentifierCollision(self, msg): - raise - - - def handleSimpleProgress(self, msg): - return True - - - def handlePersistentRequestModified(self, msg): - priorityClass = msg.get('PriorityClass', None) - if priorityClass is not None: - self.jobMessage['PriorityClass'] = priorityClass - return True - - def handlePersistentRequestRemoved(self, msg): - if self.jobClient.jobIsRunning(self.jobIdentifier): - self.jobClient.jobStop(self.jobIdentifier) - return True - - -#TODO: handle case where directories are registered multiple times -class JobTestDDA(JobBase): - """Tests a directory for read / write accesss - """ - - - def __init__(self, fcpClient, directory, read=False, write=False): - """ - - @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed) - """ - if not os.path.isdir(directory): - raise ValueError('No such directory: %r' % directory) - - message = Message( - Message.TestDDARequest, - Directory=directory, - WantReadDirectory=fcpBool(read), - WantWriteDirectory=fcpBool(write), - ) - - JobBase.__init__(self, fcpClient, directory, message) - self.jobTmpFile = None - - - def handleMessage(self, msg): - if msg.name == Message.TestDDAReply: - return self.handleTestDDAReply(msg) - elif msg.name == Message.TestDDAComplete: - return self.handleTestDDAComplete(msg) - elif msg.name == Message.ProtocolError: - return self.handleProtocolError(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleProtocolError(self, msg): - # most likely code 7 here... - # "Both WantReadDirectory and WantWriteDirectory are set to false: what's the point of sending a message?" 
- # ..a stupid response that is ;-) - self.jobTimeStop = time.time() - self.jobClient.jobRemove(self.jobIdentifier) - if msg['Code'] == ProtocolError.InvalidMessage: - self.jobResult = (False, False) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleTestDDAReply(self, msg): - fpathWrite = msg.params.get('WriteFilename', None) - fpathRead = msg.params.get('ReadFilename', None) - readContent = '' - if fpathWrite is not None: - written = saveWriteFile(fpathWrite, msg['ContentToWrite']) - if not written: - if os.path.isfile(fpathWrite): - os.remove(fpathWrite) - else: - self.jobTmpFile = fpathWrite - - if fpathRead is not None: - readContent = saveReadFile(fpathRead) - if readContent is None: - readContent = '' - - self.jobClient.sendMessage( - Message.TestDDAResponse, - Directory=msg['Directory'], - ReadContent=readContent, - ) - return True - - - def handleTestDDAComplete(self, msg): - self.jobTimeStop = time.time() - self.jobResult = ( - pythonBool(msg.get('ReadDirectoryAllowed', 'false')), - pythonBool(msg.get('WriteDirectoryAllowed', 'false')), - ) - saveRemoveFile(self.jobTmpFile) - self.jobTmpFile = None - self.jobClient.jobRemove(self.jobIdentifier) - return True - - - -class JobGenerateSSK(JobBase): - """Job to generate a SSK key pair - """ - - - def __init__(self, fcpClient): - """ - @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated - SSK key - """ - - identifier = newIdentifier() - message = Message( - Message.GenerateSSK, - Identifier=identifier, - ) - JobBase.__init__(self, fcpClient, identifier, message) - - - def handleMessage(self, msg): - if msg.name == Message.SSKKeypair: - return self.handleSSKKeypair(msg) - else: - raise ValueError('Unexpected message: %s' % msg.name) - - - def handleSSKKeypair(self, msg): - self.jobTimeStop = time.time() - self.jobResult = (msg['InsertURI'], msg['RequestURI']) - self.jobClient.jobRemove(self.jobIdentifier) - return True - - 
-#************************************************************************** # fcp client #************************************************************************** class LogMessages: @@ -1169,7 +679,7 @@ KeyboardInterrupt = 'Keyboard interrupt' - SocketDead = 'Socket is dead' + SocketDied = 'Socket died' #TODO: no idea what happens on reconnect if socket died. What about running jobs? @@ -1181,42 +691,62 @@ """ _events_ = ( + + #Peer related events 'EventListPeers', - 'EventListNextPeer', 'EventEndListPeers', + 'EventPeer', + 'EventPeerRemoved', + 'EventUnknownIdentifier', 'EventListPeerNotes', - 'EventListNextPeerNote', 'EventEndListPeerNotes', + 'EventPeerNote', + 'EventShutdown', + 'EventSocketDied', + + # get / put related events + 'EventIdentifierCollision', + + 'EventFileInfo', + 'EventFileInfoProgress', + + 'EventDataFound', + 'EventGetFailed', + 'EventSimpleProgress', + 'EventPersistentRequestModified', + 'EventPersistentRequestRemoved', + + + # others + 'EventSSKKeypair', + ) + + Version = '2.0' + FcpTrue = 'true' + FcpFalse = 'false' def __init__(self, name='', - errorHandler=None, + connectionName=None, verbosity=Verbosity.Warning, logMessages=LogMessages ): """ @param name: name of the client instance or '' (for debugging) - @param errorHandler: will be called if the socket conncetion to the node is dead - with two params: SocketError + details. When the handler is called the client - is already closed. + @param conectionName: name of the connection @param verbosity: verbosity level for debugging @param logMessages: LogMessages class containing message strings """ - self._isConnected = False - self._jobs = { - 'Jobs': {}, - 'PendingJobs': [], - 'RegisteredDirectories': [], - } - self._errorHandler = errorHandler #TODO: check if necessary! 
+ self._connectionName = connectionName + self._ddaTmpFiles = [] self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() # lock when resources are accessed + self._lock = thread.allocate_lock() self._socket = None self.setVerbosity(verbosity) @@ -1230,6 +760,10 @@ if self._socket is not None: self._socket.close() self._socket = None + + # clean left over tmp files + for fpath in self._ddaTmpFiles: + saveRemoveFile(fpath) #TODO: an iterator would be nice to enshure Guis stay responsitive in the call @@ -1245,8 +779,8 @@ self._log.info(self._logMessages.Connecting) # poll untill freenet responds - time_elapsed = 0 - while time_elapsed <= repeat: + timeElapsed = 0 + while timeElapsed <= repeat: # try to Connect socket if self._socket is not None: @@ -1265,12 +799,15 @@ # but instad of responding with ClientHelloMustBeFirst # as expected when not doing so, the node disconnects. # So take it over here. - job = JobClientHello(self) - self.jobAdd(job, synchron=False) - while time_elapsed <= repeat: + self.sendMessage( + Message.ClientHello, + Name=self._connectionName if self._connectionName is not None else newIdentifier(), + ExpectedVersion=self.Version, + ) + while timeElapsed <= repeat: msg = self.next() if msg.name == Message.ClientSocketTimeout: - time_elapsed += SocketTimeout + timeElapsed += SocketTimeout elif msg.name == Message.NodeHello: return msg.params else: @@ -1279,7 +816,7 @@ # continue polling self._log.info(self._logMessages.ConnectionRetry) - time_elapsed += timeout + timeElapsed += timeout time.sleep(timeout) self._log.info(self._logMessages.ConnectingFailed) @@ -1294,190 +831,172 @@ if msg.name == Message.ClientSocketTimeout: return True - self._log.debug(self._logMessages.MessageReceived + msg.pprint()) - - + if msg.name == Message.ProtocolError: code = msg['Code'] - if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: - return 
self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - - elif code == ProtocolError.ShuttingDown: - - #TODO: ??? why dispatch to ClientHello.. can't remember - if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): - - # ######################################## - #TODO: ??? - - return True - - else: - identifier = msg.get('Identifier', None) - if identifier is None: - #TODO: inform caller - raise ProtocolError(msg) - else: - return self.jobDispatchMessage(identifier, msg) - - else: + if code == ProtocolError.ShuttingDown: + self.close() + self.EventShutdown(msg.params) + return True - # check if the is something like an identifier in the message - if msg.name == Message.TestDDAReply: - identifier = msg['Directory'] - elif msg.name == Message.TestDDAComplete: - identifier = msg['Directory'] - elif msg.name == Message.PeerNote: - identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier'] - elif msg.name == Message.EndListPeerNotes: - identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier'] - else: - identifier = msg.get('Identifier', None) - - # dispatch to jobs with fixed identifiers - if identifier is None: - if msg.name == Message.NodeHello: - return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif msg.name == Message.EndListPeers: - return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - elif msg.name == Message.Peer: - return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) - - # more here..... - + raise ProtocolError(msg) + + #################################################### + ## + ## TestDDA + ## + ## Note: if both, ReadDirectoryAllowed and WriteDirectoryAllowed are + ## set to false, the node sends a ProtocolError (7, 'Invalid message'). + ## Have to handle this! 
+ ## + #################################################### + elif msg.name == Message.TestDDAReply: + fpathWrite = msg.params.get('WriteFilename', None) + fpathRead = msg.params.get('ReadFilename', None) + readContent = '' + if fpathWrite is not None: + written = saveWriteFile(fpathWrite, msg['ContentToWrite']) + if not written: + saveRemoveFile(fpathWrite) else: - raise ValueError('Unhandled message: ' + msg.name) + self._ddaTmpFiles.append(fpathWrite) - else: - return self.jobDispatchMessage(identifier, msg) + if fpathRead is not None: + readContent = saveReadFile(fpathRead) + if readContent is None: + readContent = '' - raise RuntimeError('We should not have gotten here: %s' % msg.name) - + self.sendMessage( + Message.TestDDAResponse, + Directory=msg['Directory'], + ReadContent=readContent, + ) + return True + - - ######################################################### - ## jobs - ######################################################### - def hasJobsRunning(self): - """Checks if the client has running jobs - @return: (bool) True if so, False otherwise - """ - self._lock.acquire(True) - try: - result = self._jobs['Jobs'] or self._jobs['PendingJobs'] - finally: - self._lock.release() + elif msg.name == Message.TestDDAComplete: + # clean tmp files + for fpath in self._ddaTmpFiles: + saveRemoveFile(fpath) + self._ddaTmpFiles = [] + return True + + #################################################### + ## + ## Peer related messages + ## + #################################################### + elif msg.name == Message.EndListPeers: + self.EventEndListPeers(msg.params) + return True + + elif msg.name == Message.EndListPeerNotes: + self.EventEndListPeerNotes(msg.params) + return True + elif msg.name == Message.Peer: + self.EventPeer(msg.params) + return True + elif msg.name == Message.PeerNote: + note = msg.get('NoteText', '') + if note: + note = base64.decodestring(note) + msg['NoteText'] = note + self.EventPeerNote(msg.params, note) + return True + + elif msg.name 
== Message.PeerRemoved: + self.EventPeerRemoved(msg.params) + return True + + elif msg.name == Message.UnknownNodeIdentifier: + self.EventUnknownIdentifier(msg.params) + return True + + #################################################### + ## + ## Get related messages + ## + #################################################### + elif msg.name == Message.DataFound: + if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): + self.EventFileInfo(msg.params) + return True + + #TODO: + self.EventDataFound(msg.params) + return True + - return result - - - #TODO: not quite clear about the consequences of a synchron job. Have to think this over - def jobAdd(self, job, synchron=False): - """Adds a job to the client - @param job: (Job*) job to add - @param synchron: if True, wait untill the job is completed, if False return emidiately - """ - self._lock.acquire(True) - try: - if job.jobIdentifier in self._jobs['Jobs']: - raise ValueError('Duplicate job: %r' % job.jobIdentifier) - self._jobs['Jobs'][job.jobIdentifier] = job - finally: - self._lock.release() - - self._log.info(self._logMessages.JobStart + job.jobMessage.name) - job.handleStart() - if synchron: - while self.jobGet(job.jobIdentifier): - self.next() + elif msg.name == Message.GetFailed: + code = msg['Code'] + if code == FetchError.TooBig: + if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): + params = { + 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''), + 'DataLength': msg.get('ExpectedDataLength', '') + } + self.EventFileInfo(params) + return True + + self.EventGetFailed(msg.params) + return True + + + elif msg.name == Message.SimpleProgress: + if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): + self.EventFileInfoProgress(msg.params) + else: + self.EventSimpleProgress(msg.params) + return True + + + elif msg.name == Message.IdentifierCollision: + self.EventIdentifierCollision(msg.params) + return True + + elif msg.name == Message.PersistentRequestModified: + 
self.EventPersistentRequestModified(msg.params) + return True - def jobDispatchMessage(self, identifier, msg): - """Dispatches a message to a job - @param identifier: identifier of the job - @param msg: (Message) message to dispatch - @return: True if the message was handled, False otherwise - """ - job = self.jobGet(identifier) - if job is not None: - return job.handleMessage(msg) - return False + elif msg.name == Message.PersistentRequestRemoved: + self.EventPersistentRequestRemoved(msg.params) + return True - - def jobGet(self, identifier): - """Returns a job given its identifier - @param identifier: identifier of the job - @return: (Job*) instance or None, if no corrosponding job was found - """ - self._lock.acquire(True) - try: - result = self._jobs['Jobs'].get(identifier, None) - finally: - self._lock.release() - return result - - - def jobIsRunning(self, identifier): - """Checks if a job is running - @param identifier: identifier of the job - @return: True if so, False otherwise - """ - self._lock.acquire(True) - try: - result = identifier in self._jobs['Jobs'] - finally: - self._lock.release() - return result + #################################################### + ## + ## Others + ## + #################################################### + elif msg.name == Message.SSKKeypair: + self.EventSSKKeypair(msg.params) + return True - def jobRemove(self, identifier): - """Removes a job unconditionally - @param identifier: identifier of the job to remove - @return: True if the job was found, False otherwise - """ - self._lock.acquire(True) - try: - job = self._jobs['Jobs'].get(identifier, None) - if job is not None: - del self._jobs['Jobs'][identifier] - finally: - self._lock.release() - if job is None: - return False - self._log.info(self._logMessages.JobStop + job.jobMessage.name) - return True + ## default + return False - #TODO: some info when all jobs are completed? 
+ ######################################################### + ## + ## + ## + ######################################################### def next(self): """Pumps the next message waiting @note: use this method instead of run() to run the client step by step """ msg = Message.fromSocket(self._socket) if msg.name == Message.ClientSocketDied: + self.EventSocketDied(msg['Exception'], msg['Details']) raise SocketError(msg['Details']) self.handleMessage(msg) return msg - - def run(self): - """Runs the client untill all jobs passed to it are completed - @note: use KeyboardInterrupt to stop prematurely - """ - try: - #n = 0 - while self.hasJobsRunning(): - #n += 1 - #if n > 40: break - self.next() - except KeyboardInterrupt: - self._log(self._logMessages.KeyboardInterrupt) - self.close() - def sendMessage(self, name, data=None, **params): """Sends a message to freenet @param name: name of the message to send @@ -1503,10 +1022,9 @@ try: msg.send(self._socket) except socket.error, d: - self._log.info(self._logMessages.SocketDead) + self._log.info(self._logMessages.SocketDied) self.close() - if self._errorHandler is not None: - self._errorHandler(SocketError, d) + self.EventSocketDied(socket.error, d) raise SocketError(d) return msg @@ -1520,16 +1038,120 @@ """""" self._log.setLevel(verbosity) + ######################################################### + ## + ## + ## + ######################################################### + def fcpBool(self, pythonBool): + """Converts a python bool to a fcp bool + @param pythonBool: (bool) + @return: (str) 'true' or 'false' + """ + return self.FcpTrue if pythonBool else self.FcpFalse + def pythonBool(self, fcpBool): + """Converts a fcp bool to a python bool + @param pythonBool: 'true' or 'false' + @return: (bool) True or False + """ + return fcpBool == self.FcpTrue + ######################################################## ## + ## Peer related methods + ## ######################################################## - def getFileInfo(self, 
job): - pass + def listPeer(self, identifier): + self.jobClient.sendMessage( + Message.ListPeer, + NodeIdentifier=identifier, + ) + + + def listPeerNotes(self, identifier): + """ + @param identifier: identifier of the peer to list notes for + """ + self.sendMessage( + Message.ListPeerNotes, + NodeIdentifier=identifier + ) + + + def listPeers(self, withMetaData=True, withVolantile=True): + """ + @param withMetaData: include meta data for each peer? + @param withVolantile: include volantile data for each peer? + """ + self.sendMessage( + Message.ListPeers, + WithMetadata=self.fcpBool(withMetaData), + WithVolatile=self.fcpBool(withVolantile), + ) + def modifyPeer(self, identifier, allowLocalAddresses=None, isDisabled=None, isListenOnly=None): + msg = Message( + Message.ModifyPeer, + NodeIdentifier=identifier, + ) + if allowLocalAddresses is not None: + msg['AllowLocalAddresses'] = self.fcpBool(allowLocalAddresses) + if isDisabled is not None: + msg['isDisabled'] = self.fcpBool(isDisabled) + if isListenOnly is not None: + msg['isListenOnly'] = self.fcpBool(isListenOnly) + self.jobClient.sendMessageEx(msg) + self.sendMessageEx(msg) + + def modifyPeerNote(self, identifier, note): + self.sendMessage( + Message.ModifyPeerNote, + NodeIdentifier=identifier, + #NOTE: currently fcp supports only this one type + PeerNoteType=PeerNoteType.Private, + NoteText=note + ) + + + def removePeer(self, identifier): + self.sendMessage( + Message.RemovePeer, + NodeIdentifier=identifier, + ) + + ########################################################## + ## + ## get / put related methods + ## + ########################################################## + def fileInfo(self, uri, **params): + """Requests info about a file + @param uri: uri of the file to request info about + @event: FileInfo(event, params). If success, params will contain a key 'Metadata.ContentType' + and 'DataLength'. Both may be '' (empty string) + @event: FileInfoProgress(event, params). 
Triggered instead of EventSimpleProgress + @note: for other events see: L{clientGet} + @return: (str) identifier of the request + """ + identifier = IdentifierPrefix.FileInfo + newIdentifier() + self.sendMessage( + Message.ClientGet, + Identifier=identifier, + URI=uri, + # suggested by Mathew Toseland to use about 32k for mimeType requests + # basic sizes of keys are: 1k for SSks and 32k for CHKs + MaxSize='32000', + ReturnType='none', + Verbosity='1', + **params + ) + return identifier + + ######################################################### ## how to tackle TestDDA? ## @@ -1638,7 +1260,7 @@ @event: EventListNextPeerNote(event, note). @event: EventEndListPeerNotes(event). """ - if pythonBool(peer['opennet']): # opennet peers do not have any notes associated + if self.pythonBool(peer['opennet']): # opennet peers do not have any notes associated return [] job = JobListPeerNotes(self, peer['identity']) self.jobAdd(job, synchron=synchron) @@ -1649,9 +1271,9 @@ # #***************************************************************************** if __name__ == '__main__': - c = FcpClient(name='test', verbosity=logging.DEBUG) + c = FcpClient(name='test', verbosity=Verbosity.Warning) nodeHello = c.connect() - if nodeHello is not None or 1: + if nodeHello is not None: @@ -1666,7 +1288,24 @@ # should raise #foo() + + #ModifyPeer not ok + + #RemovePeer not ok + + #ModifyPeerNote ok + + #ListPeer not ok + + def foo(): + job = JobListPeer(c, '123456') + c.jobAdd(job, synchron=True) + print job.jobResult + #foo() + + + def foo(): job = JobGenerateSSK(c) c.jobAdd(job, synchron=True) print job.jobResult @@ -1686,30 +1325,26 @@ #foo() def foo(): - peers = c.peerList(synchron=True) - for peer in peers: - print c.peerNotes(peer, synchron=True) + def cb(event, params): + #print params.get('opennet', 'true'), c.pythonBool(params.get('Opennet', 'true')), params['identity'] + if params['opennet'] == c.FcpFalse: + c.listPeerNotes(params['identity']) + + c.EventPeer += cb + + + 
c.listPeers() + for i in xrange(80): + c.next() #foo() def foo(): #job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') + identifier = c.fileInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + for i in xrange(20): + c.next() - job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') - #job.jobIdentifier = job.jobMessage['Identifier'] = 1 - #job.jobMessage['Identifier'] = 1 - #job.jobIdentifier = 1 - c.jobAdd(job) - - #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg') - #job.jobMessage['Identifier'] = 1 - #job.jobIdentifier = 1 - #c.jobAdd(job) - - c.run() - print '---------------------------' - print job.jobResult - print '---------------------------' #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-10-31 10:00:18
|
Revision: 23 http://fclient.svn.sourceforge.net/fclient/?rev=23&view=rev Author: jUrner Date: 2007-10-31 03:00:15 -0700 (Wed, 31 Oct 2007) Log Message: ----------- refactored the code and added some more protocol methods and events Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-30 15:11:02 UTC (rev 22) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-31 10:00:15 UTC (rev 23) @@ -1,10 +1,9 @@ -'''Freenet client protocol 2.0 implementation +"""Freenet client protocol 2.0 implementation @newfield event, events - Sample code:: client = FcpClient() @@ -12,45 +11,8 @@ if nodeHello is not None: # do whatever - +""" -Most method calls can be made either synchron or asynchron:: - - peers = client.peerList(synchron=True) - for peer in peers: - # do whatever - - -To get informed about asynchron events you should connect the relevant events the client provides:: - - # connect to one single event - client.EventListNextPeer += MyCallback - - # connect to multiple events at once - client += ( - (client.EventListPeers, MyCallback1), - (client.EventEndListPeers, MyCallback2), - ) - - # each callback is called with the event as first parameter, followed by additional parameters, - # depending on the event triggered. 
- def MyListNextPeerCallback(event, peer): - print peer - - client.peerList(synchron=False) - - - # when event notifications are no longer required, you should always make shure to disconnect from them - client.EventListNextPeer -= MyCallback - client -= ( - (client.EventListPeers, MyCallback1), - (client.EventEndListPeers, MyCallback2), - ) - - - -''' - import atexit import base64 import logging @@ -92,192 +54,9 @@ DefaultFcpPort = 9481 SocketTimeout = 0.1 - -class IdentifierPrefix: - """Special purpose identifier prefixes""" - - FileInfo = 'FileInfo::' - - -class Verbosity: - Debug = logging.DEBUG - Info = logging.INFO - Warning = logging.WARNING - - -class Priorities: - """All priorities supported by the client""" - - Maximum = '0' - Interactive = '1' - SemiInteractive = '2' - Updatable = '3' - Bulk = '4' - Prefetch = '5' - Minimum = '6' - - PriorityMin = Minimum - PriorityDefault = Bulk - - -#TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED> -# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED -# in --> freenet/node/PeerManager.java -class PeerNodeStatus: - Connected = 1 - RoutingBackedOff = 2 - TooNew = 3 - TooOld = 4 - Disconnected = 5 - NeverConnected = 6 - Disabled = 7 - Bursting = 8 - Listening = 9 - ListenOnly = 10 - ClockProblem = 11 - ConnError = 12 - Disconnecting = 13 - - - -class PeerNoteType: - """All known peer note types""" - Private = '1' - - -#************************************************************************************ -# exceptions -#************************************************************************************ -class FetchError(Exception): - """All fetch errors supported by the client""" - - def __init__(self, msg): - """ - @param msg: (Message) GetFailed message or its parameters dict - """ - self.value = '%s (%s, %s)' % ( - msg.get('CodeDescription', 'Unknown error') , - msg['Code'], - msg.get('ExtraDescription', '...'), - ) - def __str__(self): return self.value - 
- MaxArchiveRecursionExceeded = '1' - UnknownSplitfileMetadata = '2' - UnknownMetadata = '3' - InvalidMetadata = '4' - ArchiveFailure = '5' - BlockDecodeError = '6' - MaxMetadataLevelsExceeded = '7' - MaxArchiveRestartsExceeded = '8' - MaxRecursionLevelExceeded = '9' - NotAnArchve = '10' - TooManyMetastrings = '11' - BucketError = '12' - DataNotFound = '13' - RouteNotFound = '14' - RejectedOverload = '15' - TooManyRedirects = '16' - InternalError = '17' - TransferFailed = '18' - SplitfileError = '19' - InvalidUri = '20' - TooBig = '21' - MetadataTooBig = '22' - TooManyBlocks = '23' - NotEnoughMetastrings = '24' - Canceled = '25' - ArchiveRestart = '26' - PermanentRedirect = '27' - NotAllDataFound = '28' - - -class InsertError(Exception): - """All insert errors supported by the client""" - - def __init__(self, msg): - """ - @param msg: (Message) PutFailed message or its parameters dict - """ - self.value = '%s (%s, %s)' % ( - msg.get('CodeDescription', 'Unknown error') , - msg['Code'], - msg.get('ExtraDescription', '...'), - ) - def __str__(self): return self.value - - InvalidUri = '1' - BucketError = '2' - InternalError = '3' - RejectedOverload = '4' - RouteNotFound = '5' - FatalErrorInBlocks = '6' - TooManyRetriesInBlock = '7' - RouteReallyNotFound = '8' - Collision = '9' - Canceled = '10' - - -class ProtocolError(Exception): - """All protocol errors supported by the client""" - - def __init__(self, msg): - """ - @param msg: (Message) ProtocolError message or its parameters dict - """ - self.value = '%s (%s, %s)' % ( - msg.get('CodeDescription', 'Unknown error') , - msg['Code'], - msg.get('ExtraDescription', '...'), - ) - def __str__(self): return self.value - - ClientHelloMustBeFirst = '1' - NoLateClientHellos = '2' - MessageParseError = '3' - UriParseError = '4' - MissingField = '5' - ErrorParsingNumber = '6' - InvalidMessage = '7' - InvalidField = '8' - FileNotFound = '9' - DiskTargetExists = '10' - SameDirectoryExpected = '11' - CouldNotCreateFile = '12' - 
CouldNotWriteFile = '13' - CouldNotRenameFile = '14' - NoSuchIdentifier = '15' - NotSupported = '16' - InternalError = '17' - ShuttingDown = '18' - NoSuchNodeIdentifier = '19' # Unused since 995 - UrlParseError = '20' - ReferenceParseError = '21' - FileParseError = '22' - NotAFile = '23' - AccessDenied = '24' - DDADenied = '25' - CouldNotReadFile = '26' - ReferenceSignature = '27' - CanNotPeerWithSelf = '28' - PeerExists = '29' - OpennetDisabled = '30' - DarknetPeerOnly = '31' - -class SocketError(Exception): pass -class FcpError(Exception): pass #********************************************************************** -# functions +# helpers #********************************************************************** -def newIdentifier(prefix=None): - """Returns a new unique identifier - @return: (str) uuid - """ - if prefix: - return prefix + str(uuid.uuid4()) - return str(uuid.uuid4()) - - def saveReadFile(fpath): """Reads contents of a file in the savest manner possible @param fpath: file to write @@ -330,358 +109,14 @@ fp.close() return written -def startFreenet(cmdline): - """Starts freenet - @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') - @return: (string) whatever freenet returns - """ - #TODO: on windows it may be necessary to hide the command window - p = subprocess.Popen( - args=cmdline, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - stdout, stderr = p.communicate() - return stdout - #********************************************************************** # classes #********************************************************************** -class FcpUri(object): - """Wrapper class for freenet uris""" - - - KeySSK = 'SSK@' - KeyKSK = 'KSK@' - KeyCHK = 'CHK@' - KeyUSK = 'USK@' - KeySVK = 'SVK@' - KeyUnknown = '' - KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK) - - ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I) - ReKeyPattern = re.compile('(%s)' % 
'|'.join(KeysAll), re.I) - - - def __init__(self, uri): - """ - @param uri: uri to wrap - @param cvar ReUriPattern: pattern matching a freenet uri - @param cvar ReKeyPattern: pattern matching the key type of a freenet uri - - @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible - - - >>> uri = FcpUri('freenet:SSK@foo/bar') - >>> str(uri) - 'SSK@foo/bar' - >>> uri.keyType() == FcpUri.KeySSK - True - >>> uri.split() - ('SSK@foo', 'bar') - >>> uri.fileName() - 'bar' - - >>> uri = FcpUri('http://SSK@foo/bar') - >>> str(uri) - 'SSK@foo/bar' - - # uris not containing freenet keys are left unchanged - >>> uri = FcpUri('http://foo/bar') - >>> str(uri) - 'http://foo/bar' - >>> uri.keyType() == FcpUri.KeyUnknown - True - >>> uri.split() - ('http://foo/bar', '') - >>> uri.fileName() - 'http://foo/bar' - - """ - self._uri = uri - - result = self.ReUriPattern.search(uri) - if result is not None: - self._uri = result.group(0) - - def __str__(self): - return str(self._uri) - - def __unicode__(self): - return unicode(self._uri) - - def keyType(self): - """Retuns the key type of the uri - @return: one of the Key* consts - """ - result = self.ReKeyPattern.search(self._uri) - if result is not None: - return result.group(0).upper() - return self.KeyUnknown - - def split(self): - """Splits the uri - @return: tuple(freenet-key, file-name) - """ - if self.keyType() != self.KeyUnknown: - head, sep, tail = self._uri.partition('/') - return head, tail - return self._uri, '' - - def fileName(self): - """Returns the filename part of the uri - @return: str - """ - head, tail = self.split() - if tail: - return tail - return self._uri - -class Message(object): - """Class wrapping a freenet message""" - - __slots__ = ('name', 'data', 'params') - - # client messages - ClientHello = 'ClientHello' - ListPeer = 'ListPeer' # (since 1045) - ListPeers = 'ListPeers' - ListPeerNotes = 'ListPeerNotes' - AddPeer = 'AddPeer' - ModifyPeer = 'ModifyPeer' - ModifyPeerNote = 
'ModifyPeerNote' - RemovePeer = 'RemovePeer' - GetNode = 'GetNode' - GetConfig = 'GetConfig' # (since 1027) - ModifyConfig = 'ModifyConfig' # (since 1027) - TestDDARequest = 'TestDDARequest' # (since 1027) - TestDDAResponse = 'TestDDAResponse' # (since 1027) - GenerateSSK = 'GenerateSSK' - ClientPut = 'ClientPut' - ClientPutDiskDir = 'ClientPutDiskDir' - ClientPutComplexDir = 'ClientPutComplexDir' - ClientGet = 'ClientGet' - SubscribeUSK = 'SubscribeUSK' - WatchGlobal = 'WatchGlobal' - GetRequestStatus = 'GetRequestStatus' - ListPersistentRequests = 'ListPersistentRequests' - RemovePersistentRequest = 'RemovePersistentRequest' - ModifyPersistentRequest = 'ModifyPersistentRequest' - Shutdown = 'Shutdown' - - # node messages - NodeHello = 'NodeHello' - CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' - Peer = 'Peer' - PeerNote = 'PeerNote' - EndListPeers = 'EndListPeers' - EndListPeerNotes = 'EndListPeerNotes' - PeerRemoved = 'PeerRemoved' - NodeData = 'NodeData' - ConfigData = 'ConfigData' # (since 1027) - TestDDAReply = 'TestDDAReply' # (since 1027) - TestDDAComplete = 'TestDDAComplete' # (since 1027) - SSKKeypair = 'SSKKeypair' - PersistentGet = 'PersistentGet' - PersistentPut = 'PersistentPut' - PersistentPutDir = 'PersistentPutDir' - URIGenerated = 'URIGenerated' - PutSuccessful = 'PutSuccessful' - PutFetchable = 'PutFetchable' - DataFound = 'DataFound' - AllData = 'AllData' - StartedCompression = 'StartedCompression' - FinishedCompression = 'FinishedCompression' - SimpleProgress = 'SimpleProgress' - EndListPersistentRequests = 'EndListPersistentRequests' - PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) - PersistentRequestModified = 'PersistentRequestModified' # (since 1016) - PutFailed = 'PutFailed' - GetFailed = 'GetFailed' - ProtocolError = 'ProtocolError' - IdentifierCollision = 'IdentifierCollision' - UnknownNodeIdentifier = 'UnknownNodeIdentifier' - UnknownPeerNoteType = 'UnknownPeerNoteType' - 
SubscribedUSKUpdate = 'SubscribedUSKUpdate' - - # client messages (internal use only) - ClientSocketTimeout = 0 - ClientSocketDied = 1 - - - def __init__(self, name, data=None, **params): - """ - @param name: messge name - @param data: data associated to the messge (not yet implemented) - @param params: {field-name: value, ...} of parameters of the message - @note: all params can be accessed as attributes of the class - """ - self.data = data - self.name = name - self.params = params - - - @classmethod - def bytesFromSocket(clss, socketObj, n): - """Reads n bytes from socket - @param socketObj: socket to read bytes from - @param n: (int) number of bytes to read - @return: (tuple) error-message or None, bytes read or None if an error occured - or no bytes could be read - """ - error = p = None - try: - p = socketObj.recv(n) - if not p: - p = None - raise socket.error('Socket shut down by node') - except socket.timeout, d: # no new messages in queue - error = clss(clss.ClientSocketTimeout) - except socket.error, d: - error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d) - return error, p - - - @classmethod - def fromSocket(clss, socketObj): - """Reads a message from a socket - @param socketObj: socket to read a message from - @return: L{Message} next message from the socket. If the socket dies - unexpectedly a L{ClientSocketDied} message is returned containing the parameters - 'Exception' and 'Details'. If the socket times out a L{MessageClientSocketTimout} - message is returned. - """ - - msg = clss(None) - buf = [] - - #TODO: to buffer or not to buffer? 
- while True: - - # get next line from socket - error, p = clss.bytesFromSocket(socketObj, 1) - if error: - return error - - if p != '\n': - buf.append(p) - continue - #TODO: check if '\r\n' is allowed in freenet client protocol - else: - if buf[-1] == '\r': - del buf[-1] - - line = ''.join(buf) - buf = [] - if line == 'EndMessage': - break - - # first line == message name - if msg.name is None: - msg.name = line - - # get data member - elif line == 'Data': - remaining = int(msg.params['DataLength']) - msg.data = '' - while remaining > 0: - error, p = clss.bytesFromSocket(socketObj, remaining) - if error: - return error - remaining -= len(p) - msg.data += p - break - - # get next paramater - else: - head, sep, tail = line.partition('=') - msg.params[head] = tail - # TODO: errorchek params? - #if not sep: pass - - return msg - - - def get(self, name, default=None): - """Returns the message parameter 'name' or 'default' """ - return self.params.get(name, default) - - - def __getitem__(self, name): - """Returns the message parameter 'name' """ - return self.params[name] - - - def __setitem__(self, name, value): - """Sets the message parameter 'name' to 'value' """ - self.params[name] = value - - - def pprint(self): - """Returns the message as nicely formated human readable string""" - out = ['', '>>' + self.name, ] - for param, value in self.params.items(): - out.append('>> %s=%s' % (param, value)) - out.append('>>EndMessage') - return '\n'.join(out) - - - def send(self, socketObj): - """Dumps the message to a socket - @param socketObj: socket to dump the message to - """ - socketObj.sendall(self.toString()) - - - def toString(self): - """Returns the message as formated string ready to be send""" - - #TODO: just a guess, so maybe remove this check - if isinstance(self.name, (int, long)): - raise ValueError('You can not send client internal messages to the node') - out = [self.name, ] - for param, value in self.params.items(): - out.append('%s=%s' % (param, value)) - 
if self.data: - assert 'DataLength' in self.params, 'DataLength member required' - n = None - try: - n = int(self['DataLength']) - except ValueError: pass - assert n is not None, 'DataLength member must be an integer' - assert n == len(self.data), 'DataLength member must corrospond to lenght of data' - out.append('Data') - out.append(self.data) - else: - out.append('EndMessage\n') - return '\n'.join(out) #************************************************************************** # fcp client #************************************************************************** -class LogMessages: - """Message strings used for log infos""" - Connecting = 'Connecting to node...' - Connected = 'Connected to node' - ConnectionRetry = 'Connecting to node failed... retrying' - ConnectingFailed = 'Connecting to node failed' - - ClientClose = 'Closing client' - - MessageSend = 'SendMessage' - MessageReceived = 'ReceivedMessage' - - JobStart = 'Starting job: ' - JobStop = 'Stopping job: ' - JobsCompleted = 'All jobs completed' - - - KeyboardInterrupt = 'Keyboard interrupt' - SocketDied = 'Socket died' - - #TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. 
#TODO: do not mix directories as identifiers with identifiers (might lead to collisions) @@ -692,6 +127,10 @@ _events_ = ( + # config related events + 'EventConfigData', + 'EventNodeData', + #Peer related events 'EventListPeers', 'EventEndListPeers', @@ -707,18 +146,18 @@ 'EventSocketDied', # get / put related events + 'EventTestDDAComplete', 'EventIdentifierCollision', - 'EventFileInfo', - 'EventFileInfoProgress', + 'EventClientGetInfo', + 'EventClientGetInfoProgress', 'EventDataFound', 'EventGetFailed', 'EventSimpleProgress', 'EventPersistentRequestModified', 'EventPersistentRequestRemoved', - - + # others 'EventSSKKeypair', @@ -727,26 +166,513 @@ Version = '2.0' FcpTrue = 'true' FcpFalse = 'false' + class FetchError(Exception): + """All fetch errors supported by the client""" + + def __init__(self, msg): + """ + @param msg: (Message) GetFailed message or its parameters dict + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + + MaxArchiveRecursionExceeded = '1' + UnknownSplitfileMetadata = '2' + UnknownMetadata = '3' + InvalidMetadata = '4' + ArchiveFailure = '5' + BlockDecodeError = '6' + MaxMetadataLevelsExceeded = '7' + MaxArchiveRestartsExceeded = '8' + MaxRecursionLevelExceeded = '9' + NotAnArchve = '10' + TooManyMetastrings = '11' + BucketError = '12' + DataNotFound = '13' + RouteNotFound = '14' + RejectedOverload = '15' + TooManyRedirects = '16' + InternalError = '17' + TransferFailed = '18' + SplitfileError = '19' + InvalidUri = '20' + TooBig = '21' + MetadataTooBig = '22' + TooManyBlocks = '23' + NotEnoughMetastrings = '24' + Canceled = '25' + ArchiveRestart = '26' + PermanentRedirect = '27' + NotAllDataFound = '28' + + class FcpError(Exception): pass + class FcpUri(object): + """Wrapper class for freenet uris""" + + + KeySSK = 'SSK@' + KeyKSK = 'KSK@' + KeyCHK = 'CHK@' + KeyUSK = 'USK@' + KeySVK = 'SVK@' + KeyUnknown = '' 
+ KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK) + + ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I) + ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I) + + + def __init__(self, uri): + """ + @param uri: uri to wrap + @param cvar ReUriPattern: pattern matching a freenet uri + @param cvar ReKeyPattern: pattern matching the key type of a freenet uri + + @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible + + + >>> uri = FcpUri('freenet:SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + >>> uri.keyType() == FcpUri.KeySSK + True + >>> uri.split() + ('SSK@foo', 'bar') + >>> uri.fileName() + 'bar' + + >>> uri = FcpUri('http://SSK@foo/bar') + >>> str(uri) + 'SSK@foo/bar' + + # uris not containing freenet keys are left unchanged + >>> uri = FcpUri('http://foo/bar') + >>> str(uri) + 'http://foo/bar' + >>> uri.keyType() == FcpUri.KeyUnknown + True + >>> uri.split() + ('http://foo/bar', '') + >>> uri.fileName() + 'http://foo/bar' + + """ + self._uri = uri + + result = self.ReUriPattern.search(uri) + if result is not None: + self._uri = result.group(0) + + def __str__(self): + return str(self._uri) + + def __unicode__(self): + return unicode(self._uri) + + def keyType(self): + """Retuns the key type of the uri + @return: one of the Key* consts + """ + result = self.ReKeyPattern.search(self._uri) + if result is not None: + return result.group(0).upper() + return self.KeyUnknown + + def split(self): + """Splits the uri + @return: tuple(freenet-key, file-name) + """ + if self.keyType() != self.KeyUnknown: + head, sep, tail = self._uri.partition('/') + return head, tail + return self._uri, '' + + def fileName(self): + """Returns the filename part of the uri + @return: str + """ + head, tail = self.split() + if tail: + return tail + return self._uri + + class IdentifierPrefix: + """Special purpose identifier prefixes""" + ClientGetInfo = 'ClientGetInfo::' + + class InsertError(Exception): + """All insert 
errors supported by the client""" + + def __init__(self, msg): + """ + @param msg: (Message) PutFailed message or its parameters dict + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + + InvalidUri = '1' + BucketError = '2' + InternalError = '3' + RejectedOverload = '4' + RouteNotFound = '5' + FatalErrorInBlocks = '6' + TooManyRetriesInBlock = '7' + RouteReallyNotFound = '8' + Collision = '9' + Canceled = '10' + + class LogMessages: + """Message strings used for log infos""" + Connecting = 'Connecting to node...' + Connected = 'Connected to node' + ConnectionRetry = 'Connecting to node failed... retrying' + ConnectingFailed = 'Connecting to node failed' + + ClientClose = 'Closing client' + + MessageSend = 'SendMessage' + MessageReceived = 'ReceivedMessage' + + JobStart = 'Starting job: ' + JobStop = 'Stopping job: ' + JobsCompleted = 'All jobs completed' + + KeyboardInterrupt = 'Keyboard interrupt' + SocketDied = 'Socket died' + + class Message(object): + """Class wrapping a freenet message""" + + __slots__ = ('name', 'data', 'params') + + # client messages + ClientHello = 'ClientHello' + ListPeer = 'ListPeer' # (since 1045) + ListPeers = 'ListPeers' + ListPeerNotes = 'ListPeerNotes' + AddPeer = 'AddPeer' + ModifyPeer = 'ModifyPeer' + ModifyPeerNote = 'ModifyPeerNote' + RemovePeer = 'RemovePeer' + GetNode = 'GetNode' + GetConfig = 'GetConfig' # (since 1027) + ModifyConfig = 'ModifyConfig' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) + GenerateSSK = 'GenerateSSK' + ClientPut = 'ClientPut' + ClientPutDiskDir = 'ClientPutDiskDir' + ClientPutComplexDir = 'ClientPutComplexDir' + ClientGet = 'ClientGet' + SubscribeUSK = 'SubscribeUSK' + WatchGlobal = 'WatchGlobal' + GetRequestStatus = 'GetRequestStatus' + ListPersistentRequests = 'ListPersistentRequests' + 
RemovePersistentRequest = 'RemovePersistentRequest' + ModifyPersistentRequest = 'ModifyPersistentRequest' + Shutdown = 'Shutdown' + + # node messages + NodeHello = 'NodeHello' + CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName' + Peer = 'Peer' + PeerNote = 'PeerNote' + EndListPeers = 'EndListPeers' + EndListPeerNotes = 'EndListPeerNotes' + PeerRemoved = 'PeerRemoved' + NodeData = 'NodeData' + ConfigData = 'ConfigData' # (since 1027) + TestDDAReply = 'TestDDAReply' # (since 1027) + TestDDAComplete = 'TestDDAComplete' # (since 1027) + SSKKeypair = 'SSKKeypair' + PersistentGet = 'PersistentGet' + PersistentPut = 'PersistentPut' + PersistentPutDir = 'PersistentPutDir' + URIGenerated = 'URIGenerated' + PutSuccessful = 'PutSuccessful' + PutFetchable = 'PutFetchable' + DataFound = 'DataFound' + AllData = 'AllData' + StartedCompression = 'StartedCompression' + FinishedCompression = 'FinishedCompression' + SimpleProgress = 'SimpleProgress' + EndListPersistentRequests = 'EndListPersistentRequests' + PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016) + PersistentRequestModified = 'PersistentRequestModified' # (since 1016) + PutFailed = 'PutFailed' + GetFailed = 'GetFailed' + ProtocolError = 'ProtocolError' + IdentifierCollision = 'IdentifierCollision' + UnknownNodeIdentifier = 'UnknownNodeIdentifier' + UnknownPeerNoteType = 'UnknownPeerNoteType' + SubscribedUSKUpdate = 'SubscribedUSKUpdate' + + # client messages (internal use only) + ClientSocketTimeout = 0 + ClientSocketDied = 1 + + + def __init__(self, name, data=None, **params): + """ + @param name: messge name + @param data: data associated to the messge (not yet implemented) + @param params: {field-name: value, ...} of parameters of the message + @note: all params can be accessed as attributes of the class + """ + self.data = data + self.name = name + self.params = params + + + @classmethod + def bytesFromSocket(clss, socketObj, n): + """Reads n bytes from socket + @param 
socketObj: socket to read bytes from + @param n: (int) number of bytes to read + @return: (tuple) error-message or None, bytes read or None if an error occured + or no bytes could be read + """ + error = p = None + try: + p = socketObj.recv(n) + if not p: + p = None + raise socket.error('Socket shut down by node') + except socket.timeout, d: # no new messages in queue + error = clss(clss.ClientSocketTimeout) + except socket.error, d: + error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d) + return error, p + + + @classmethod + def fromSocket(clss, socketObj): + """Reads a message from a socket + @param socketObj: socket to read a message from + @return: L{Message} next message from the socket. If the socket dies + unexpectedly a L{ClientSocketDied} message is returned containing the parameters + 'Exception' and 'Details'. If the socket times out a L{MessageClientSocketTimout} + message is returned. + """ + + msg = clss(None) + buf = [] + + #TODO: to buffer or not to buffer? + while True: + + # get next line from socket + error, p = clss.bytesFromSocket(socketObj, 1) + if error: + return error + + if p != '\n': + buf.append(p) + continue + #TODO: check if '\r\n' is allowed in freenet client protocol + else: + if buf[-1] == '\r': + del buf[-1] + + line = ''.join(buf) + buf = [] + if line == 'EndMessage': + break + + # first line == message name + if msg.name is None: + msg.name = line + + # get data member + elif line == 'Data': + remaining = int(msg.params['DataLength']) + msg.data = '' + while remaining > 0: + error, p = clss.bytesFromSocket(socketObj, remaining) + if error: + return error + remaining -= len(p) + msg.data += p + break + + # get next paramater + else: + head, sep, tail = line.partition('=') + msg.params[head] = tail + # TODO: errorchek params? 
+ #if not sep: pass + + return msg + + + def get(self, name, default=None): + """Returns the message parameter 'name' or 'default' """ + return self.params.get(name, default) + + + def __getitem__(self, name): + """Returns the message parameter 'name' """ + return self.params[name] + + + def __setitem__(self, name, value): + """Sets the message parameter 'name' to 'value' """ + self.params[name] = value + + + def pprint(self): + """Returns the message as nicely formated human readable string""" + out = ['', '>>' + self.name, ] + for param, value in self.params.items(): + out.append('>> %s=%s' % (param, value)) + out.append('>>EndMessage') + return '\n'.join(out) + + + def send(self, socketObj): + """Dumps the message to a socket + @param socketObj: socket to dump the message to + """ + socketObj.sendall(self.toString()) + + + def toString(self): + """Returns the message as formated string ready to be send""" + + #TODO: just a guess, so maybe remove this check + if isinstance(self.name, (int, long)): + raise ValueError('You can not send client internal messages to the node') + out = [self.name, ] + for param, value in self.params.items(): + out.append('%s=%s' % (param, value)) + if self.data: + assert 'DataLength' in self.params, 'DataLength member required' + n = None + try: + n = int(self['DataLength']) + except ValueError: pass + assert n is not None, 'DataLength member must be an integer' + assert n == len(self.data), 'DataLength member must corrospond to lenght of data' + out.append('Data') + out.append(self.data) + else: + out.append('EndMessage\n') + return '\n'.join(out) + + + + #TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED> + # all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED + # in --> freenet/node/PeerManager.java + class PeerNodeStatus: + Connected = 1 + RoutingBackedOff = 2 + TooNew = 3 + TooOld = 4 + Disconnected = 5 + NeverConnected = 6 + Disabled = 7 + Bursting = 8 + Listening = 9 + 
ListenOnly = 10 + ClockProblem = 11 + ConnError = 12 + Disconnecting = 13 + + class PeerNoteType: + """All known peer note types""" + Private = '1' + + class Priorities: + """All priorities supported by the client""" + Maximum = '0' + Interactive = '1' + SemiInteractive = '2' + Updatable = '3' + Bulk = '4' + Prefetch = '5' + Minimum = '6' + + PriorityMin = Minimum + PriorityDefault = Bulk + + class ProtocolError(Exception): + """All protocol errors supported by the client""" + + def __init__(self, msg): + """ + @param msg: (Message) ProtocolError message or its parameters dict + """ + self.value = '%s (%s, %s)' % ( + msg.get('CodeDescription', 'Unknown error') , + msg['Code'], + msg.get('ExtraDescription', '...'), + ) + def __str__(self): return self.value + + ClientHelloMustBeFirst = '1' + NoLateClientHellos = '2' + MessageParseError = '3' + UriParseError = '4' + MissingField = '5' + ErrorParsingNumber = '6' + InvalidMessage = '7' + InvalidField = '8' + FileNotFound = '9' + DiskTargetExists = '10' + SameDirectoryExpected = '11' + CouldNotCreateFile = '12' + CouldNotWriteFile = '13' + CouldNotRenameFile = '14' + NoSuchIdentifier = '15' + NotSupported = '16' + InternalError = '17' + ShuttingDown = '18' + NoSuchNodeIdentifier = '19' # Unused since 995 + UrlParseError = '20' + ReferenceParseError = '21' + FileParseError = '22' + NotAFile = '23' + AccessDenied = '24' + DDADenied = '25' + CouldNotReadFile = '26' + ReferenceSignature = '27' + CanNotPeerWithSelf = '28' + PeerExists = '29' + OpennetDisabled = '30' + DarknetPeerOnly = '31' + + class SocketError(Exception): pass + class Verbosity: + Debug = logging.DEBUG + Info = logging.INFO + Warning = logging.WARNING + + def __init__(self, name='', connectionName=None, verbosity=Verbosity.Warning, - logMessages=LogMessages ): """ @param name: name of the client instance or '' (for debugging) @param conectionName: name of the connection @param verbosity: verbosity level for debugging - @param logMessages: LogMessages class 
containing message strings """ self._connectionName = connectionName self._ddaTmpFiles = [] self._log = logging.getLogger(name) - self._logMessages = logMessages - self._lock = thread.allocate_lock() self._socket = None self.setVerbosity(verbosity) @@ -756,7 +682,7 @@ """Closes the client @note: make shure to call close() when done with the client """ - self._log.info(self._logMessages.ClientClose) + self._log.info(self.LogMessages.ClientClose) if self._socket is not None: self._socket.close() self._socket = None @@ -764,8 +690,8 @@ # clean left over tmp files for fpath in self._ddaTmpFiles: saveRemoveFile(fpath) + - #TODO: an iterator would be nice to enshure Guis stay responsitive in the call def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): """Establishes the connection to a freenet node @@ -776,7 +702,7 @@ @return: (Message) NodeHello if successful,None otherwise """ self._clientHello = None - self._log.info(self._logMessages.Connecting) + self._log.info(self.LogMessages.Connecting) # poll untill freenet responds timeElapsed = 0 @@ -792,7 +718,7 @@ except Exception, d: pass else: - self._log.info(self._logMessages.Connected) + self._log.info(self.LogMessages.Connected) # send ClientHello and wait for NodeHello #NOTE: thought I could leave ClientHelloing up to the caller @@ -800,26 +726,26 @@ # as expected when not doing so, the node disconnects. # So take it over here. 
self.sendMessage( - Message.ClientHello, - Name=self._connectionName if self._connectionName is not None else newIdentifier(), + self.Message.ClientHello, + Name=self._connectionName if self._connectionName is not None else self.newIdentifier(), ExpectedVersion=self.Version, ) while timeElapsed <= repeat: msg = self.next() - if msg.name == Message.ClientSocketTimeout: + if msg.name == self.Message.ClientSocketTimeout: timeElapsed += SocketTimeout - elif msg.name == Message.NodeHello: + elif msg.name == self.Message.NodeHello: return msg.params else: break break # continue polling - self._log.info(self._logMessages.ConnectionRetry) + self._log.info(self.LogMessages.ConnectionRetry) timeElapsed += timeout time.sleep(timeout) - self._log.info(self._logMessages.ConnectingFailed) + self._log.info(self.LogMessages.ConnectingFailed) return None @@ -829,18 +755,18 @@ @return: True if the message was handled, False otherwise """ - if msg.name == Message.ClientSocketTimeout: + if msg.name == self.Message.ClientSocketTimeout: return True - self._log.debug(self._logMessages.MessageReceived + msg.pprint()) + self._log.debug(self.LogMessages.MessageReceived + msg.pprint()) - if msg.name == Message.ProtocolError: + if msg.name == self.Message.ProtocolError: code = msg['Code'] - if code == ProtocolError.ShuttingDown: + if code == self.ProtocolError.ShuttingDown: self.close() self.EventShutdown(msg.params) return True - raise ProtocolError(msg) + raise self.ProtocolError(msg) #################################################### ## @@ -851,7 +777,7 @@ ## Have to handle this! 
## #################################################### - elif msg.name == Message.TestDDAReply: + elif msg.name == self.Message.TestDDAReply: fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) readContent = '' @@ -868,38 +794,51 @@ readContent = '' self.sendMessage( - Message.TestDDAResponse, + self.Message.TestDDAResponse, Directory=msg['Directory'], ReadContent=readContent, ) return True - - - elif msg.name == Message.TestDDAComplete: + + elif msg.name == self.Message.TestDDAComplete: # clean tmp files for fpath in self._ddaTmpFiles: saveRemoveFile(fpath) self._ddaTmpFiles = [] + self.EventTestDDAComplete(msg.params) return True #################################################### ## + ## Config related messages + ## + #################################################### + elif msg.name == self.Message.ConfigData: + self.EventConfigData(msg.params) + return True + + elif msg.name == self.Message.NodeData: + self.EventNodeData(msg.params) + return True + + #################################################### + ## ## Peer related messages ## #################################################### - elif msg.name == Message.EndListPeers: + elif msg.name == self.Message.EndListPeers: self.EventEndListPeers(msg.params) return True - elif msg.name == Message.EndListPeerNotes: + elif msg.name == self.Message.EndListPeerNotes: self.EventEndListPeerNotes(msg.params) return True - elif msg.name == Message.Peer: + elif msg.name == self.Message.Peer: self.EventPeer(msg.params) return True - elif msg.name == Message.PeerNote: + elif msg.name == self.Message.PeerNote: note = msg.get('NoteText', '') if note: note = base64.decodestring(note) @@ -907,11 +846,11 @@ self.EventPeerNote(msg.params, note) return True - elif msg.name == Message.PeerRemoved: + elif msg.name == self.Message.PeerRemoved: self.EventPeerRemoved(msg.params) return True - elif msg.name == Message.UnknownNodeIdentifier: + elif msg.name == 
self.Message.UnknownNodeIdentifier: self.EventUnknownIdentifier(msg.params) return True @@ -920,49 +859,44 @@ ## Get related messages ## #################################################### - elif msg.name == Message.DataFound: - if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): - self.EventFileInfo(msg.params) + elif msg.name == self.Message.DataFound: + if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): + self.EventClientGetInfo(msg.params) return True - #TODO: self.EventDataFound(msg.params) return True - - elif msg.name == Message.GetFailed: + elif msg.name == self.Message.GetFailed: code = msg['Code'] - if code == FetchError.TooBig: - if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): + if code == self.FetchError.TooBig: + if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): params = { 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''), 'DataLength': msg.get('ExpectedDataLength', '') } - self.EventFileInfo(params) + self.EventClientGetInfo(params) return True self.EventGetFailed(msg.params) return True - - - elif msg.name == Message.SimpleProgress: - if msg['Identifier'].startswith(IdentifierPrefix.FileInfo): - self.EventFileInfoProgress(msg.params) + + elif msg.name == self.Message.SimpleProgress: + if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): + self.EventClientGetInfoProgress(msg.params) else: self.EventSimpleProgress(msg.params) return True - - elif msg.name == Message.IdentifierCollision: + elif msg.name == self.Message.IdentifierCollision: self.EventIdentifierCollision(msg.params) return True - - elif msg.name == Message.PersistentRequestModified: + elif msg.name == self.Message.PersistentRequestModified: self.EventPersistentRequestModified(msg.params) return True - elif msg.name == Message.PersistentRequestRemoved: + elif msg.name == self.Message.PersistentRequestRemoved: self.EventPersistentRequestRemoved(msg.params) return True @@ -971,15 +905,41 @@ ## 
Others ## #################################################### - elif msg.name == Message.SSKKeypair: + elif msg.name == self.Message.SSKKeypair: self.EventSSKKeypair(msg.params) return True + - - - ## default + ## default ## return False + + def setVerbosity(self, verbosity): + """Sets the verbosity level of the client + @note: see L{Verbosity} + """ + self._log.setLevel(verbosity) + + + def startFreenet(self, cmdline): + """Starts freenet + @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') + @return: (string) whatever freenet returns + """ + #TODO: on windows it may be necessary to hide the command window + p = subprocess.Popen( + args=cmdline, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p.communicate() + return stdout + + + def verbosity(self): + """Returns the current verbosity level of the client""" + return self._log.level ######################################################### ## ## @@ -989,8 +949,8 @@ """Pumps the next message waiting @note: use this method instead of run() to run the client step by step """ - msg = Message.fromSocket(self._socket) - if msg.name == Message.ClientSocketDied: + msg = self.Message.fromSocket(self._socket) + if msg.name == self.Message.ClientSocketDied: self.EventSocketDied(msg['Exception'], msg['Details']) raise SocketError(msg['Details']) self.handleMessage(msg) @@ -1007,7 +967,7 @@ If an error handler is passed to the client it is called emidiately before the error is raised. """ - return self.sendMessageEx(Message(name, data=data, **params)) + return self.sendMessageEx(self.Message(name, data=data, **params)) def sendMessageEx(self, msg): @@ -1018,26 +978,16 @@ If an error handler is passed to the client it is called emidiately before the error is raised. 
""" - self._log.debug(self._logMessages.MessageSend + msg.pprint()) + self._log.debug(self.LogMessages.MessageSend + msg.pprint()) try: msg.send(self._socket) except socket.error, d: - self._log.info(self._logMessages.SocketDied) + self._log.info(self.LogMessages.SocketDied) self.close() self.EventSocketDied(socket.error, d) raise SocketError(d) return msg - - - def setLogMessages(self, logMessages): - """""" - self._logMessages = logMessages - - - def setVerbosity(self, verbosity): - """""" - self._log.setLevel(verbosity) - + ######################################################### ## ## @@ -1049,15 +999,56 @@ @return: (str) 'true' or 'false' """ return self.FcpTrue if pythonBool else self.FcpFalse - + + def newIdentifier(self, prefix=None): + """Returns a new unique identifier + @return: (str) uuid + """ + if prefix: + return prefix + str(uuid.uuid4()) + return str(uuid.uuid4()) + + def pythonBool(self, fcpBool): """Converts a fcp bool to a python bool @param pythonBool: 'true' or 'false' @return: (bool) True or False """ return fcpBool == self.FcpTrue + + ######################################################## + ## + ## Config related methods + ## + ######################################################## + #TODO: WithDefault never returns defaults + def getConfig(self): + """ + @event: ConfigData(event, params) + """ + self.sendMessage( + self.Message.GetConfig, + WithCurrent=self.FcpTrue, + WithDefault=self.FcpTrue, + WithExpertFlag=self.FcpTrue, + WithForceWriteFlag=self.FcpTrue, + WithShortDescription=self.FcpTrue, + WithLongDescription=self.FcpTrue, + ) + + def getNode(self): + """ + @event: NodeData(event, params) + """ + self.sendMessage( + self.Message.GetNode, + WithPrivate==self.FcpTrue, + WithVlatile==self.FcpTrue, + GiveOpennetRef==self.FcpTrue, + ) + ######################################################## ## ## Peer related methods @@ -1065,28 +1056,35 @@ ######################################################## def listPeer(self, identifier): 
self.jobClient.sendMessage( - Message.ListPeer, + self.Message.ListPeer, NodeIdentifier=identifier, ) def listPeerNotes(self, identifier): + """Lists all text notes associated to a peer + @param identifier: peer as returned in a call to L{peerList} + @event: EventListPeerNotes(event). + @event: EventListPeerNote(event, note). + @event: EventEndListPeerNotes(event). """ - @param identifier: identifier of the peer to list notes for - """ self.sendMessage( - Message.ListPeerNotes, + self.Message.ListPeerNotes, NodeIdentifier=identifier ) def listPeers(self, withMetaData=True, withVolantile=True): - """ + """Lists all peers of the node @param withMetaData: include meta data for each peer? @param withVolantile: include volantile data for each peer? + + @event: EventListPeers(event). + @event: EvenPeer(event, peer). + @event: EventEndListPeers(event). """ self.sendMessage( - Message.ListPeers, + self.Message.ListPeers, WithMetadata=self.fcpBool(withMetaData), WithVolatile=self.fcpBool(withVolantile), ) @@ -1094,7 +1092,7 @@ def modifyPeer(self, identifier, allowLocalAddresses=None, isDisabled=None, isListenOnly=None): msg = Message( - Message.ModifyPeer, + self.Message.ModifyPeer, NodeIdentifier=identifier, ) if allowLocalAddresses is not None: @@ -1109,17 +1107,17 @@ def modifyPeerNote(self, identifier, note): self.sendMessage( - Message.ModifyPeerNote, + self.Message.ModifyPeerNote, NodeIdentifier=identifier, #NOTE: currently fcp supports only this one type - PeerNoteType=PeerNoteType.Private, + PeerNoteType=self.PeerNoteType.Private, NoteText=note ) def removePeer(self, identifier): self.sendMessage( - Message.RemovePeer, + self.Message.RemovePeer, NodeIdentifier=identifier, ) @@ -1128,18 +1126,46 @@ ## get / put related methods ## ########################################################## - def fileInfo(self, uri, **params): + #TODO: not complete yet + def clientGetFile(self, uri, filename): + """ + """ + identifier = self.new_identifier() + msg = self.Message( + 
self.Message.ClientGet, + IgnoreDS='false', + DSOnly='false', + URI=uri, + Identifier=identifier, + Verbosity='1', + ReturnType='disk', + #MaxSize=client_get_info['Size'], + #MaxTempSize=client_get_info['Size'], + #MaxRetries='-1', + #PriorityClass='4', + Persistence='forever', + #ClientToken=identifier, + Global='false', + #BinaryBlob='false', + Filename=filename, + ) + self.sendMessageEx(msg) + + return identifier + + + def clientGetInfo(self, uri, **params): """Requests info about a file @param uri: uri of the file to request info about - @event: FileInfo(event, params). If success, params will contain a key 'Metadata.ContentType' + @event: clientGetInfo(event, params). If success, params will contain a key 'Metadata.ContentType' and 'DataLength'. Both may be '' (empty string) - @event: FileInfoProgress(event, params). Triggered instead of EventSimpleProgress + @event: clientGetInfoProgress(event, params). Triggered instead of EventSimpleProgress @note: for other events see: L{clientGet} - @return: (str) identifier of the request + @return: (str) request identifier """ - identifier = IdentifierPrefix.FileInfo + newIdentifier() + identifier = self.IdentifierPrefix.ClientGetInfo + self.newIdentifier() self.sendMessage( - Message.ClientGet, + self.Message.ClientGet, Identifier=identifier, URI=uri, # suggested by Mathew Toseland to use about 32k for mimeType requests @@ -1152,199 +1178,152 @@ return identifier - ######################################################### - ## how to tackle TestDDA? - ## - ## best idea hear so far is to wait for ProtocolError 25 Test DDA denied (or PersistantGet) - ## and reinsert the job if necessary after TestDDA completion. - ## - ## Problem is, how to wait for a message without flooding the caller. Basic idea of the client - ## is to enshure a Gui can stay responsitive by letting the caller decide when to process the - ## next message. So waiting would require to buffer messages and watch messages carefuly - ## as they flood in. 
- ## - ## If we do not wait, the caller may flood us with download requests, I fear, faster than - ## the node and we are able to go get the error and through the TestDDA drill. Have to - ## do some tests to see how the node reacts. - ## - ## easiest approach would be to let the caller test a directory explicitely when HE thinks - ## it might be necessary. But then this code will hang around forever with an already - ## assigned bug report [https://bugs.freenetproject.org/view.php?id=1753] suggesting - ## much easier processing to test DDA (DDA Challenge) - ## - ## - ## so.. maybe best is to lurker around a while and keep an eye on the tracker - ## - ## - ## below is just some old boilerplate code.. to be removed sooner or later - ######################################################### - def testWriteAccess(self, directory): - canRead, canWrite = False, False - result = self._jobs.get('RegisteredDirectories', None) - if result is not None: - canRead, canWrite = result - if not canWrite: - job = JobTestDDA(directory, read=canRead, write=True) - self.addJob(job, synchron=True) - canWrite = job.jobResult[1] - self._jobs['RegisteredDirectories'] = (canRead, canWrite) - return canWrite - - def testReadAccess(self, directory): - canRead, canWrite = False, False - result = self._jobs.get('RegisteredDirectories', None) - if result is not None: - canRead, canWrite = result - if not canRead: - job = JobTestDDA(directory, read=True, write=canWrite) - self.addJob(job, synchron=True) - canRead = job.jobResult[0] - self._jobs['RegisteredDirectories'] = (canRead, canWrite) - return canRead - - - def downloadFile(self, directory, job): - if not os.path.isdir(directory): - raise IOError('No such directory') - - self._jobs['PendingJobs'].append(job) - try: - result = self.testWriteAccess(directory) - if result: - self.addJob(job) - finally: - self._jobs['PendingJobs'].remove(job) - return result + def testDDA(self, directory, wantReadDirectory=None, wantWriteDirectory=None): + 
"""Tests a directory for read / write access + @param directory: directory to test + @param read: if not Note, test directory for read access + @param write: if not Note, test directory for write access + @event: TestDDAComplete(event, params) is triggered on test completion - - def uploadFile(self, directory, job): - if not os.path.isdir(directory): - raise IOError('No such directory') - - self._jobs['PendingJobs'].append(job) - try: - result = self.testReadAccess(directory) - if result: - self.addJob(job) - finally: - self._jobs['PendingJobs'].remove(job) - return result - - - ################################################# + @note: you have to test a directory if it can bew written to before downloading files ito it + and a directory for read access before uploading content from it + @note: the node does not like both parameters being False and will respond with a protocol error in this + case. Take care of that. + """ + msg = self.Message( + self.Message.TestDDARequest, + Directory=directory, + ) + if wantReadDirectory is not None: + msg['WantReadDirectory'] = self.fcpBool(wantReadDirectory) + if wantWriteDirectory is not None: + msg['WantWriteDirectory'] = self.fcpBool(wantWriteDirectory) + self.sendMessageEx(msg) + ########################################################## ## - ## public methods + ## others ## - ################################################# - def peerList(self, synchron=False): - """Lists all peers of the node - @param synchron: if True, waits untill the call is completed, if False returns emidiately - @return: (list) of peers in a synchron, always None in an asynchron call - - @event: EventListPeers(event). - @event: EventListNextPeer(event, peer). - @event: EventEndListPeers(event). 
+ ########################################################## + def generateSSK(self): """ - job = JobListPeers(self) - self.jobAdd(job, synchron=synchron) - return job.jobResult - - - def peerNotes(self, peer, synchron=False): - """Lists all text notes associated to a peer - @param peer: peer as returned in a call to L{peerList} - @param synchron: if True, waits untill the call is completed, if False returns emidiately - @return: (list) of notes in a synchron, always None in an asynchron call - - @event: EventListPeerNotes(event). - @event: EventListNextPeerNote(event, note). - @event: EventEndListPeerNotes(event). + @event: SSKKeypair(event, params), triggered when the request is complete + @return: identifier of the request """ - if self.pythonBool(peer['opennet']): # opennet peers do not have any notes associated - return [] - job = JobListPeerNotes(self, peer['identity']) - self.jobAdd(job, synchron=synchron) - return job.jobResult - + identifier = self.newIdentifier() + self.sendMessage( + self.Message.GenerateSSK, + Identifier=identifier + ) + return identifier + #***************************************************************************** # #***************************************************************************** if __name__ == '__main__': - c = FcpClient(name='test', verbosity=Verbosity.Warning) + c = FcpClient(name='test', verbosity=FcpClient.Verbosity.Debug) nodeHello = c.connect() if nodeHello is not None: + - - - def foo(): - job1 = JobClientHello(c) - c.jobAdd(job1) - - c.run() - print '---------------------------' - print job1.jobResult - print '---------------------------' + def testLateClientHello(): + c.sendMessage( + c.Message.ClientHello, + Name=c.newIdentifier(), + ExpectedVersion=c.Version, + ) + for i in xrange(2): + c.next() + # should raise - #foo() + #testLateClientHello() - #ModifyPeer not ok + + def testGetConfig(): + + def getBuddyValue(params, settingName, buddyPrefix): + buddyName = buddyPrefix + '.' 
+ settingName + value = params.get(buddyName, '') + return (buddyPrefix, value) + + def cb(event, params): + + settings = [ i for i in params if i.startswith('current.')] + settings.sort() + for setting in settings: + + configTree, sep, settingName = setting.partition('.') + value = params[setting] + print '%s=%s' % (settingName, value) + print '%s=%s' % getBuddyValue(params, settingName, 'expertFlag') + print '%s=%s' % getBuddyValue(params, settingName, 'forceWriteFlag') + print '%s=%s' % getBuddyValue(params, settingName, 'shortDescription') + prefix, value = getBuddyValue(params, settingName, 'longDescription') + value = value.replace('. ', '.\n') + value = value.replace('? ', '.\n') + print '%s=%s' % (prefix, value) + print + + c.EventConfigData += cb + oldVerbosity = c.verbosity() + ##c.setVerbosity(c.Verbosity.Warning) + + print '\n>> Requesting config\n' + c.getConfig() + for i in xrange(1): + c.next() + + c.setVerbosity(oldVerbosity) + + #testGetConfig() - #RemovePeer not ok - #ModifyPeerNote ok - #ListPeer not ok + def testGenerateSSK(): + def cb(event, params): + print params + + c.EventSSKKeypair += cb + c.generateSSK() + for i in xrange(1): + c.next() + #testGenerateSSK() - - def foo(): - job = JobListPeer(c, '123456') - c.jobAdd(job, synchron=True) - print job.jobResult - #foo() - - - def foo(): - job = JobGenerateSSK(c) - c.jobAdd(job, synchron=True) - print job.jobResult - #foo() - - - - def foo(): + def testTestDDA(): + def cb(event, params): + print params + + c.EventTestDDAComplete += cb d = os.path.dirname(os.path.abspath(__file__)) - job2 = JobTestDDA(c, d) - c.jobAdd(job2) - c.run() - print '---------------------------' - print job2.jobResult - print '---------------------------' - #foo() + c.testDDA(d, True, True) + for i in xrange(4): + c.next() + + #testTestDDA() - def foo(): + + def testListPeerNotes(): def cb(event, params): - #print params.get('opennet', 'true'), c.pythonBool(params.get('Opennet', 'true')), params['identity'] if 
params['opennet'] == c.FcpFalse: c.listPeerNotes(params['identity']) c.EventPeer += cb - - c.listPeers() - for i in xrange(80): + for i in xrange(100): c.next() - #foo() + #testListPeerNotes() - def foo(): - #job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip') - identifier = c.fileInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + def testClientGetInfo(): + def cb(event, params): + print params + + c.EventClientGetInfo += cb + identifier = c.clientGetInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') for i in xrange(20): c.next() - - #foo() + #testClientGetInfo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-01 14:54:29
|
Revision: 26 http://fclient.svn.sourceforge.net/fclient/?rev=26&view=rev Author: jUrner Date: 2007-11-01 07:54:34 -0700 (Thu, 01 Nov 2007) Log Message: ----------- bit of refactoring Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-01 14:53:13 UTC (rev 25) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-01 14:54:34 UTC (rev 26) @@ -109,41 +109,34 @@ fp.close() return written -#********************************************************************** -# classes -#********************************************************************** - - #************************************************************************** # fcp client #************************************************************************** -#TODO: no idea what happens on reconnect if socket died. What about running jobs? #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. -#TODO: do not mix directories as identifiers with identifiers (might lead to collisions) -#TODO: how to handle (ProtocolError code 18: Shutting down)? 
+#TODO: events should be FcpClient.event.PeerNote not FcpClient.EventPeerNote class FcpClient(events.Events): """Fcp client implementation """ _events_ = ( + 'EventClientConnected', + 'EventClientDisconnected', + # config related events 'EventConfigData', 'EventNodeData', #Peer related events - 'EventListPeers', 'EventEndListPeers', 'EventPeer', 'EventPeerRemoved', - 'EventUnknownIdentifier', + 'EventUnknownNodeIdentifier', 'EventListPeerNotes', 'EventEndListPeerNotes', 'EventPeerNote', - 'EventShutdown', - 'EventSocketDied', # get / put related events 'EventTestDDAComplete', @@ -166,6 +159,11 @@ Version = '2.0' FcpTrue = 'true' FcpFalse = 'false' + class DisconnectReason: + """Reason for client disconnect""" + Shutdown = '1' + SocketDied = '2' + class FetchError(Exception): """All fetch errors supported by the client""" @@ -692,14 +690,16 @@ saveRemoveFile(fpath) - #TODO: an iterator would be nice to enshure Guis stay responsitive in the call + def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5): - """Establishes the connection to a freenet node + """Iterator to stablish a connection to a freenet node @param host: (str) host of th node @param port: (int) port of the node @param repeat: (int) how many seconds try to connect before giving up @param timeout: (int) how much time to wait before another attempt to connect - @return: (Message) NodeHello if successful,None otherwise + @event: EventConnected(event, params). Triggered as soon as the client is connected. Params + will be the parameters of the NodeHello message. 
+ @return: (Message) NodeHello if successful, None otherwise for the next iteration """ self._clientHello = None self._log.info(self.LogMessages.Connecting) @@ -716,7 +716,7 @@ try: self._socket.connect((host, port)) except Exception, d: - pass + yield None else: self._log.info(self.LogMessages.Connected) @@ -733,9 +733,12 @@ while timeElapsed <= repeat: msg = self.next() if msg.name == self.Message.ClientSocketTimeout: - timeElapsed += SocketTimeout + timeElapsed += SocketTimeout + yield None elif msg.name == self.Message.NodeHello: - return msg.params + self.EventClientConnected(msg.params) + yield msg.params + raise StopIteration else: break break @@ -746,9 +749,11 @@ time.sleep(timeout) self._log.info(self.LogMessages.ConnectingFailed) - return None + raise StopIteration + + - + def handleMessage(self, msg): """Handles a message from the freenet node @param msg: (Message) to handle @@ -763,7 +768,7 @@ code = msg['Code'] if code == self.ProtocolError.ShuttingDown: self.close() - self.EventShutdown(msg.params) + self.EventClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown}) return True raise self.ProtocolError(msg) @@ -851,7 +856,7 @@ return True elif msg.name == self.Message.UnknownNodeIdentifier: - self.EventUnknownIdentifier(msg.params) + self.EventUnknownNodeIdentifier(msg.params) return True #################################################### @@ -951,8 +956,13 @@ """ msg = self.Message.fromSocket(self._socket) if msg.name == self.Message.ClientSocketDied: - self.EventSocketDied(msg['Exception'], msg['Details']) - raise SocketError(msg['Details']) + params = { + 'DisconnectReason': DisconnectReason.SocketDied, + 'Exception': msg['Exception'], + 'Details': msg['Details'] + } + self.EventClientDisconnected(params) + raise self.SocketError(msg['Details']) self.handleMessage(msg) return msg @@ -984,8 +994,13 @@ except socket.error, d: self._log.info(self.LogMessages.SocketDied) self.close() - self.EventSocketDied(socket.error, d) - raise 
SocketError(d) + params = { + 'DisconnectReason': DisconnectReason.SocketDied, + 'Exception': socket.error, + 'Details': d + } + self.EventClientDisconnected(params) + raise self.SocketError(d) return msg ######################################################### @@ -1001,7 +1016,8 @@ return self.FcpTrue if pythonBool else self.FcpFalse - def newIdentifier(self, prefix=None): + @classmethod + def newIdentifier(clss, prefix=None): """Returns a new unique identifier @return: (str) uuid """ @@ -1009,7 +1025,7 @@ return prefix + str(uuid.uuid4()) return str(uuid.uuid4()) - + def pythonBool(self, fcpBool): """Converts a fcp bool to a python bool @param pythonBool: 'true' or 'false' @@ -1017,6 +1033,15 @@ """ return fcpBool == self.FcpTrue + + def pythonTime(self, fcpTime): + """Converts a fcp time value to a python time value + @param fcpTime: (int, str) time to convert + @return: (int) python time + """ + fcpTime = int(fcpTime) + return fcpTime / 1000 + ######################################################## ## ## Config related methods @@ -1054,14 +1079,14 @@ ## Peer related methods ## ######################################################## - def listPeer(self, identifier): + def listPeer(self, identity): self.jobClient.sendMessage( self.Message.ListPeer, - NodeIdentifier=identifier, + NodeIdentifier=identity, ) - def listPeerNotes(self, identifier): + def listPeerNotes(self, identity): """Lists all text notes associated to a peer @param identifier: peer as returned in a call to L{peerList} @event: EventListPeerNotes(event). @@ -1070,7 +1095,7 @@ """ self.sendMessage( self.Message.ListPeerNotes, - NodeIdentifier=identifier + NodeIdentifier=identity ) @@ -1079,9 +1104,8 @@ @param withMetaData: include meta data for each peer? @param withVolantile: include volantile data for each peer? - @event: EventListPeers(event). @event: EvenPeer(event, peer). - @event: EventEndListPeers(event). + @event: EventEndListPeers(event, params). 
""" self.sendMessage( self.Message.ListPeers, @@ -1090,10 +1114,10 @@ ) - def modifyPeer(self, identifier, allowLocalAddresses=None, isDisabled=None, isListenOnly=None): + def modifyPeer(self, identity, allowLocalAddresses=None, isDisabled=None, isListenOnly=None): msg = Message( self.Message.ModifyPeer, - NodeIdentifier=identifier, + NodeIdentifier=identity, ) if allowLocalAddresses is not None: msg['AllowLocalAddresses'] = self.fcpBool(allowLocalAddresses) @@ -1105,20 +1129,20 @@ self.sendMessageEx(msg) - def modifyPeerNote(self, identifier, note): + def modifyPeerNote(self, identity, note): self.sendMessage( self.Message.ModifyPeerNote, - NodeIdentifier=identifier, + NodeIdentifier=identity, #NOTE: currently fcp supports only this one type PeerNoteType=self.PeerNoteType.Private, NoteText=note ) - def removePeer(self, identifier): + def removePeer(self, identity): self.sendMessage( self.Message.RemovePeer, - NodeIdentifier=identifier, + NodeIdentifier=identity, ) ########################################################## This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-02 08:27:30
|
Revision: 33 http://fclient.svn.sourceforge.net/fclient/?rev=33&view=rev Author: jUrner Date: 2007-11-02 01:27:25 -0700 (Fri, 02 Nov 2007) Log Message: ----------- combed over events + a minor adjustement in testDDA() Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-01 14:58:13 UTC (rev 32) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:27:25 UTC (rev 33) @@ -114,56 +114,61 @@ #************************************************************************** #TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this. #TODO: events should be FcpClient.event.PeerNote not FcpClient.EventPeerNote -class FcpClient(events.Events): +class FcpClient(object): """Fcp client implementation """ + + Version = '2.0' + FcpTrue = 'true' + FcpFalse = 'false' + class DisconnectReason: + """Reason for client disconnect""" + Shutdown = '1' + SocketDied = '2' + - _events_ = ( + class Events(events.Events): + """All events the client supports""" + _events_ = ( - 'EventClientConnected', - 'EventClientDisconnected', + 'ClientConnected', + 'ClientDisconnected', # config related events - 'EventConfigData', - 'EventNodeData', + 'ConfigData', + 'NodeData', #Peer related events - 'EventEndListPeers', - 'EventPeer', - 'EventPeerRemoved', - 'EventUnknownNodeIdentifier', + 'EndListPeers', + 'Peer', + 'PeerRemoved', + 'UnknownNodeIdentifier', - 'EventListPeerNotes', - 'EventEndListPeerNotes', - 'EventPeerNote', + 'ListPeerNotes', + 'EndListPeerNotes', + 'PeerNote', # get / put related events - 'EventTestDDAComplete', - 'EventIdentifierCollision', + 'TestDDAComplete', + 'IdentifierCollision', - 'EventClientGetInfo', - 'EventClientGetInfoProgress', + 'ClientGetInfo', + 'ClientGetInfoProgress', - 'EventDataFound', - 'EventGetFailed', - 'EventSimpleProgress', - 
'EventPersistentRequestModified', - 'EventPersistentRequestRemoved', + 'DataFound', + 'GetFailed', + 'SimpleProgress', + 'PersistentRequestModified', + 'PersistentRequestRemoved', # others - 'EventSSKKeypair', + 'SSKKeypair', ) - Version = '2.0' - FcpTrue = 'true' - FcpFalse = 'false' - class DisconnectReason: - """Reason for client disconnect""" - Shutdown = '1' - SocketDied = '2' + class FetchError(Exception): """All fetch errors supported by the client""" @@ -348,6 +353,7 @@ KeyboardInterrupt = 'Keyboard interrupt' SocketDied = 'Socket died' + #TODO: maybe speed up lookup of message name lookup by implementing integer message names class Message(object): """Class wrapping a freenet message""" @@ -666,6 +672,8 @@ @param name: name of the client instance or '' (for debugging) @param conectionName: name of the connection @param verbosity: verbosity level for debugging + + @ivar events: events the client supports """ self._connectionName = connectionName @@ -673,6 +681,8 @@ self._log = logging.getLogger(name) self._socket = None + self.events = self.Events() + self.setVerbosity(verbosity) atexit.register(self.close) @@ -697,7 +707,7 @@ @param port: (int) port of the node @param repeat: (int) how many seconds try to connect before giving up @param timeout: (int) how much time to wait before another attempt to connect - @event: EventConnected(event, params). Triggered as soon as the client is connected. Params + @event: Connected(event, params). Triggered as soon as the client is connected. Params will be the parameters of the NodeHello message. 
@return: (Message) NodeHello if successful, None otherwise for the next iteration """ @@ -736,7 +746,7 @@ timeElapsed += SocketTimeout yield None elif msg.name == self.Message.NodeHello: - self.EventClientConnected(msg.params) + self.events.ClientConnected(msg.params) yield msg.params raise StopIteration else: @@ -751,9 +761,7 @@ self._log.info(self.LogMessages.ConnectingFailed) raise StopIteration - - - + def handleMessage(self, msg): """Handles a message from the freenet node @param msg: (Message) to handle @@ -768,7 +776,7 @@ code = msg['Code'] if code == self.ProtocolError.ShuttingDown: self.close() - self.EventClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown}) + self.events.ClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown}) return True raise self.ProtocolError(msg) @@ -777,10 +785,6 @@ ## ## TestDDA ## - ## Note: if both, ReadDirectoryAllowed and WriteDirectoryAllowed are - ## set to false, the node sends a ProtocolError (7, 'Invalid message'). - ## Have to handle this! 
- ## #################################################### elif msg.name == self.Message.TestDDAReply: fpathWrite = msg.params.get('WriteFilename', None) @@ -810,7 +814,7 @@ for fpath in self._ddaTmpFiles: saveRemoveFile(fpath) self._ddaTmpFiles = [] - self.EventTestDDAComplete(msg.params) + self.events.TestDDAComplete(msg.params) return True #################################################### @@ -819,11 +823,11 @@ ## #################################################### elif msg.name == self.Message.ConfigData: - self.EventConfigData(msg.params) + self.events.ConfigData(msg.params) return True elif msg.name == self.Message.NodeData: - self.EventNodeData(msg.params) + self.events.NodeData(msg.params) return True #################################################### @@ -832,15 +836,15 @@ ## #################################################### elif msg.name == self.Message.EndListPeers: - self.EventEndListPeers(msg.params) + self.events.EndListPeers(msg.params) return True elif msg.name == self.Message.EndListPeerNotes: - self.EventEndListPeerNotes(msg.params) + self.events.EndListPeerNotes(msg.params) return True elif msg.name == self.Message.Peer: - self.EventPeer(msg.params) + self.events.Peer(msg.params) return True elif msg.name == self.Message.PeerNote: @@ -848,28 +852,28 @@ if note: note = base64.decodestring(note) msg['NoteText'] = note - self.EventPeerNote(msg.params, note) + self.events.PeerNote(msg.params, note) return True elif msg.name == self.Message.PeerRemoved: - self.EventPeerRemoved(msg.params) + self.events.PeerRemoved(msg.params) return True elif msg.name == self.Message.UnknownNodeIdentifier: - self.EventUnknownNodeIdentifier(msg.params) + self.events.UnknownNodeIdentifier(msg.params) return True #################################################### ## - ## Get related messages + ## ClientGet related messages ## #################################################### elif msg.name == self.Message.DataFound: if 
msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): - self.EventClientGetInfo(msg.params) + self.events.ClientGetInfo(msg.params) return True - self.EventDataFound(msg.params) + self.events.DataFound(msg.params) return True elif msg.name == self.Message.GetFailed: @@ -880,29 +884,29 @@ 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''), 'DataLength': msg.get('ExpectedDataLength', '') } - self.EventClientGetInfo(params) + self.events.ClientGetInfo(params) return True - self.EventGetFailed(msg.params) + self.events.GetFailed(msg.params) return True elif msg.name == self.Message.SimpleProgress: if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): - self.EventClientGetInfoProgress(msg.params) + self.events.ClientGetInfoProgress(msg.params) else: - self.EventSimpleProgress(msg.params) + self.events.SimpleProgress(msg.params) return True elif msg.name == self.Message.IdentifierCollision: - self.EventIdentifierCollision(msg.params) + self.events.IdentifierCollision(msg.params) return True elif msg.name == self.Message.PersistentRequestModified: - self.EventPersistentRequestModified(msg.params) + self.events.PersistentRequestModified(msg.params) return True elif msg.name == self.Message.PersistentRequestRemoved: - self.EventPersistentRequestRemoved(msg.params) + self.events.PersistentRequestRemoved(msg.params) return True #################################################### @@ -911,7 +915,7 @@ ## #################################################### elif msg.name == self.Message.SSKKeypair: - self.EventSSKKeypair(msg.params) + self.events.SSKKeypair(msg.params) return True @@ -961,7 +965,7 @@ 'Exception': msg['Exception'], 'Details': msg['Details'] } - self.EventClientDisconnected(params) + self.events.ClientDisconnected(params) raise self.SocketError(msg['Details']) self.handleMessage(msg) return msg @@ -999,7 +1003,7 @@ 'Exception': socket.error, 'Details': d } - self.EventClientDisconnected(params) + 
self.events.ClientDisconnected(params) raise self.SocketError(d) return msg @@ -1089,9 +1093,9 @@ def listPeerNotes(self, identity): """Lists all text notes associated to a peer @param identifier: peer as returned in a call to L{peerList} - @event: EventListPeerNotes(event). - @event: EventListPeerNote(event, note). - @event: EventEndListPeerNotes(event). + @event: ListPeerNotes(event). + @event: ListPeerNote(event, note). + @event: EndListPeerNotes(event). """ self.sendMessage( self.Message.ListPeerNotes, @@ -1104,8 +1108,8 @@ @param withMetaData: include meta data for each peer? @param withVolantile: include volantile data for each peer? - @event: EvenPeer(event, peer). - @event: EventEndListPeers(event, params). + @event: Peer(event, peer). + @event: EndListPeers(event, params). """ self.sendMessage( self.Message.ListPeers, @@ -1183,7 +1187,7 @@ @param uri: uri of the file to request info about @event: clientGetInfo(event, params). If success, params will contain a key 'Metadata.ContentType' and 'DataLength'. Both may be '' (empty string) - @event: clientGetInfoProgress(event, params). Triggered instead of EventSimpleProgress + @event: GetInfoProgress(event, params). Triggered instead ofSimpleProgress @note: for other events see: L{clientGet} @return: (str) request identifier """ @@ -1201,7 +1205,7 @@ ) return identifier - + def testDDA(self, directory, wantReadDirectory=None, wantWriteDirectory=None): """Tests a directory for read / write access @param directory: directory to test @@ -1213,7 +1217,19 @@ and a directory for read access before uploading content from it @note: the node does not like both parameters being False and will respond with a protocol error in this case. Take care of that. + @note: if a directory is no longer needed, best pratice is to free resources by calling + testDDA() with both parameters set to False. 
""" + + # if both, ReadDirectoryAllowed and WriteDirectoryAllowed are + # set to false, the node sends a ProtocolError (7, 'Invalid message') + # No idea what the error is good for... so simply ignore the request. + # + # already file a bug report that (False, False) should be interpreted + # by the node to forgett a directory (free resources) + if not wantReadDirectory and not wantWriteDirectory: + return + msg = self.Message( self.Message.TestDDARequest, Directory=directory, @@ -1223,6 +1239,7 @@ if wantWriteDirectory is not None: msg['WantWriteDirectory'] = self.fcpBool(wantWriteDirectory) self.sendMessageEx(msg) + ########################################################## ## ## others @@ -1246,7 +1263,8 @@ #***************************************************************************** if __name__ == '__main__': c = FcpClient(name='test', verbosity=FcpClient.Verbosity.Debug) - nodeHello = c.connect() + + for nodeHello in c.connect(): pass if nodeHello is not None: @@ -1289,13 +1307,13 @@ print '%s=%s' % (prefix, value) print - c.EventConfigData += cb + c.events.ConfigData += cb oldVerbosity = c.verbosity() ##c.setVerbosity(c.Verbosity.Warning) print '\n>> Requesting config\n' c.getConfig() - for i in xrange(1): + for i in xrange(5): c.next() c.setVerbosity(oldVerbosity) @@ -1308,7 +1326,7 @@ def cb(event, params): print params - c.EventSSKKeypair += cb + c.events.SSKKeypair += cb c.generateSSK() for i in xrange(1): c.next() @@ -1319,7 +1337,7 @@ def cb(event, params): print params - c.EventTestDDAComplete += cb + c.events.TestDDAComplete += cb d = os.path.dirname(os.path.abspath(__file__)) c.testDDA(d, True, True) for i in xrange(4): @@ -1333,9 +1351,9 @@ if params['opennet'] == c.FcpFalse: c.listPeerNotes(params['identity']) - c.EventPeer += cb + c.events.Peer += cb c.listPeers() - for i in xrange(100): + for i in xrange(120): c.next() #testListPeerNotes() @@ -1345,7 +1363,7 @@ def cb(event, params): print params - c.EventClientGetInfo += cb + 
c.events.ClientGetInfo += cb identifier = c.clientGetInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') for i in xrange(20): c.next() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-02 08:43:53
|
Revision: 36 http://fclient.svn.sourceforge.net/fclient/?rev=36&view=rev Author: jUrner Date: 2007-11-02 01:43:54 -0700 (Fri, 02 Nov 2007) Log Message: ----------- message has to be dispatched Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:30:13 UTC (rev 35) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:43:54 UTC (rev 36) @@ -809,6 +809,7 @@ ) return True + #TODO: unconditionally clean up all tmp files? Looks like trouble.. elif msg.name == self.Message.TestDDAComplete: # clean tmp files for fpath in self._ddaTmpFiles: @@ -1228,6 +1229,12 @@ # already file a bug report that (False, False) should be interpreted # by the node to forgett a directory (free resources) if not wantReadDirectory and not wantWriteDirectory: + msg = self.Message( + self.Message.TestDDAComplete, + ReadDirectoryAllowed=self.FcpFalse, + WriteDirectoryAllowed=self.FcpFalse, + ) + self.handleMessage(msg) return msg = self.Message( This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-02 16:01:26
|
Revision: 37 http://fclient.svn.sourceforge.net/fclient/?rev=37&view=rev Author: jUrner Date: 2007-11-02 09:01:29 -0700 (Fri, 02 Nov 2007) Log Message: ----------- some fixes Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:43:54 UTC (rev 36) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 16:01:29 UTC (rev 37) @@ -144,7 +144,6 @@ 'PeerRemoved', 'UnknownNodeIdentifier', - 'ListPeerNotes', 'EndListPeerNotes', 'PeerNote', @@ -443,7 +442,7 @@ """Reads n bytes from socket @param socketObj: socket to read bytes from @param n: (int) number of bytes to read - @return: (tuple) error-message or None, bytes read or None if an error occured + @return: (tuple) (error-message or None, bytes read or None) if an error occured or no bytes could be read """ error = p = None @@ -452,7 +451,7 @@ if not p: p = None raise socket.error('Socket shut down by node') - except socket.timeout, d: # no new messages in queue + except socket.timeout, d: # nothing in the queue error = clss(clss.ClientSocketTimeout) except socket.error, d: error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d) @@ -853,7 +852,7 @@ if note: note = base64.decodestring(note) msg['NoteText'] = note - self.events.PeerNote(msg.params, note) + self.events.PeerNote(msg.params) return True elif msg.name == self.Message.PeerRemoved: @@ -1094,9 +1093,9 @@ def listPeerNotes(self, identity): """Lists all text notes associated to a peer @param identifier: peer as returned in a call to L{peerList} - @event: ListPeerNotes(event). - @event: ListPeerNote(event, note). - @event: EndListPeerNotes(event). 
+ @event: ListPeerNote(event, params) + @event: EndListPeerNotes(event, params) + @note: listPeerNotes() is only available for darknet nodes """ self.sendMessage( self.Message.ListPeerNotes, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-04 17:31:25
|
Revision: 43 http://fclient.svn.sourceforge.net/fclient/?rev=43&view=rev Author: jUrner Date: 2007-11-04 09:31:30 -0800 (Sun, 04 Nov 2007) Log Message: ----------- some more consts Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-04 17:29:50 UTC (rev 42) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-04 17:31:30 UTC (rev 43) @@ -121,10 +121,20 @@ Version = '2.0' FcpTrue = 'true' FcpFalse = 'false' + class ConnectReason: + Connect = '1' + Reconnect = '2' + + class DisconnectReason: - """Reason for client disconnect""" + """Reasons for client disconnect + @cvar Shutdown: regular shutdown of the connection + @cvar SocketDied: connection to the node died unexpectingly + @cvar ConnectFailed: connection could not be established + """ Shutdown = '1' SocketDied = '2' + ConnectFailed = '3' class Events(events.Events): @@ -757,10 +767,16 @@ timeElapsed += timeout time.sleep(timeout) + sef.events.ClientDisconnect({'DisconnectReason': DisconnectReason.ConectFailed}) self._log.info(self.LogMessages.ConnectingFailed) raise StopIteration + + + def connectionName(self): + """Returns the connection name the client uses""" + return self._connectionName - + def handleMessage(self, msg): """Handles a message from the freenet node @param msg: (Message) to handle This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-05 12:43:06
|
Revision: 46 http://fclient.svn.sourceforge.net/fclient/?rev=46&view=rev Author: jUrner Date: 2007-11-05 04:43:07 -0800 (Mon, 05 Nov 2007) Log Message: ----------- error handling Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-05 12:42:27 UTC (rev 45) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-05 12:43:07 UTC (rev 46) @@ -1057,9 +1057,13 @@ def pythonTime(self, fcpTime): """Converts a fcp time value to a python time value @param fcpTime: (int, str) time to convert + @raise ValueError: if the fcpTime could not be converted @return: (int) python time """ - fcpTime = int(fcpTime) + try: + fcpTime = int(fcpTime) + except Exception, d: + raise ValueError(d) return fcpTime / 1000 ######################################################## This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-06 12:34:48
|
Revision: 48 http://fclient.svn.sourceforge.net/fclient/?rev=48&view=rev Author: jUrner Date: 2007-11-06 04:34:50 -0800 (Tue, 06 Nov 2007) Log Message: ----------- minor changes Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-05 12:50:42 UTC (rev 47) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-06 12:34:50 UTC (rev 48) @@ -222,6 +222,7 @@ NotAllDataFound = '28' class FcpError(Exception): pass + #TODO: better description of keys (MaxKeyLen ...) so we can use regex(es) to parse arbitrary strings for keys class FcpUri(object): """Wrapper class for freenet uris""" @@ -362,7 +363,7 @@ KeyboardInterrupt = 'Keyboard interrupt' SocketDied = 'Socket died' - #TODO: maybe speed up lookup of message name lookup by implementing integer message names + #TODO: maybe speed up message name lookup by implementing integer message names class Message(object): """Class wrapping a freenet message""" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-07 17:33:38
|
Revision: 54 http://fclient.svn.sourceforge.net/fclient/?rev=54&view=rev Author: jUrner Date: 2007-11-07 09:33:26 -0800 (Wed, 07 Nov 2007) Log Message: ----------- added an idle event and fixed a missing identifier in EventClientGetInfo Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-07 17:32:18 UTC (rev 53) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-07 17:33:26 UTC (rev 54) @@ -141,6 +141,8 @@ """All events the client supports""" _events_ = ( + 'Idle', + 'ClientConnected', 'ClientDisconnected', @@ -898,6 +900,7 @@ if code == self.FetchError.TooBig: if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): params = { + 'Identifier': msg['Identifier'], 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''), 'DataLength': msg.get('ExpectedDataLength', '') } @@ -984,7 +987,12 @@ } self.events.ClientDisconnected(params) raise self.SocketError(msg['Details']) - self.handleMessage(msg) + + elif msg.name == self.Message.ClientSocketTimeout: + self.events.Idle(msg) + + else: + self.handleMessage(msg) return msg This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-08 11:12:52
|
Revision: 58 http://fclient.svn.sourceforge.net/fclient/?rev=58&view=rev Author: jUrner Date: 2007-11-08 03:12:56 -0800 (Thu, 08 Nov 2007) Log Message: ----------- protocol errors in relation to requests are now passed to the caller ++ some renames Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-07 17:35:34 UTC (rev 57) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-08 11:12:56 UTC (rev 58) @@ -146,6 +146,8 @@ 'ClientConnected', 'ClientDisconnected', + 'ProtocolError', + # config related events 'ConfigData', 'NodeData', @@ -164,8 +166,8 @@ 'TestDDAComplete', 'IdentifierCollision', - 'ClientGetInfo', - 'ClientGetInfoProgress', + 'ClientRequestInfo', + 'ClientRequestInfoProgress', 'DataFound', 'GetFailed', @@ -319,7 +321,7 @@ class IdentifierPrefix: """Special purpose identifier prefixes""" - ClientGetInfo = 'ClientGetInfo::' + ClientRequestInfo = 'ClientRequestInfo::' class InsertError(Exception): """All insert errors supported by the client""" @@ -797,8 +799,14 @@ self.events.ClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown}) return True - raise self.ProtocolError(msg) + identifier = msg.get('Identifier', None) + if identifier is None: + #TODO: check how to handle this + raise self.ProtocolError(msg) + else: + self.events.ProtocolError(msg.params) + #################################################### ## ## TestDDA @@ -888,8 +896,8 @@ ## #################################################### elif msg.name == self.Message.DataFound: - if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): - self.events.ClientGetInfo(msg.params) + if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo): + self.events.ClientRequestInfo(msg.params) return True self.events.DataFound(msg.params) @@ -898,21 +906,21 @@ elif msg.name == 
self.Message.GetFailed: code = msg['Code'] if code == self.FetchError.TooBig: - if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): + if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo): params = { 'Identifier': msg['Identifier'], 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''), 'DataLength': msg.get('ExpectedDataLength', '') } - self.events.ClientGetInfo(params) + self.events.ClientRequestInfo(params) return True self.events.GetFailed(msg.params) return True elif msg.name == self.Message.SimpleProgress: - if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo): - self.events.ClientGetInfoProgress(msg.params) + if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo): + self.events.ClientRequestInfoProgress(msg.params) else: self.events.SimpleProgress(msg.params) return True @@ -1211,7 +1219,7 @@ return identifier - def clientGetInfo(self, uri, **params): + def clientRequestInfo(self, uri, **params): """Requests info about a file @param uri: uri of the file to request info about @event: clientGetInfo(event, params). 
If success, params will contain a key 'Metadata.ContentType' @@ -1220,7 +1228,7 @@ @note: for other events see: L{clientGet} @return: (str) request identifier """ - identifier = self.IdentifierPrefix.ClientGetInfo + self.newIdentifier() + identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier() self.sendMessage( self.Message.ClientGet, Identifier=identifier, @@ -1394,13 +1402,13 @@ #testListPeerNotes() - def testClientGetInfo(): + def testClientRequestInfo(): def cb(event, params): print params c.events.ClientGetInfo += cb - identifier = c.clientGetInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') + identifier = c.clientRequestInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg') for i in xrange(20): c.next() - #testClientGetInfo() + #testClientRequestInfo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-10 11:28:50
|
Revision: 65 http://fclient.svn.sourceforge.net/fclient/?rev=65&view=rev Author: jUrner Date: 2007-11-10 03:28:52 -0800 (Sat, 10 Nov 2007) Log Message: ----------- request identifiers can now be set explicitly + EventFileInfoProgress has gone + a bit of this and that Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-10 11:27:11 UTC (rev 64) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-10 11:28:52 UTC (rev 65) @@ -167,8 +167,7 @@ 'IdentifierCollision', 'ClientRequestInfo', - 'ClientRequestInfoProgress', - + 'DataFound', 'GetFailed', 'SimpleProgress', @@ -321,6 +320,7 @@ class IdentifierPrefix: """Special purpose identifier prefixes""" + ClientGetFile = 'ClientGetFile::' ClientRequestInfo = 'ClientRequestInfo::' class InsertError(Exception): @@ -919,10 +919,7 @@ return True elif msg.name == self.Message.SimpleProgress: - if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo): - self.events.ClientRequestInfoProgress(msg.params) - else: - self.events.SimpleProgress(msg.params) + self.events.SimpleProgress(msg.params) return True elif msg.name == self.Message.IdentifierCollision: @@ -1192,10 +1189,13 @@ ## ########################################################## #TODO: not complete yet - def clientGetFile(self, uri, filename): + def clientGetFile(self, uri, filename, identifier=None): """ """ - identifier = self.new_identifier() + if identifier is None: + identifier = self.IdentifierPrefix.ClientGetFile + self.newIdentifier() + else: + assert identifier.startswith(self.IdentifierPrefix.ClientGetFile), 'Wrong prefix' msg = self.Message( self.Message.ClientGet, IgnoreDS='false', @@ -1219,20 +1219,29 @@ return identifier - def clientRequestInfo(self, uri, **params): + def clientRequestInfo(self, uri, identifier=None, **params): """Requests info about a file 
@param uri: uri of the file to request info about + @param identifier: request identifier or None to let the method create one. If an identifier is passed, it has to be + be prefixed with L{IdentifierPrefix.ClientRequestInfo} @event: clientGetInfo(event, params). If success, params will contain a key 'Metadata.ContentType' and 'DataLength'. Both may be '' (empty string) @event: GetInfoProgress(event, params). Triggered instead ofSimpleProgress @note: for other events see: L{clientGet} @return: (str) request identifier + @note: the request identifier returned is very likely to be unique but uniqueness is not guaranteed """ - identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier() + if identifier is None: + identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier() + else: + assert identifier.startswith(self.IdentifierPrefix.ClientRequestInfo), 'Wrong prefix' self.sendMessage( self.Message.ClientGet, Identifier=identifier, URI=uri, + #TODO: persistance??? + #Persistence='forever', + # suggested by Mathew Toseland to use about 32k for mimeType requests # basic sizes of keys are: 1k for SSks and 32k for CHKs MaxSize='32000', This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-12 10:35:06
|
Revision: 67 http://fclient.svn.sourceforge.net/fclient/?rev=67&view=rev Author: jUrner Date: 2007-11-12 02:35:09 -0800 (Mon, 12 Nov 2007) Log Message: ----------- bit of this and that Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-10 11:30:27 UTC (rev 66) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-12 10:35:09 UTC (rev 67) @@ -168,6 +168,7 @@ 'ClientRequestInfo', + 'PersistentGet', 'DataFound', 'GetFailed', 'SimpleProgress', @@ -318,6 +319,7 @@ + class IdentifierPrefix: """Special purpose identifier prefixes""" ClientGetFile = 'ClientGetFile::' @@ -691,7 +693,7 @@ """ self._connectionName = connectionName - self._ddaTmpFiles = [] + self._ddaTmpFiles = {} self._log = logging.getLogger(name) self._socket = None @@ -813,6 +815,7 @@ ## #################################################### elif msg.name == self.Message.TestDDAReply: + directory = msg['Directory'] fpathWrite = msg.params.get('WriteFilename', None) fpathRead = msg.params.get('ReadFilename', None) readContent = '' @@ -821,8 +824,13 @@ if not written: saveRemoveFile(fpathWrite) else: - self._ddaTmpFiles.append(fpathWrite) - + # ...hope the node will keep request order so the correct + # tmp file is removed on completion + if directory in self._ddaTmpFiles: + self._ddaTmpFiles[directory].append(fpathWrite) + else: + self._ddaTmpFiles[directory] = [fpathWrite, ] + if fpathRead is not None: readContent = saveReadFile(fpathRead) if readContent is None: @@ -835,12 +843,11 @@ ) return True - #TODO: unconditionally clean up all tmp files? Looks like trouble.. 
elif msg.name == self.Message.TestDDAComplete: - # clean tmp files - for fpath in self._ddaTmpFiles: - saveRemoveFile(fpath) - self._ddaTmpFiles = [] + # clean up tmp file + directory = msg['Directory'] + tmp_file = self._ddaTmpFiles[directory].pop(0) + saveRemoveFile(tmp_file) self.events.TestDDAComplete(msg.params) return True @@ -926,6 +933,10 @@ self.events.IdentifierCollision(msg.params) return True + elif msg.name == self.Message.PersistentGet: + self.events.PersistentGet(msg.params) + return True + elif msg.name == self.Message.PersistentRequestModified: self.events.PersistentRequestModified(msg.params) return True @@ -1189,13 +1200,13 @@ ## ########################################################## #TODO: not complete yet - def clientGetFile(self, uri, filename, identifier=None): + def clientGetFile(self, uri, filename, identifier=None, clientToken=None): """ """ if identifier is None: identifier = self.IdentifierPrefix.ClientGetFile + self.newIdentifier() else: - assert identifier.startswith(self.IdentifierPrefix.ClientGetFile), 'Wrong prefix' + assert self.isClientGetFile(identifier), 'Wrong prefix' msg = self.Message( self.Message.ClientGet, IgnoreDS='false', @@ -1214,12 +1225,14 @@ #BinaryBlob='false', Filename=filename, ) + if clientToken is not None: + msg['ClientToken'] = clientToken self.sendMessageEx(msg) return identifier - def clientRequestInfo(self, uri, identifier=None, **params): + def clientRequestInfo(self, uri, identifier=None, clientToken=None, **params): """Requests info about a file @param uri: uri of the file to request info about @param identifier: request identifier or None to let the method create one. 
If an identifier is passed, it has to be @@ -1234,13 +1247,13 @@ if identifier is None: identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier() else: - assert identifier.startswith(self.IdentifierPrefix.ClientRequestInfo), 'Wrong prefix' - self.sendMessage( + assert self.isClientRequestInfo(identifier), 'Wrong prefix' + msg = self.Message( self.Message.ClientGet, Identifier=identifier, URI=uri, #TODO: persistance??? - #Persistence='forever', + Persistence='forever', # suggested by Mathew Toseland to use about 32k for mimeType requests # basic sizes of keys are: 1k for SSks and 32k for CHKs @@ -1249,9 +1262,41 @@ Verbosity='1', **params ) + + if clientToken is not None: + msg['ClientToken'] = clientToken + + self.sendMessageEx(msg) return identifier + + def isClientGetFile(self, identifier): + """Checks if an identifier is a ClientGetFile identifier + @return: (bool) + """ + return identifier.startswith(self.IdentifierPrefix.ClientGetFile) + + def isClientRequestInfo(self, identifier): + """Checks if an identifier is a RequestFileInfo identifier + @return: (bool) + """ + return identifier.startswith(self.IdentifierPrefix.ClientRequestInfo) + + + def removePersistentRequest(self, identifier, global_=False): + """Removes a request + @param identifier: (str) identifier of the request to remove + @param global_: (bool) has to be set to True if the request was a global request + """ + self.sendMessage( + self.Message.RemovePersistentRequest, + Global=self.fcpBool(global_), + Identifier=identifier + ) + + + #TODO: check if node passes directory unchanged to testDDAComplete (trailing slash) def testDDA(self, directory, wantReadDirectory=None, wantWriteDirectory=None): """Tests a directory for read / write access @param directory: directory to test @@ -1292,6 +1337,7 @@ msg['WantWriteDirectory'] = self.fcpBool(wantWriteDirectory) self.sendMessageEx(msg) + ########################################################## ## ## others This was sent by the 
SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-13 20:57:12
|
Revision: 73 http://fclient.svn.sourceforge.net/fclient/?rev=73&view=rev Author: jUrner Date: 2007-11-13 12:57:13 -0800 (Tue, 13 Nov 2007) Log Message: ----------- more methods, more flags Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-12 11:33:36 UTC (rev 72) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-13 20:57:13 UTC (rev 73) @@ -613,7 +613,7 @@ """All known peer note types""" Private = '1' - class Priorities: + class Priority: """All priorities supported by the client""" Maximum = '0' Interactive = '1' @@ -623,9 +623,7 @@ Prefetch = '5' Minimum = '6' - PriorityMin = Minimum - PriorityDefault = Bulk - + class ProtocolError(Exception): """All protocol errors supported by the client""" @@ -1200,7 +1198,7 @@ ## ########################################################## #TODO: not complete yet - def clientGetFile(self, uri, filename, identifier=None, clientToken=None): + def clientGetFile(self, uri, filename, identifier=None, clientToken=None, priorityClass=None): """ """ if identifier is None: @@ -1225,14 +1223,17 @@ #BinaryBlob='false', Filename=filename, ) - if clientToken is not None: + if clientToken is not None: msg['ClientToken'] = clientToken + if priorityClass is not None: + msg['PriorityClass'] = priorityClass + self.sendMessageEx(msg) return identifier - def clientRequestInfo(self, uri, identifier=None, clientToken=None, **params): + def clientRequestInfo(self, uri, identifier=None, clientToken=None, priorityClass=None, **params): """Requests info about a file @param uri: uri of the file to request info about @param identifier: request identifier or None to let the method create one. 
If an identifier is passed, it has to be @@ -1262,10 +1263,11 @@ Verbosity='1', **params ) - if clientToken is not None: msg['ClientToken'] = clientToken - + if priorityClass is not None: + msg['PriorityClass'] = priorityClass + self.sendMessageEx(msg) return identifier @@ -1284,6 +1286,24 @@ return identifier.startswith(self.IdentifierPrefix.ClientRequestInfo) + def modifyPersistantRequest(self, identifier, global_=False, priorityClass=None): + """Modifies a request + @param identifier: identifier of the request + @param global: (bool) required. Is the request global or local? + @param clientToken: new client token or None + @param priorityClass: new priority or None + """ + msg = self.Message( + self.Message.ModifyPersistentRequest, + Identifier=identifier, + Global=self.fcpBool(global_), + ) + if priorityClass is not None: + msg['PriorityClass'] = priorityClass + self.sendMessageEx(msg) + + + def removePersistentRequest(self, identifier, global_=False): """Removes a request @param identifier: (str) identifier of the request to remove This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <jU...@us...> - 2007-11-14 11:03:31
|
Revision: 78 http://fclient.svn.sourceforge.net/fclient/?rev=78&view=rev Author: jUrner Date: 2007-11-14 03:03:35 -0800 (Wed, 14 Nov 2007) Log Message: ----------- typo Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-14 11:03:13 UTC (rev 77) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-14 11:03:35 UTC (rev 78) @@ -1286,7 +1286,7 @@ return identifier.startswith(self.IdentifierPrefix.ClientRequestInfo) - def modifyPersistantRequest(self, identifier, global_=False, priorityClass=None): + def modifyPersistentRequest(self, identifier, global_=False, priorityClass=None): """Modifies a request @param identifier: identifier of the request @param global: (bool) required. Is the request global or local? This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |