SF.net SVN: fclient: [5] trunk/fclient/fclient_lib/fcp/fcp20.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <ju...@us...> - 2007-10-16 10:19:40
|
Revision: 5
http://fclient.svn.sourceforge.net/fclient/?rev=5&view=rev
Author: jurner
Date: 2007-10-16 03:19:24 -0700 (Tue, 16 Oct 2007)
Log Message:
-----------
continued working on FcpClient
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Modified: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-15 17:42:24 UTC (rev 4)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-16 10:19:24 UTC (rev 5)
@@ -1,9 +1,15 @@
-
+
+import atexit
+import logging
import os
import socket
+import subprocess
+import sys
import time
import thread
import uuid
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
#**************************************************************
# consts
#**************************************************************
@@ -13,7 +19,6 @@
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
except: pass
SocketTimeout = 0.1
-KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@')
class JobIdentifiers:
# fixed job identifiers
@@ -200,6 +205,13 @@
fp.close()
return read
+def saveRemoveFile(fpath):
+ if fpath is not None:
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ return True
+ return False
+
def saveWriteFile(fpath, data):
"""Writes data to a file i the savest manner possible
@@ -221,6 +233,22 @@
fp.close()
return written
+def startFreenet(cmdline):
+ """Starts freenet
+ @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
+ @return: (string) whatever freenet returns
+ """
+ #TODO: on windows it may be necessary to hide the comand window
+ p = subprocess.Popen(
+ args=cmdline,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ stdout, stderr = p.communicate()
+ return stdout
+
+
def fcpBool(pythonBool):
"""Converts a python bool to a fcp bool
@param pythonBool: (bool)
@@ -237,6 +265,95 @@
"""
return fcpBool == 'true'
+
+def uriToList(uri):
+ """Splits a freenet uri into its components
+ @param uri: (str) uri to split
+ @return: (list) or components
+ @note: additional dexoration like 'freenet:' or 'http://blah' are ignored
+
+ >>> uriToList('SSK@foo')
+ ['SSK@foo']
+
+ >>> uriToList('SSK@foo/bar/baz')
+ ['SSK@foo', 'bar', 'baz']
+
+ >>> uriToList('freenet:SSK@foo')
+ ['SSK@foo']
+
+ >>> uriToList('http://foo/SSK@foo')
+ ['SSK@foo']
+
+ >>> uriToList('http://foo')
+ []
+
+ """
+ if uri.startswith('freenet:'):
+ uri = uri[len('freenet:'): ]
+
+ components = []
+ head = uri
+ while head:
+ head, tail = posixpath.split(head)
+ components.append(tail)
+
+ components.reverse()
+
+ while components:
+ if components[0][:4] in FcpKeys.KeysAll:
+ break
+ else:
+ components.pop(0)
+
+ return components
+
+def uriKeyType(uri):
+ pass
+
+
+def isValidUri(uri):
+ """Checks if an uri is a valid freenet uri
+ @param uri: (str) uri to check
+ @reuturn: (bool)
+ """
+ return bool(UriToList(uri))
+
+
+def splitUri(uri):
+ """Splits an uri into uri and filename
+ @param uri: uri to split
+ @return: tuple(uri, filename)
+
+ >>> splitUri('SSK@foo/bar/baz')
+ ('SSK@foo', 'bar/baz')
+
+ >>> splitUri('SSK@foo')
+ ('SSK@foo', '')
+
+ >>> splitUri('NoUri')
+ ('NoUri', '')
+
+ """
+ L = uriToList(uri)
+ if not L:
+ return (uri, '')
+ names = L[1:]
+ if not names:
+ name = ''
+ else:
+ name = '/'.join(names)
+ return (L[0], name)
+
+
+def fileNameFromUri(uri):
+ """Returns the filename part of an uri
+ @return: (str) filename. If no filename is found the uri is returned unchanged
+ """
+ tmp_uri, name = splitUri(uri)
+ if name:
+ return name
+ return uri
+
#**********************************************************************
# classes
#**********************************************************************
@@ -303,7 +420,6 @@
#**************************************************************************
# jobs
#**************************************************************************
-#TODO: do somrthing that this class does not lock the queue
class JobBase(object):
"""Base class for jobs"""
@@ -311,18 +427,28 @@
def __init__(self, fcpClient, identifier, message):
- self.fcpClient = fcpClient
- self.fcpIdentifier = identifier
- self.fcpMessage = message
- self.fcpResult = None
- self.fcpTime = 0
+ self.fcpClient = fcpClient # FcpClient() instance the job belongs to
+ self.fcpError = None # last error (either this is set or dcpResult)
+ self.fcpIdentifier = identifier #
+ self.fcpMessage = message # message send to node
+ self.fcpResult = None # job result
+ self.fcpTime = 0 # start time (will hld duration whern the job is complte)
+ def displayName(self):
+ return 'JobBase'
+
def start(self):
self.fcpTime = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
+ def error(self, msg):
+ self.fcpTime = time.time() - self.fcpTime
+ self.fcpError = msg
+ self.fcpResult = None
+
def stop(self, result):
self.fcpTime = time.time() - self.fcpTime
+ self.fcpError = None
self.fcpResult = result
@@ -338,8 +464,10 @@
)
JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+ def displayName(self):
+ return 'NodeHello'
+
-
class JobListPeers(JobBase):
_fcp_auto_remove_ = True
@@ -353,15 +481,21 @@
JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+ def displayName(self):
+ return 'ListPeers'
+
def handlePeer(self,msg):
pass
-class JobFileInfo(JobBase):
+class JobGetFileInfo(JobBase):
+ """"""
_fcp_auto_remove_ = False
+
+ # idea is to provoke a GetFailed message and take mimetype and size from it
def __init__(self, fcpClient, uri, **params):
"""
@param fcpClient: FcpClient() instance
@@ -378,7 +512,7 @@
Messages.ClientGet,
Identifier=identifier,
URI=uri,
- MaxSize='ase0',
+ MaxSize='0',
ReturnType='none',
Verbosity='1',
**params
@@ -386,36 +520,37 @@
JobBase.__init__(self, fcpClient, identifier, message)
+ def displayName(self):
+ return 'GetFileInfo'
+
def handleProgress(self, msg):
pass
- def stop(self, msg):
- JobBase.stop(self, msg)
- error = result = None
+ def error(self, msg):
+ JobBase.error(self, msg)
if msg.name == Messages.GetFailed:
if msg['Code'] == FetchErrors.TooBig:
- result = (
+ self.fcpError = None
+ self.fcpResult = (
msg.get('ExpectedMetadata.ContentType', ''),
msg.get('ExpectedDataLength', '')
)
- else:
- error, result = msg['Code'], msg
-
- elif msg.name == Messages.DataFound:
- result = (
+ #else:
+ # raise ValueError('Unhandled message: %s' % msg.name)
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ if msg.name == Messages.DataFound:
+ self.fcpResult = (
msg.get('Metadata.ContentType', ''),
msg.get('DataLength', '')
)
-
- elif msg.name == Messages.ProtocolError:
- error, result = msg['Code'], msg
-
else:
raise ValueError('Unhandled message: %s' % msg.name)
-
- self.fcpResult = error, result
+
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
@@ -432,7 +567,9 @@
JobBase.__init__(self, fcpClient, directory, message)
self.fcpTmpFile = None
-
+ def displayName(self):
+ return 'TestDDA'
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -456,19 +593,58 @@
ReadContent=readContent,
)
+ def error(self, msg):
+ JobBase.error(self, msg)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
+
+
def stop(self, msg):
JobBase.stop(self, msg)
- if self.fcpTmpFile is not None:
- if os.path.isfile(self.fcpTmpFile):
- os.remove(self.fcpTmpFile)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
#**************************************************************************
# fcp client
#**************************************************************************
+class LogMessages:
+ Connecting = 'Connecting to node...'
+ Connected = 'Connected to node'
+ ConnectionRetry = 'Connecting to node failed... retrying'
+ ConnectingFailed = 'Connecting to node failed'
+
+ ClientClose = 'Closing client'
+
+ MessageSend = 'Message send'
+ MessageReceived = 'Message received'
+
+ JobStart = 'Starting job: '
+ JobStop = 'Stopping job: '
+ JobsCompleted = 'All jobs completed'
+
+ KeyboardInterrupt = 'Keyboard interrupt'
+ SocketDead = 'Socket is dead'
+
+
+#TODO: no idea what happens on reconnect if socket died. What about running jobs?
+#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
class FcpClient(object):
- def __init__(self):
-
+ def __init__(self,
+ name='',
+ errorHandler=None,
+ verbosity=logging.WARNING,
+ logMessages=LogMessages
+ ):
+ """
+ @param name: name of the client instance or '' (for debugging)
+ @param errorHandler: will be called if the socket conncetion to the node is dead
+ with two params: FcpSocketError + details. When the handler is called the client
+ is already closed.
+ @param verbosity: verbosity level for debugging
+ @param logMessages: LogMessages class containing messages
+ """
+
self._isConnected = False
self._jobs = {
'all': {},
@@ -476,24 +652,34 @@
'running': [],
'complete': [],
}
+ self._errrorHandler = errorHandler
+ self._log = logging.getLogger('FcpClient20:%s' % name)
+ self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
-
+ self.setVerbosity(verbosity)
+ atexit.register(self.close)
def close(self):
+ self._log.info(self._logMessages.ClientClose)
if self._socket is not None:
self._socket.close()
self._socket = None
+ #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
+
+ self._log.info(self._logMessages.Connecting)
+
# poll untill freenet responds
time_elapsed = 0
while time_elapsed <= repeat:
# try to Connect socket
- self.close()
+ if self._socket is not None:
+ self.close()
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(SocketTimeout)
try:
@@ -501,19 +687,19 @@
except Exception, d:
pass
else:
- #self._isConnected = True
+ self._log.info(self._logMessages.Connected)
return True
+ self._log.info(self._logMessages.ConnectionRetry)
+
# continue polling
time_elapsed += timeout
time.sleep(timeout)
+ self._log.info(self._logMessages.ConnectingFailed)
return False
- #def __nonzero__(self):
- # return self._isConnected
-
def addJob(self, job):
self._lock.acquire(True)
try:
@@ -523,9 +709,12 @@
self._jobs['running'].append(job)
finally:
self._lock.release()
+
+ self._log.info(self._logMessages.JobStart + job.displayName())
job.start()
- def finishJob(self, identifier, msg):
+
+ def stopJob(self, identifier, msg, error=False):
self._lock.acquire(True)
try:
job = self._jobs['all'].get(identifier, None)
@@ -540,7 +729,11 @@
if job is None:
raise ValueError('No such job: %r' % identifier)
- job.stop(msg)
+ self._log.info(self._logMessages.JobStop + job.displayName())
+ if error:
+ job.error(msg)
+ else:
+ job.stop(msg)
def notifyJob(self, identifier, handler, msg):
@@ -558,27 +751,25 @@
# TODO:
# x. push pending jobs
- # x. on error stop this thingy
-
- n = 0
- while True:
- if not self._lock.acquire(False):
- continue
+ try:
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ self._log.info(self._logMessages.JobsCompleted)
+ break
+ finally:
+ self._lock.release()
+
+ self.next()
+ except KeyboardInterrupt:
+ self._log(self._logMessages.KeyboardInterrupt)
+ self.close()
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- break
- finally:
- self._lock.release()
-
- msg = self.readMessage()
- self.handleMessage(msg)
-
-
- n += 1
- if n > 50: break
-
+ #TODO: some info when all jobs are completed
def next(self):
msg = self.readMessage()
self.handleMessage(msg)
@@ -586,33 +777,32 @@
def handleMessage(self, msg):
- print msg.pprint()
-
+ self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+
if msg.name == Messages.NodeHello:
#connectionIdentifier = msg['ConnectionIdentifier']
- self.finishJob(JobIdentifiers.ClientHello, msg)
+ self.stopJob(JobIdentifiers.ClientHello, msg)
elif msg.name == Messages.ProtocolError:
code = msg['Code']
-
if code == ProtocolErrors.NoLateClientHellos:
- self.finishJob(JobIdentifiers.ClientHello, msg)
+ self.stopJob(JobIdentifiers.ClientHello, msg, error=True)
else:
identifier = msg.get('Identifier', None)
if identifier is None:
pass # raise ???
else:
- self.finishJob(identifier, msg)
+ self.stopJob(identifier, msg, error=True)
elif msg.name == Messages.Peer:
self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg)
elif msg.name == Messages.EndListPeers:
- self.finishJob(IdentifierListPeers, msg)
+ self.stopJob(IdentifierListPeers, msg)
elif msg.name == Messages.GetFailed:
- self.finishJob(msg['Identifier'], msg)
+ self.stopJob(msg['Identifier'], msg, error=True)
elif msg.name == Messages.SimpleProgress:
self.notifyJob(msg['Identifier'], 'handleProgress', msg)
@@ -621,7 +811,7 @@
self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg)
elif msg.name == Messages.TestDDAComplete:
- self.finishJob(msg['Directory'], msg)
+ self.stopJob(msg['Directory'], msg)
elif msg.name == Messages.IdentifierCollision:
pass
@@ -642,6 +832,10 @@
msg = MessageSocketTimeout()
break
except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
raise FcpSocketError(d) #!!
if p == '\r': # ignore
@@ -662,7 +856,12 @@
n = int(msg.params['DataLength'])
try:
msg.data = self._socket.recv(n)
+ if not msg.data: raise ValueError('Socket is dead')
except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
raise FcpSocketError(d) #!!
else:
@@ -674,8 +873,13 @@
return msg
-
+ def setLogMessages(self, logMessages):
+ self._logMessages = logMessages
+
+ def setVerbosity(self, verbosity):
+ self._log.setLevel(verbosity)
+
def sendMessage(self, name, data=None, **params):
"""Sends a message to freenet
@param name: name of the message to send
@@ -691,25 +895,50 @@
@param msg: (Message) message to send
@return: Message
"""
- #self.log.info('SendMessage\n' + msg.pprint())
rawMsg = msg.toString()
+ self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
try:
self._socket.sendall(rawMsg)
except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
raise FcpSocketError(d)
- #TODO: allow for an error handler to handle
return msg
#*****************************************************************************
#
#*****************************************************************************
if __name__ == '__main__':
- c = FcpClient()
+ c = FcpClient(name='test', verbosity=logging.DEBUG)
if c.connect():
job1 = JobNodeHello(c)
c.addJob(job1)
c.run()
print '---------------------------'
- print job1.fcpResult.pprint()
+ print job1.fcpError
+ print job1.fcpResult
+ print job1.fcpTime
+ print '---------------------------'
+
+
+ job2 = JobTestDDA(c, os.path.dirname(__file__))
+ c.addJob(job2)
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
+
+ job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ c.addJob(job2)
+ c.run()
+ print '---------------------------'
+ print job1.fcpError
+ print job2.fcpResult
+ print job2.fcpTime
+ print '---------------------------'
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|