From: <jU...@us...> - 2007-10-27 20:59:41
|
Revision: 16 http://fclient.svn.sourceforge.net/fclient/?rev=16&view=rev Author: jUrner Date: 2007-10-27 13:59:45 -0700 (Sat, 27 Oct 2007) Log Message: ----------- bit of refactoring + play ClientHello as save as possible Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 17:00:21 UTC (rev 15) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 20:59:45 UTC (rev 16) @@ -1,4 +1,54 @@ '''Freenet client protocol 2.0 implementation + + +@newfield event, events + + + +Sample code:: + + client = FcpClient() + nodeHello = client.connect() + if nodeHello is not None: + # do whatever + + + +Most method calls can be made either synchron or asynchron:: + + peers = client.peerList(synchron=True) + for peer in peers: + # do whatever + + +To get informed about asynchron events you should connect the relevant events the client provides:: + + # connect to one single event + client.EventListNextPeer += MyCallback + + # connect to multiple events at once + client += ( + (client.EventListPeers, MyCallback1), + (client.EventEndListPeers, MyCallback2), + ) + + # each callback is called with the event as first parameter, followed by additional parameters, + # depending on the event triggered. + def MyListNextPeerCallback(event, peer): + print peer + + client.peerList(synchron=False) + + + # when event notifications are no longer required, you should always make shure to disconnect from them + client.EventListNextPeer -= MyCallback + client -= ( + (client.EventListPeers, MyCallback1), + (client.EventEndListPeers, MyCallback2), + ) + + + ''' #NOTE: @@ -478,6 +528,11 @@ SubscribedUSKUpdate = 'SubscribedUSKUpdate' + # client messages (internal use only) + ClientSocketTimeout = 0 + ClientSocketDied = 1 + + def __init__(self, name, data=None, **params): """ @param name: messge name @@ -489,15 +544,71 @@ self.name = name self.params = params - def toString(self): - """Returns the message as formated string ready to be send""" - # TODO: "Data" not yet implemented - out = [self.name, ] - for param, value in self.params.items(): - out.append('%s=%s' % (param, value)) - out.append('EndMessage\n') - return '\n'.join(out) + def get(self, name, default=None): + """Returns the message parameter 'name' or 'default' """ + return self.params.get(name, default) + + def __getitem__(self, name): + """Returns the message parameter 'name' """ + return self.params[name] + + def __setitem__(self, name, value): + """Sets the message parameter 'name' to 'value' """ + self.params[name] = value + + @classmethod + def fromSocket(clss, socketObj): + msg = clss(None) + buf = [] + while True: + + try: + p = socketObj.recv(1) + if not p: raise ValueError('Socket is dead') + except socket.timeout, d: # no new messages in queue + msg.name = clss.ClientSocketTimeOut + return msg + except Exception, d: + msg.name = clss.ClientSocketDied + msg['Exception'] = Exception + msg['Details'] = d + return msg + + if p == '\r': # ignore + continue + + if p != '\n': + buf.append(p) + continue + + line = ''.join(buf) + if line in ('End', "EndMessage"): + break + buf = [] + + if msg.name is None: + msg.name = line + elif line == 'Data': + n = int(msg.params['DataLength']) + try: + msg.data = socketObj.recv(n) + if not msg.data: raise ValueError('Socket is dead') + except Exception, d: + msg.name = clss.ClientSocketDied + msg['Exception'] = Exception + msg['Details'] = d + return msg + + else: + head, sep, tail = line.partition('=') + msg.params[head] = tail + if not sep: + # TODO: chek for invalid messages or not + pass + + return msg + def pprint(self): """Returns the message as nicely formated human readable string""" out = ['', '>>' + self.name, ] @@ -505,28 +616,19 @@ out.append('>> %s=%s' % (param, value)) out.append('>>EndMessage') return '\n'.join(out) - - def __getitem__(self, name): - """Returns the message parameter 'name' """ - return self.params[name] - - def get(self, name, default=None): - """Returns the message parameter 'name' or 'default' """ - return self.params.get(name, default) - - def __setitem__(self, name, value): - """Sets the message parameter 'name' to 'value' """ - self.params[name] = value - + def toString(self): + """Returns the message as formated string ready to be send""" + # TODO: "Data" not yet implemented + if isinstance(self.name, (int, long)): + raise ValueError('You can not send client internal messages to the node') + out = [self.name, ] + for param, value in self.params.items(): + out.append('%s=%s' % (param, value)) + out.append('EndMessage\n') + return '\n'.join(out) -class MessageSocketTimeout(Message): - - def __init__(self): - Message.__init__(self, 'USocketTimeOut') - - #************************************************************************** # jobs #************************************************************************** @@ -1012,7 +1114,8 @@ #TODO: how to handle (ProtocolError code 18: Shutting down)? class FcpClient(events.Events): - """Fcp client implementation""" + """Fcp client implementation + """ _events_ = ( 'EventListPeers', @@ -1094,17 +1197,25 @@ else: self._log.info(self._logMessages.Connected) + # send ClientHello and wait for NodeHello #NOTE: thought I could leave ClientHelloing up to the caller # but instad of responding with ClientHelloMustBeFirst - # as expected the socket simply breaks. So take it over. + # as expected when not doing so, the node disconnects. + # So take it over here. job = JobClientHello(self) - self.jobAdd(job, synchron=True) - assert job.jobResult is not None, 'ClientHello is not working as expected' - return job.jobResult - + self.jobAdd(job, synchron=False) + while time_elapsed <= repeat: + msg = self.next() + if msg.name == Message.ClientSocketTimeout: + time_elapsed += SocketTimeout + elif msg.name == Message.NodeHello: + return msg.params + else: + break + break + + # continue polling self._log.info(self._logMessages.ConnectionRetry) - - # continue polling time_elapsed += timeout time.sleep(timeout) @@ -1117,9 +1228,8 @@ @param msg: (Message) to handle @return: True if the message was handled, False otherwise """ - - - if msg.name == 'USocketTimeOut': + + if msg.name == Message.ClientSocketTimeout: return True self._log.debug(self._logMessages.MessageReceived + msg.pprint()) @@ -1190,8 +1300,7 @@ return self.jobDispatchMessage(identifier, msg) raise RuntimeError('Should not have endet here: %s' % msg.name) - - + ######################################################### @@ -1212,6 +1321,7 @@ return result + #TODO: not quite clear about the consequences of a synchron job. Have to think this over def jobAdd(self, job, synchron=False): """Adds a job to the client @param job: (Job*) job to add @@ -1288,87 +1398,28 @@ return True - #TODO: some info when all jobs are completed + #TODO: some info when all jobs are completed? def next(self): """Pumps the next message waiting @note: use this method instead of run() to run the client step by step """ - msg = self.readMessage() + msg = Message.fromSocket(self._socket) + if msg.name == Message.ClientSocketDied: + raise SocketError(msg['Details']) self.handleMessage(msg) return msg - def readMessage(self): - """Reads the next message directly from the socket and dispatches it - @return: (Message) the next message read from the socket - @raise SocketError: if the socket connection to the node dies unexpectedly - If an error handler is passed to the client it is called emidiately before the error - is raised. - """ - msg = Message(None) - buf = [] - while True: - - try: - p = self._socket.recv(1) - if not p: raise ValueError('Socket is dead') - except socket.timeout, d: # no new messages in queue - msg = MessageSocketTimeout() - break - except Exception, d: - self._log.info(self._logMessages.SocketDead) - self.close() - if self._errorHandler is not None: - self._errorHandler(SocketError, d) - raise SocketError(d) #!! - - if p == '\r': # ignore - continue - - if p != '\n': - buf.append(p) - continue - - line = ''.join(buf) - if line in ('End', "EndMessage"): - break - buf = [] - - if msg.name is None: - msg.name = line - elif line == 'Data': - n = int(msg.params['DataLength']) - try: - msg.data = self._socket.recv(n) - if not msg.data: raise ValueError('Socket is dead') - except Exception, d: - self._log.info(self._logMessages.SocketDead) - self.close() - if self._errorHandler is not None: - self._errorHandler(SocketError, d) - raise SocketError(d) #!! - - else: - head, sep, tail = line.partition('=') - msg.params[head] = tail - if not sep: - # TODO: chek for invalid messages or not - pass - - return msg - - + def run(self): """Runs the client untill all jobs passed to it are completed @note: use KeyboardInterrupt to stop prematurely """ try: #n = 0 - while self.hasJobsRunning(): #n += 1 #if n > 40: break self.next() - except KeyboardInterrupt: self._log(self._logMessages.KeyboardInterrupt) self.close() @@ -1429,7 +1480,7 @@ ######################################################### ## boilerplate code to tackle TestDDA ## - ## ...but I don't trust it ;-) I was not yet alble to wrap my head around + ## ...but I don't trust it ;-) I was not yet able to wrap my head around ## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not. ## ## Another problem is that there is no way to know when a directory is no longer @@ -1498,13 +1549,30 @@ ## ################################################# def peerList(self, synchron=False): + """Lists all peers of the node + @param synchron: if True, waits untill the call is completed, if False returns emidiately + @return: (list) of peers in a synchron, always None in an asynchron call + + @event: EventListPeers(event). + @event: EventListNextPeer(event, peer). + @event: EventEndListPeers(event). + """ job = JobListPeers(self) self.jobAdd(job, synchron=synchron) return job.jobResult def peerNotes(self, peer, synchron=False): - if pythonBool(peer['opennet']): + """Lists all text notes associated to a peer + @param peer: peer as returned in a call to L{peerList} + @param synchron: if True, waits untill the call is completed, if False returns emidiately + @return: (list) of notes in a synchron, always None in an asynchron call + + @event: EventListPeerNotes(event). + @event: EventListNextPeerNote(event, note). + @event: EventEndListPeerNotes(event). + """ + if pythonBool(peer['opennet']): # opennet peers do not have any notes associated return [] job = JobListPeerNotes(self, peer['identity']) self.jobAdd(job, synchron=synchron) @@ -1517,7 +1585,10 @@ if __name__ == '__main__': c = FcpClient(name='test', verbosity=logging.DEBUG) nodeHello = c.connect() - if nodeHello is not None: + if nodeHello is not None or 1: + + + def foo(): job1 = JobClientHello(c) c.jobAdd(job1) @@ -1529,8 +1600,13 @@ # should raise #foo() - + def foo(): + job = JobGenerateSSK(c) + c.jobAdd(job, synchron=True) + print job.jobResult + #foo() + def foo(): job = JobGenerateSSK(c) c.jobAdd(job, synchron=True) @@ -1554,7 +1630,7 @@ for peer in peers: print c.peerNotes(peer, synchron=True) - foo() + #foo() def foo(): This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |