SF.net SVN: fclient: [16] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-27 20:59:41
|
Revision: 16
http://fclient.svn.sourceforge.net/fclient/?rev=16&view=rev
Author: jUrner
Date: 2007-10-27 13:59:45 -0700 (Sat, 27 Oct 2007)
Log Message:
-----------
bit of refactoring + play ClientHello as save as possible
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 17:00:21 UTC (rev 15)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 20:59:45 UTC (rev 16)
@@ -1,4 +1,54 @@
'''Freenet client protocol 2.0 implementation
+
+
+@newfield event, events
+
+
+
+Sample code::
+
+ client = FcpClient()
+ nodeHello = client.connect()
+ if nodeHello is not None:
+ # do whatever
+
+
+
+Most method calls can be made either synchron or asynchron::
+
+ peers = client.peerList(synchron=True)
+ for peer in peers:
+ # do whatever
+
+
+To get informed about asynchron events you should connect the relevant events the client provides::
+
+ # connect to one single event
+ client.EventListNextPeer += MyCallback
+
+ # connect to multiple events at once
+ client += (
+ (client.EventListPeers, MyCallback1),
+ (client.EventEndListPeers, MyCallback2),
+ )
+
+ # each callback is called with the event as first parameter, followed by additional parameters,
+ # depending on the event triggered.
+ def MyListNextPeerCallback(event, peer):
+ print peer
+
+ client.peerList(synchron=False)
+
+
+ # when event notifications are no longer required, you should always make shure to disconnect from them
+ client.EventListNextPeer -= MyCallback
+ client -= (
+ (client.EventListPeers, MyCallback1),
+ (client.EventEndListPeers, MyCallback2),
+ )
+
+
+
'''
#NOTE:
@@ -478,6 +528,11 @@
SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+ # client messages (internal use only)
+ ClientSocketTimeout = 0
+ ClientSocketDied = 1
+
+
def __init__(self, name, data=None, **params):
"""
@param name: messge name
@@ -489,15 +544,71 @@
self.name = name
self.params = params
- def toString(self):
- """Returns the message as formated string ready to be send"""
- # TODO: "Data" not yet implemented
- out = [self.name, ]
- for param, value in self.params.items():
- out.append('%s=%s' % (param, value))
- out.append('EndMessage\n')
- return '\n'.join(out)
+ def get(self, name, default=None):
+ """Returns the message parameter 'name' or 'default' """
+ return self.params.get(name, default)
+
+ def __getitem__(self, name):
+ """Returns the message parameter 'name' """
+ return self.params[name]
+
+ def __setitem__(self, name, value):
+ """Sets the message parameter 'name' to 'value' """
+ self.params[name] = value
+
+ @classmethod
+ def fromSocket(clss, socketObj):
+ msg = clss(None)
+ buf = []
+ while True:
+
+ try:
+ p = socketObj.recv(1)
+ if not p: raise ValueError('Socket is dead')
+ except socket.timeout, d: # no new messages in queue
+ msg.name = clss.ClientSocketTimeOut
+ return msg
+ except Exception, d:
+ msg.name = clss.ClientSocketDied
+ msg['Exception'] = Exception
+ msg['Details'] = d
+ return msg
+
+ if p == '\r': # ignore
+ continue
+
+ if p != '\n':
+ buf.append(p)
+ continue
+
+ line = ''.join(buf)
+ if line in ('End', "EndMessage"):
+ break
+ buf = []
+
+ if msg.name is None:
+ msg.name = line
+ elif line == 'Data':
+ n = int(msg.params['DataLength'])
+ try:
+ msg.data = socketObj.recv(n)
+ if not msg.data: raise ValueError('Socket is dead')
+ except Exception, d:
+ msg.name = clss.ClientSocketDied
+ msg['Exception'] = Exception
+ msg['Details'] = d
+ return msg
+
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ if not sep:
+ # TODO: chek for invalid messages or not
+ pass
+
+ return msg
+
def pprint(self):
"""Returns the message as nicely formated human readable string"""
out = ['', '>>' + self.name, ]
@@ -505,28 +616,19 @@
out.append('>> %s=%s' % (param, value))
out.append('>>EndMessage')
return '\n'.join(out)
-
- def __getitem__(self, name):
- """Returns the message parameter 'name' """
- return self.params[name]
-
- def get(self, name, default=None):
- """Returns the message parameter 'name' or 'default' """
- return self.params.get(name, default)
-
- def __setitem__(self, name, value):
- """Sets the message parameter 'name' to 'value' """
- self.params[name] = value
-
+ def toString(self):
+ """Returns the message as formated string ready to be send"""
+ # TODO: "Data" not yet implemented
+ if isinstance(self.name, (int, long)):
+ raise ValueError('You can not send client internal messages to the node')
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append('%s=%s' % (param, value))
+ out.append('EndMessage\n')
+ return '\n'.join(out)
-class MessageSocketTimeout(Message):
-
- def __init__(self):
- Message.__init__(self, 'USocketTimeOut')
-
-
#**************************************************************************
# jobs
#**************************************************************************
@@ -1012,7 +1114,8 @@
#TODO: how to handle (ProtocolError code 18: Shutting down)?
class FcpClient(events.Events):
- """Fcp client implementation"""
+ """Fcp client implementation
+ """
_events_ = (
'EventListPeers',
@@ -1094,17 +1197,25 @@
else:
self._log.info(self._logMessages.Connected)
+ # send ClientHello and wait for NodeHello
#NOTE: thought I could leave ClientHelloing up to the caller
# but instad of responding with ClientHelloMustBeFirst
- # as expected the socket simply breaks. So take it over.
+ # as expected when not doing so, the node disconnects.
+ # So take it over here.
job = JobClientHello(self)
- self.jobAdd(job, synchron=True)
- assert job.jobResult is not None, 'ClientHello is not working as expected'
- return job.jobResult
-
+ self.jobAdd(job, synchron=False)
+ while time_elapsed <= repeat:
+ msg = self.next()
+ if msg.name == Message.ClientSocketTimeout:
+ time_elapsed += SocketTimeout
+ elif msg.name == Message.NodeHello:
+ return msg.params
+ else:
+ break
+ break
+
+ # continue polling
self._log.info(self._logMessages.ConnectionRetry)
-
- # continue polling
time_elapsed += timeout
time.sleep(timeout)
@@ -1117,9 +1228,8 @@
@param msg: (Message) to handle
@return: True if the message was handled, False otherwise
"""
-
-
- if msg.name == 'USocketTimeOut':
+
+ if msg.name == Message.ClientSocketTimeout:
return True
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
@@ -1190,8 +1300,7 @@
return self.jobDispatchMessage(identifier, msg)
raise RuntimeError('Should not have endet here: %s' % msg.name)
-
-
+
#########################################################
@@ -1212,6 +1321,7 @@
return result
+ #TODO: not quite clear about the consequences of a synchron job. Have to think this over
def jobAdd(self, job, synchron=False):
"""Adds a job to the client
@param job: (Job*) job to add
@@ -1288,87 +1398,28 @@
return True
- #TODO: some info when all jobs are completed
+ #TODO: some info when all jobs are completed?
def next(self):
"""Pumps the next message waiting
@note: use this method instead of run() to run the client step by step
"""
- msg = self.readMessage()
+ msg = Message.fromSocket(self._socket)
+ if msg.name == Message.ClientSocketDied:
+ raise SocketError(msg['Details'])
self.handleMessage(msg)
return msg
- def readMessage(self):
- """Reads the next message directly from the socket and dispatches it
- @return: (Message) the next message read from the socket
- @raise SocketError: if the socket connection to the node dies unexpectedly
- If an error handler is passed to the client it is called emidiately before the error
- is raised.
- """
- msg = Message(None)
- buf = []
- while True:
-
- try:
- p = self._socket.recv(1)
- if not p: raise ValueError('Socket is dead')
- except socket.timeout, d: # no new messages in queue
- msg = MessageSocketTimeout()
- break
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(SocketError, d)
- raise SocketError(d) #!!
-
- if p == '\r': # ignore
- continue
-
- if p != '\n':
- buf.append(p)
- continue
-
- line = ''.join(buf)
- if line in ('End', "EndMessage"):
- break
- buf = []
-
- if msg.name is None:
- msg.name = line
- elif line == 'Data':
- n = int(msg.params['DataLength'])
- try:
- msg.data = self._socket.recv(n)
- if not msg.data: raise ValueError('Socket is dead')
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(SocketError, d)
- raise SocketError(d) #!!
-
- else:
- head, sep, tail = line.partition('=')
- msg.params[head] = tail
- if not sep:
- # TODO: chek for invalid messages or not
- pass
-
- return msg
-
-
+
def run(self):
"""Runs the client untill all jobs passed to it are completed
@note: use KeyboardInterrupt to stop prematurely
"""
try:
#n = 0
-
while self.hasJobsRunning():
#n += 1
#if n > 40: break
self.next()
-
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
self.close()
@@ -1429,7 +1480,7 @@
#########################################################
## boilerplate code to tackle TestDDA
##
- ## ...but I don't trust it ;-) I was not yet alble to wrap my head around
+ ## ...but I don't trust it ;-) I was not yet able to wrap my head around
## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not.
##
## Another problem is that there is no way to know when a directory is no longer
@@ -1498,13 +1549,30 @@
##
#################################################
def peerList(self, synchron=False):
+ """Lists all peers of the node
+ @param synchron: if True, waits untill the call is completed, if False returns emidiately
+ @return: (list) of peers in a synchron, always None in an asynchron call
+
+ @event: EventListPeers(event).
+ @event: EventListNextPeer(event, peer).
+ @event: EventEndListPeers(event).
+ """
job = JobListPeers(self)
self.jobAdd(job, synchron=synchron)
return job.jobResult
def peerNotes(self, peer, synchron=False):
- if pythonBool(peer['opennet']):
+ """Lists all text notes associated to a peer
+ @param peer: peer as returned in a call to L{peerList}
+ @param synchron: if True, waits untill the call is completed, if False returns emidiately
+ @return: (list) of notes in a synchron, always None in an asynchron call
+
+ @event: EventListPeerNotes(event).
+ @event: EventListNextPeerNote(event, note).
+ @event: EventEndListPeerNotes(event).
+ """
+ if pythonBool(peer['opennet']): # opennet peers do not have any notes associated
return []
job = JobListPeerNotes(self, peer['identity'])
self.jobAdd(job, synchron=synchron)
@@ -1517,7 +1585,10 @@
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
nodeHello = c.connect()
- if nodeHello is not None:
+ if nodeHello is not None or 1:
+
+
+
def foo():
job1 = JobClientHello(c)
c.jobAdd(job1)
@@ -1529,8 +1600,13 @@
# should raise
#foo()
-
+ def foo():
+ job = JobGenerateSSK(c)
+ c.jobAdd(job, synchron=True)
+ print job.jobResult
+ #foo()
+
def foo():
job = JobGenerateSSK(c)
c.jobAdd(job, synchron=True)
@@ -1554,7 +1630,7 @@
for peer in peers:
print c.peerNotes(peer, synchron=True)
- foo()
+ #foo()
def foo():
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|