SF.net SVN: fclient: [8] trunk/fclient/fclient_lib/fcp
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-20 10:02:24
|
Revision: 8
http://fclient.svn.sourceforge.net/fclient/?rev=8&view=rev
Author: jUrner
Date: 2007-10-20 03:02:28 -0700 (Sat, 20 Oct 2007)
Log Message:
-----------
renamed to be able to map to fcp version numbers to modules
Added Paths:
-----------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Removed Paths:
-------------
trunk/fclient/fclient_lib/fcp/fcp20.py
Deleted: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:00:20 UTC (rev 7)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-20 10:02:28 UTC (rev 8)
@@ -1,1098 +0,0 @@
-
-import atexit
-import logging
-import os
-import re
-import socket
-import subprocess
-import sys
-import time
-import thread
-import uuid
-
-logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
-#**************************************************************
-# consts
-#**************************************************************
-NameClient = 'Fcp20Client'
-DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
-DefaultFcpPort = 9481
-try:
- DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
-except: pass
-SocketTimeout = 0.1
-
-class JobIdentifiers:
- """Fixed job identifiers
- @note: he client can only handle one job of these at a time
- """
- ClientHello = 'ClientHello'
- ListPeers = 'ListPeers'
-
-class Messages:
- """All messages supported by the client"""
-
- # client messages
- ClientHello = 'ClientHello'
- ListPeer = 'ListPeer' # (since 1045)
- ListPeers = 'ListPeers'
- ListPeerNotes = 'ListPeerNotes'
- AddPeer = 'AddPeer'
- ModifyPeer = 'ModifyPeer'
- ModifyPeerNote = 'ModifyPeerNote'
- RemovePeer = 'RemovePeer'
- GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
- ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
- GenerateSSK = 'GenerateSSK'
- ClientPut = 'ClientPut'
- ClientPutDiskDir = 'ClientPutDiskDir'
- ClientPutComplexDir = 'ClientPutComplexDir'
- ClientGet = 'ClientGet'
- SubscribeUSK = 'SubscribeUSK'
- WatchGlobal = 'WatchGlobal'
- GetRequestStatus = 'GetRequestStatus'
- ListPersistentRequests = 'ListPersistentRequests'
- RemovePersistentRequest = 'RemovePersistentRequest'
- ModifyPersistentRequest = 'ModifyPersistentRequest'
- Shutdown = 'Shutdown'
-
- # node messages
- NodeHello = 'NodeHello'
- CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
- Peer = 'Peer'
- PeerNote = 'PeerNote'
- EndListPeers = 'EndListPeers'
- EndListPeerNotes = 'EndListPeerNotes'
- PeerRemoved = 'PeerRemoved'
- NodeData = 'NodeData'
- ConfigData = 'ConfigData' # (since 1027)
- TestDDAReply = 'TestDDAReply' # (since 1027)
- TestDDAComplete = 'TestDDAComplete' # (since 1027)
- SSKKeypair = 'SSKKeypair'
- PersistentGet = 'PersistentGet'
- PersistentPut = 'PersistentPut'
- PersistentPutDir = 'PersistentPutDir'
- URIGenerated = 'URIGenerated'
- PutSuccessful = 'PutSuccessful'
- PutFetchable = 'PutFetchable'
- DataFound = 'DataFound'
- AllData = 'AllData'
- StartedCompression = 'StartedCompression'
- FinishedCompression = 'FinishedCompression'
- SimpleProgress = 'SimpleProgress'
- EndListPersistentRequests = 'EndListPersistentRequests'
- PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
- PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
- PutFailed = 'PutFailed'
- GetFailed = 'GetFailed'
- ProtocolError = 'ProtocolError'
- IdentifierCollision = 'IdentifierCollision'
- UnknownNodeIdentifier = 'UnknownNodeIdentifier'
- UnknownPeerNoteType = 'UnknownPeerNoteType'
- SubscribedUSKUpdate = 'SubscribedUSKUpdate'
-
-
-class Priorities:
- """All priorities supported by the client"""
-
- Maximum = 0
- Interactive = 1
- SemiInteractive = 2
- Updatable = 3
- Bulk = 4
- Prefetch = 5
- Minimum = 6
-
- PriorityMin = Minimum
- PriorityDefault = Bulk
-
-
-# errors
-
-class FetchErrors:
- """All fetch errors supported by the client"""
-
- MaxArchiveRecursionExceeded = '1'
- UnknownSplitfileMetadata = '2'
- UnknownMetadata = '3'
- InvalidMetadata = '4'
- ArchiveFailure = '5'
- BlockDecodeError = '6'
- MaxMetadataLevelsExceeded = '7'
- MaxArchiveRestartsExceeded = '8'
- MaxRecursionLevelExceeded = '9'
- NotAnArchve = '10'
- TooManyMetastrings = '11'
- BucketError = '12'
- DataNotFound = '13'
- RouteNotFound = '14'
- RejectedOverload = '15'
- TooManyRedirects = '16'
- InternalError = '17'
- TransferFailed = '18'
- SplitfileError = '19'
- InvalidUri = '20'
- TooBig = '21'
- MetadataTooBig = '22'
- TooManyBlocks = '23'
- NotEnoughMetastrings = '24'
- Canceled = '25'
- ArchiveRestart = '26'
- PermanentRedirect = '27'
- NotAllDataFound = '28'
-
-
-class InsertErrors:
- """All insert errors supported by the client"""
-
- InvalidUri = '1'
- BucketError = '2'
- InternalError = '3'
- RejectedOverload = '4'
- RouteNotFound = '5'
- FatalErrorInBlocks = '6'
- TooManyRetriesInBlock = '7'
- RouteReallyNotFound = '8'
- Collision = '9'
- Canceled = '10'
-
-
-class ProtocolErrors:
- """All protocol errors supported by the client"""
-
- ClientHelloMustBeFirst = '1'
- NoLateClientHellos = '2'
- MessageParseError = '3'
- UriParseError = '4'
- MissingField = '5'
- ErrorParsingNumber = '6'
- InvalidMessage = '7'
- InvalidField = '8'
- FileNotFound = '9'
- DiskTargetExists = '10'
- SameDirectoryExpected = '11'
- CouldNotCreateFile = '12'
- CouldNotWriteFile = '13'
- CouldNotRenameFile = '14'
- NoSuchIdentifier = '15'
- NotSupported = '16'
- InternalError = '17'
- ShuttingDown = '18'
- NoSuchNodeIdentifier = '19' # Unused since 995
- UrlParseError = '20'
- ReferenceParseError = '21'
- FileParseError = '22'
- NotAFile = '23'
- AccessDenied = '24'
- DDADenied = '25'
- CouldNotReadFile = '26'
- ReferenceSignature = '27'
- CanNotPeerWithSelf = '28'
- PeerExists = '29'
- OpennetDisabled = '30'
- DarknetOnly = '31'
-
-#**********************************************************************
-# functions
-#**********************************************************************
-def newIdentifier():
- """Returns a new unique identifier
- @return: (str) uuid
- """
- return str(uuid.uuid4())
-
-def saveReadFile(fpath):
- """Reads contents of a file in the savest manner possible
- @param fpath: file to write
- @return: contents of the file if successful, None otherwise
- """
- read = None
- try:
- fp = open(fpath, 'rb')
- except: pass
- else:
- try:
- read = fp.read()
- except: pass
- fp.close()
- return read
-
-def saveRemoveFile(fpath):
- """Savely removes a file
- @param fpath: filepath of the file to remove or None
- @return: True if the file was removed, False otherwise
- """
- if fpath is not None:
- if os.path.isfile(fpath):
- os.remove(fpath)
- return True
- return False
-
-
-def saveWriteFile(fpath, data):
- """Writes data to a file i the savest manner possible
- @param fpath: file to write
- @param data: data to write to file
- @return: True if successful, False otherwise
- """
- written = False
- try:
- fp = open(fpath, 'wb')
- except: pass
- else:
- try:
- fp.write(data)
- written = True
- except:
- fp.Close()
- else:
- fp.close()
- return written
-
-def startFreenet(cmdline):
- """Starts freenet
- @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
- @return: (string) whatever freenet returns
- """
- #TODO: on windows it may be necessary to hide the comand window
- p = subprocess.Popen(
- args=cmdline,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- )
- stdout, stderr = p.communicate()
- return stdout
-
-
-def fcpBool(pythonBool):
- """Converts a python bool to a fcp bool
- @param pythonBool: (bool)
- @return: (str) 'true' or 'false'
- """
- if pythonBool:
- return 'true'
- return 'false'
-
-def pythonBool(fcpBool):
- """Converts a fcp bool to a python bool
- @param pythonBool: 'true' or 'false'
- @return: (bool) True or False
- """
- return fcpBool == 'true'
-
-#**********************************************************************
-# classes
-#**********************************************************************
-class FcpSocketError(Exception): pass
-class FcpProtocolError(Exception):
- def __init__(self, msg):
- """
- @param msg: (Message) ProtocolError message
- """
- self.value = '%s (%s, %s)' % (
- msg.get('CodeDescription', 'Unknown error') ,
- msg['Code'],
- msg.get('ExtraDescription', '...'),
- )
-
- def __str__(self): return self.value
-
-
-
-class FcpUri(object):
- """Wrapper class for freenet uris"""
-
-
- KeySSK = 'SSK@'
- KeyKSK = 'KSK@'
- KeyCHK = 'CHK@'
- KeyUSK = 'USK@'
- KeySVK = 'SVK@'
- KeyUnknown = ''
- KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
-
- ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
- ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
-
-
- def __init__(self, uri):
- """
- @param uri: uri to wrap
- @param cvar ReUriPattern: pattern matching a freenet uri
- @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
-
- @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible
-
-
- >>> uri = FcpUri('freenet:SSK@foo/bar')
- >>> str(uri)
- 'SSK@foo/bar'
- >>> uri.keyType() == FcpUri.KeySSK
- True
- >>> uri.split()
- ('SSK@foo', 'bar')
- >>> uri.fileName()
- 'bar'
-
- >>> uri = FcpUri('http://SSK@foo/bar')
- >>> str(uri)
- 'SSK@foo/bar'
-
- # uris not containing freenet keys are left unchanged
- >>> uri = FcpUri('http://foo/bar')
- >>> str(uri)
- 'http://foo/bar'
- >>> uri.keyType() == FcpUri.KeyUnknown
- True
- >>> uri.split()
- ('http://foo/bar', '')
- >>> uri.fileName()
- 'http://foo/bar'
-
- """
- self._uri = uri
-
- result = self.ReUriPattern.search(uri)
- if result is not None:
- self._uri = result.group(0)
-
- def __str__(self):
- return str(self._uri)
-
- def __unicode__(self):
- return unicode(self._uri)
-
- def keyType(self):
- """Retuns the key type of the uri
- @return: one of the Key* consts
- """
- result = self.ReKeyPattern.search(self._uri)
- if result is not None:
- return result.group(0).upper()
- return self.KeyUnknown
-
- def split(self):
- """Splits the uri
- @return: tuple(freenet-key, file-name)
- """
- if self.keyType() != self.KeyUnknown:
- head, sep, tail = self._uri.partition('/')
- return head, tail
- return self._uri, ''
-
- def fileName(self):
- """Returns the filename part of the uri
- @return: str
- """
- head, tail = self.split()
- if tail:
- return tail
- return self._uri
-
-
-class Message(object):
- """Class wrapping a freenet message"""
-
- Name = 'UMessage'
-
-
- def __init__(self, name, data=None, **params):
- """
- @param name: messge name
- @param data: data associated to the messge (not yet implemented)
- @param params: {field-name: value, ...} of parameters of the message
- @note: all params can be accessed as attributes of the class
- """
- self.data = data
- self.name = name
- self.params = params
-
-
- def toString(self):
- """Returns a string with the formated message, ready to be send"""
-
- # TODO: "Data" not yet implemented
- out = [self.name, ]
- for param, value in self.params.items():
- out.append('%s=%s' % (param, value))
- out.append('EndMessage\n')
- return '\n'.join(out)
-
-
- def pprint(self):
- """Returns the message as nicely formated human readable string"""
-
- out = [self.name, ]
- for param, value in self.params.items():
- out.append(' %s=%s' % (param, value))
- out.append('EndMessage')
- out.append('')
- return '\n'.join(out)
-
- def __getitem__(self, name):
- """Returns the message parameter 'name' """
- return self.params[name]
-
- def get(self, name, default=None):
- """Returns the message parameter 'name' or 'default' """
- return self.params.get(name, default)
-
- def __setitem__(self, name, value):
- """Sets the message parameter 'name' to 'value' """
- self.params[name] = value
-
-
-class MessageSocketTimeout(Message):
-
- Name = 'USocketTimeout'
-
- def __init__(self):
- Message.__init__(self, self.Name)
-
-
-
-#**************************************************************************
-# jobs
-#**************************************************************************
-class JobBase(object):
- """Base class for jobs"""
-
- _fcp_auto_remove_ = True
-
- def __init__(self, fcpClient, identifier, message):
- """
- @param fcpClient: FcpClient() instance
- @param identifier: (str) identifier of the job
- @param message: (Message) to send to the node whne the job ist started
- @ivar fcpClient: FcpClient() instance
- @ivar fcpError: holding the error message if an error was encountered while running
- the job, None otherwise
- @ivar fcpIdentifier: identifier of the job
- @ivar fcpMessage: initial message send to the node
- @ivar fcpResult: if no error was encountered, holding the result of the job when complete
- @ivar fcpTime: when the job is complete, holding the time the job took to complete
- """
-
- self.fcpClient = fcpClient # FcpClient() instance the job belongs to
- self.fcpError = None # last error (either this is set or dcpResult)
- self.fcpIdentifier = identifier #
- self.fcpMessage = message # message send to node
- self.fcpResult = None # job result
- self.fcpTimeStart = 0 # time the job was started
- self.fcpTimeStop = 0 # time the job was stopped
- self.fcpStopped = False
-
- def displayName(self):
- """Returns the display name of the job
- @return: (str) display name
- """
- return 'JobBase'
-
- def start(self):
- """Starts the job"""
- self.fcpStopped = False
- self.fcpTimeStart = time.time()
- self.fcpClient.sendMessageEx(self.fcpMessage)
-
- def error(self, msg):
- """Called on job completion if an error was encounterd while runnng the job
- @param msg: (Message) to pass to the job
- """
- self.fcpStopped = True
- self.fcpTimeStop = time.time()
- self.fcpError = msg
- self.fcpResult = None
-
- def stop(self, msg):
- """Called on job completion to stop the job
- @param msg: (Message) to pass to the job
- """
- self.fcpStopped = True
- self.fcpTimeStop = time.time()
- self.fcpError = None
- self.fcpResult = msg
-
-
-class JobClientHello(JobBase):
- """Sed a ClientHello message to the node
-
- @note: this must be the first message passed to the node. If everything
- goes well, you will get a NodeHello in response.
- """
-
- _fcp_auto_remove_ = True
-
- def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
- """
- @param name: (str) connection name or None, to use an arbitrary name
- @param expectedVersion: (str) node version expected
- """
- message = Message(
- Messages.ClientHello,
- Name=name if name is not None else newIdentifier(),
- ExpectedVersion=expectedVersion,
- )
- JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
-
- def displayName(self):
- return 'ClientHello'
-
-
-class JobListPeers(JobBase):
- """Lists all known peers of the node
- """
-
- _fcp_auto_remove_ = True
-
- def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
- message = Message(
- Messages.ListPeers,
- WithMetadata=fcpBool(withMetaData),
- WithVolatile=fcpBool(withVolantile),
- )
- JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
-
-
- def displayName(self):
- return 'ListPeers'
-
- def handlePeer(self,msg):
- """Handles the next peer send by the node in form of a 'Peer' message
- while the job is running. Overwrite to process.
- """
-
-
-
-class JobGetFileInfo(JobBase):
- """Tries to retieve information about a file. If everything goes well
-
- On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
- Note, that both members may be '' (empty string)
-
- """
-
- _fcp_auto_remove_ = False
-
-
- # idea is to provoke a GetFailed message and take mimetype and size from it
- def __init__(self, fcpClient, uri, **params):
- """
- @param fcpClient: FcpClient() instance
- @param uri: uri of the file to retrieve info for
- @param params: additional parameters:
- IgnoreDS='true' / 'false'
- DSOnly='true' / 'false'
- MaxRetries=-1 ...N
- PriorityClass=Priority*
-
- """
- identifier = newIdentifier()
- message = Message(
- Messages.ClientGet,
- Identifier=identifier,
- URI=uri,
- MaxSize='0',
- ReturnType='none',
- Verbosity='1',
- **params
- )
- JobBase.__init__(self, fcpClient, identifier, message)
-
-
- def displayName(self):
- return 'GetFileInfo'
-
- def handleProgress(self, msg):
- """Handles the next progress made. Overwrite to process.
- """
-
-
- def error(self, msg):
- JobBase.error(self, msg)
- if msg.name == Messages.GetFailed:
- if msg['Code'] == FetchErrors.TooBig:
- self.fcpError = None
- self.fcpResult = (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- #else:
- # raise ValueError('Unhandled message: %s' % msg.name)
-
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- if msg.name == Messages.DataFound:
- self.fcpResult = (
- msg.get('Metadata.ContentType', ''),
- msg.get('DataLength', '')
- )
- else:
- raise ValueError('Unhandled message: %s' % msg.name)
-
-
-
-#TODO: handle case where directories are registered multiple times
-class JobTestDDA(JobBase):
- """Tests a directory for read / write accesss
- """
-
- _fcp_auto_remove_ = False
-
- def __init__(self, fcpClient, directory, read=True, write=True):
- if not os.path.isdir(directory):
- raise ValueError('No such directory: %r' % directory)
-
- message = Message(
- Messages.TestDDARequest,
- Directory=directory,
- WantReadDirectory=fcpBool(read),
- WantWriteDirectory=fcpBool(write),
- )
- JobBase.__init__(self, fcpClient, directory, message)
- self.fcpTmpFile = None
-
- def displayName(self):
- return 'TestDDA'
-
- def handleTestDDAReply(self, msg):
- fpathWrite = msg.params.get('WriteFilename', None)
- fpathRead = msg.params.get('ReadFilename', None)
- readContent = ''
- if fpathWrite is not None:
- written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
- if not written:
- if os.path.isfile(fpathWrite):
- os.remove(fpathWrite)
- else:
- self.fcpTmpFile = fpathWrite
-
- if fpathRead is not None:
- readContent = saveReadFile(fpathRead)
- if readContent is None:
- readContent = ''
-
- self.fcpClient.sendMessage(
- Messages.TestDDAResponse,
- Directory=msg['Directory'],
- ReadContent=readContent,
- )
-
-
- def error(self, msg):
- JobBase.error(self, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
-
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
-
-#**************************************************************************
-# fcp client
-#**************************************************************************
-class LogMessages:
- """Message strings used for log infos"""
- Connecting = 'Connecting to node...'
- Connected = 'Connected to node'
- ConnectionRetry = 'Connecting to node failed... retrying'
- ConnectingFailed = 'Connecting to node failed'
-
- ClientClose = 'Closing client'
-
- MessageSend = 'Message send'
- MessageReceived = 'Message received'
-
- JobStart = 'Starting job: '
- JobStop = 'Stopping job: '
- JobsCompleted = 'All jobs completed'
-
- KeyboardInterrupt = 'Keyboard interrupt'
- SocketDead = 'Socket is dead'
-
-
-#TODO: no idea what happens on reconnect if socket died. What about running jobs?
-#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
-#TODO: no idea if to add support for pending jobs and queue management here
-class FcpClient(object):
- """Fcp client implementation"""
-
-
- def __init__(self,
- name='',
- errorHandler=None,
- verbosity=logging.WARNING,
- logMessages=LogMessages
- ):
- """
- @param name: name of the client instance or '' (for debugging)
- @param errorHandler: will be called if the socket conncetion to the node is dead
- with two params: FcpSocketError + details. When the handler is called the client
- is already closed.
- @param verbosity: verbosity level for debugging
- @param logMessages: LogMessages class containing messages
- """
-
- self._isConnected = False
- self._jobs = {
- 'all': {},
- 'pending': [], # ???
- 'running': [],
- 'complete': [], # ???
- }
- self._errorHandler = errorHandler
- self._log = logging.getLogger(NameClient + ':' + name)
- self._logMessages = logMessages
- self._lock = thread.allocate_lock()
- self._socket = None
-
- self.setVerbosity(verbosity)
- atexit.register(self.close)
-
- def close(self):
- """Closes the client
- @note: make shure to call close() when done with the client
- """
- self._log.info(self._logMessages.ClientClose)
- if self._socket is not None:
- self._socket.close()
- self._socket = None
-
-
- #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
- def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
- """Establishes the connection to a freenet node
- @param host: (str) host of th node
- @param port: (int) port of the node
- @param repeat: (int) how many seconds try to connect before giving up
- @param timeout: (int) how much time to wait before another attempt to connect
- @return: (Message) NodeHello if successful,None otherwise
- """
- self._clientHello = None
- self._log.info(self._logMessages.Connecting)
-
- # poll untill freenet responds
- time_elapsed = 0
- while time_elapsed <= repeat:
-
- # try to Connect socket
- if self._socket is not None:
- self.close()
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.settimeout(SocketTimeout)
- try:
- self._socket.connect((host, port))
- except Exception, d:
- pass
- else:
- self._log.info(self._logMessages.Connected)
- job = JobClientHello(self)
- self.jobAdd(job, synchron=True)
- assert job.fcpError is None, 'Error should have been caught by handleMessage()'
- return job.fcpResult
-
- self._log.info(self._logMessages.ConnectionRetry)
-
- # continue polling
- time_elapsed += timeout
- time.sleep(timeout)
-
- self._log.info(self._logMessages.ConnectingFailed)
- return None
-
-
- def handleMessage(self, msg):
- """Handles the next message from the freenet node
- @param msg: Message() to handle
- """
- self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
-
- if msg.name == Messages.NodeHello:
- #connectionIdentifier = msg['ConnectionIdentifier']
- self.jobStop(JobIdentifiers.ClientHello, msg)
-
- elif msg.name == Messages.ProtocolError:
- code = msg['Code']
- #if code == ProtocolErrors.NoLateClientHellos:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
- #elif code == ProtocolErrors.ClientHelloMustBeFirst:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
- #else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- #TODO: inform caller
- raise FcpProtocolError(msg)
- else:
- self.jobStop(identifier, msg, error=True)
-
- elif msg.name == Messages.Peer:
- self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
-
- elif msg.name == Messages.EndListPeers:
- self.jobStop(IdentifierListPeers, msg)
-
- elif msg.name == Messages.GetFailed:
- self.jobStop(msg['Identifier'], msg, error=True)
-
- elif msg.name == Messages.SimpleProgress:
- self.jobNotify(msg['Identifier'], 'handleProgress', msg)
-
- elif msg.name == Messages.TestDDAReply:
- self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
-
- elif msg.name == Messages.TestDDAComplete:
- self.jobStop(msg['Directory'], msg)
-
- elif msg.name == Messages.IdentifierCollision:
- pass
-
-
- def jobAdd(self, job, synchron=False):
- """Adds a job to the client
- @param job: (Job*) job to add
- @param synchron: if True, wait untill the job is completed, if False return emidiately
- """
- self._lock.acquire(True)
- try:
- if job.fcpIdentifier in self._jobs['all']:
- raise ValueError('Duplicate job: %r' % job.identifier)
- self._jobs['all'][job.fcpIdentifier] = job
- self._jobs['running'].append(job)
- finally:
- self._lock.release()
-
- self._log.info(self._logMessages.JobStart + job.displayName())
- job.start()
- if synchron:
- while not job.fcpStopped:
- self.next()
-
-
-
- def jobNotify(self, identifier, handler, msg):
- """Notifies a job about an event while it is running
- @param identifier: identifier of the job to notify
- @param handler: (str) method of the job to call to handle the notification
- @param msg: Message() to pass to the job
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['all'].get(identifier, None)
- finally:
- self._lock.release()
- if job is None:
- raise ValueError('No such job: %r' % identifier)
- getattr(job, handler)(msg)
-
-
- #TODO: quite unclear when to remove a job
- def jobStop(self, identifier, msg, error=False):
- """Stops a job
- @param identifier: identifier of the job to stop
- @param msg: Message() to pass to the job as result
- @param error: set to True to indicate unsuccessful completion of the job, True otherwisse
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['all'].get(identifier, None)
- if job is not None:
- self._jobs['running'].remove(job)
- if job._fcp_auto_remove_:
- del self._jobs['all'][identifier]
- else:
- self._jobs['complete'].append(job)
- finally:
- self._lock.release()
-
- if job is None:
- raise ValueError('No such job: %r' % identifier)
- self._log.info(self._logMessages.JobStop + job.displayName())
- if error:
- job.error(msg)
- else:
- job.stop(msg)
-
-
- #TODO: some info when all jobs are completed
- def next(self):
- """Pumps the next message waiting
- @note: use this method instead of run() to run the client step by step
- """
- msg = self.readMessage()
- self.handleMessage(msg)
- return msg
-
- def readMessage(self):
- """Reads the next message directly from the socket and dispatches it
- @return: (Message) the next message read from the socket
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
- If an error handler is passed to the client it is called emidiately before the error
- is raised.
- """
- msg = Message(None)
- buf = []
- while True:
-
- try:
- p = self._socket.recv(1)
- if not p: raise ValueError('Socket is dead')
- except socket.timeout, d: # no new messages in queue
- msg = MessageSocketTimeout()
- break
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
-
- if p == '\r': # ignore
- continue
-
- if p != '\n':
- buf.append(p)
- continue
-
- line = ''.join(buf)
- if line in ('End', "EndMessage"):
- break
- buf = []
-
- if msg.name is None:
- msg.name = line
- elif line == 'Data':
- n = int(msg.params['DataLength'])
- try:
- msg.data = self._socket.recv(n)
- if not msg.data: raise ValueError('Socket is dead')
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
-
- else:
- head, sep, tail = line.partition('=')
- msg.params[head] = tail
- if not sep:
- # TODO: chek for invalid messages or not
- pass
-
- return msg
-
-
- def run(self):
- """Runs the client untill all jobs passed to it are completed
- @note: use KeyboardInterrupt to stop prematurely
- """
-
- # TODO:
- # x. push pending jobs
- try:
- while True:
- if not self._lock.acquire(False):
- continue
-
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- self._log.info(self._logMessages.JobsCompleted)
- break
- finally:
- self._lock.release()
-
- self.next()
- except KeyboardInterrupt:
- self._log(self._logMessages.KeyboardInterrupt)
- self.close()
-
-
- def sendMessage(self, name, data=None, **params):
- """Sends a message to freenet
- @param name: name of the message to send
- @param data: data to atatch to the message
- @param params: {para-name: param-calue, ...} of parameters to pass along
- with the message (see freenet protocol)
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
- If an error handler is passed to the client it is called emidiately before the error
- is raised.
- """
- return self.sendMessageEx(Message(name, data=data, **params))
-
-
- def sendMessageEx(self, msg):
- """Sends a message to freenet
- @param msg: (Message) message to send
- @return: Message
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
- If an error handler is passed to the client it is called emidiately before the error
- is raised.
- """
- rawMsg = msg.toString()
- self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
- try:
- self._socket.sendall(rawMsg)
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d)
- return msg
-
-
- def setLogMessages(self, logMessages):
- """"""
- self._logMessages = logMessages
-
-
- def setVerbosity(self, verbosity):
- """"""
- self._log.setLevel(verbosity)
-
-#*****************************************************************************
-#
-#*****************************************************************************
-if __name__ == '__main__':
- c = FcpClient(name='test', verbosity=logging.DEBUG)
- if c.connect():
- def foo():
- job1 = JobClientHello(c)
- c.jobAdd(job1)
-
- c.run()
- print '---------------------------'
- print job1.fcpError
- print job1.fcpResult
- print job1.fcpTime
- print '---------------------------'
- #foo()
-
-
- def foo():
- d = os.path.dirname(os.path.abspath(__file__))
- job2 = JobTestDDA(c, d)
- c.jobAdd(job2)
- c.run()
- print '---------------------------'
- print job2.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
-
- def foo():
- job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- c.jobAdd(job2)
- c.run()
- print '---------------------------'
- print job2.fcpError
- print job2.fcpResult
- print job2.fcpTime
- print '---------------------------'
- #foo()
Copied: trunk/fclient/fclient_lib/fcp/fcp2_0.py (from rev 7, trunk/fclient/fclient_lib/fcp/fcp20.py)
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py (rev 0)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 10:02:28 UTC (rev 8)
@@ -0,0 +1,1098 @@
+
+import atexit
+import logging
+import os
+import re
+import socket
+import subprocess
+import sys
+import time
+import thread
+import uuid
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+#**************************************************************
+# consts
+#**************************************************************
+NameClient = 'Fcp20Client'
+DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
+DefaultFcpPort = 9481
+try:
+ DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
+except: pass
+SocketTimeout = 0.1
+
+class JobIdentifiers:
+ """Fixed job identifiers
+ @note: he client can only handle one job of these at a time
+ """
+ ClientHello = 'ClientHello'
+ ListPeers = 'ListPeers'
+
+class Messages:
+ """All messages supported by the client"""
+
+ # client messages
+ ClientHello = 'ClientHello'
+ ListPeer = 'ListPeer' # (since 1045)
+ ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ AddPeer = 'AddPeer'
+ ModifyPeer = 'ModifyPeer'
+ ModifyPeerNote = 'ModifyPeerNote'
+ RemovePeer = 'RemovePeer'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig' # (since 1027)
+ ModifyConfig = 'ModifyConfig' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ GenerateSSK = 'GenerateSSK'
+ ClientPut = 'ClientPut'
+ ClientPutDiskDir = 'ClientPutDiskDir'
+ ClientPutComplexDir = 'ClientPutComplexDir'
+ ClientGet = 'ClientGet'
+ SubscribeUSK = 'SubscribeUSK'
+ WatchGlobal = 'WatchGlobal'
+ GetRequestStatus = 'GetRequestStatus'
+ ListPersistentRequests = 'ListPersistentRequests'
+ RemovePersistentRequest = 'RemovePersistentRequest'
+ ModifyPersistentRequest = 'ModifyPersistentRequest'
+ Shutdown = 'Shutdown'
+
+ # node messages
+ NodeHello = 'NodeHello'
+ CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
+ Peer = 'Peer'
+ PeerNote = 'PeerNote'
+ EndListPeers = 'EndListPeers'
+ EndListPeerNotes = 'EndListPeerNotes'
+ PeerRemoved = 'PeerRemoved'
+ NodeData = 'NodeData'
+ ConfigData = 'ConfigData' # (since 1027)
+ TestDDAReply = 'TestDDAReply' # (since 1027)
+ TestDDAComplete = 'TestDDAComplete' # (since 1027)
+ SSKKeypair = 'SSKKeypair'
+ PersistentGet = 'PersistentGet'
+ PersistentPut = 'PersistentPut'
+ PersistentPutDir = 'PersistentPutDir'
+ URIGenerated = 'URIGenerated'
+ PutSuccessful = 'PutSuccessful'
+ PutFetchable = 'PutFetchable'
+ DataFound = 'DataFound'
+ AllData = 'AllData'
+ StartedCompression = 'StartedCompression'
+ FinishedCompression = 'FinishedCompression'
+ SimpleProgress = 'SimpleProgress'
+ EndListPersistentRequests = 'EndListPersistentRequests'
+ PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
+ PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
+ PutFailed = 'PutFailed'
+ GetFailed = 'GetFailed'
+ ProtocolError = 'ProtocolError'
+ IdentifierCollision = 'IdentifierCollision'
+ UnknownNodeIdentifier = 'UnknownNodeIdentifier'
+ UnknownPeerNoteType = 'UnknownPeerNoteType'
+ SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+
+
+class Priorities:
+ """All priorities supported by the client"""
+
+ Maximum = 0
+ Interactive = 1
+ SemiInteractive = 2
+ Updatable = 3
+ Bulk = 4
+ Prefetch = 5
+ Minimum = 6
+
+ PriorityMin = Minimum
+ PriorityDefault = Bulk
+
+
+# errors
+
+class FetchErrors:
+ """All fetch errors supported by the client"""
+
+ MaxArchiveRecursionExceeded = '1'
+ UnknownSplitfileMetadata = '2'
+ UnknownMetadata = '3'
+ InvalidMetadata = '4'
+ ArchiveFailure = '5'
+ BlockDecodeError = '6'
+ MaxMetadataLevelsExceeded = '7'
+ MaxArchiveRestartsExceeded = '8'
+ MaxRecursionLevelExceeded = '9'
+ NotAnArchve = '10'
+ TooManyMetastrings = '11'
+ BucketError = '12'
+ DataNotFound = '13'
+ RouteNotFound = '14'
+ RejectedOverload = '15'
+ TooManyRedirects = '16'
+ InternalError = '17'
+ TransferFailed = '18'
+ SplitfileError = '19'
+ InvalidUri = '20'
+ TooBig = '21'
+ MetadataTooBig = '22'
+ TooManyBlocks = '23'
+ NotEnoughMetastrings = '24'
+ Canceled = '25'
+ ArchiveRestart = '26'
+ PermanentRedirect = '27'
+ NotAllDataFound = '28'
+
+
+class InsertErrors:
+ """All insert errors supported by the client"""
+
+ InvalidUri = '1'
+ BucketError = '2'
+ InternalError = '3'
+ RejectedOverload = '4'
+ RouteNotFound = '5'
+ FatalErrorInBlocks = '6'
+ TooManyRetriesInBlock = '7'
+ RouteReallyNotFound = '8'
+ Collision = '9'
+ Canceled = '10'
+
+
+class ProtocolErrors:
+ """All protocol errors supported by the client"""
+
+ ClientHelloMustBeFirst = '1'
+ NoLateClientHellos = '2'
+ MessageParseError = '3'
+ UriParseError = '4'
+ MissingField = '5'
+ ErrorParsingNumber = '6'
+ InvalidMessage = '7'
+ InvalidField = '8'
+ FileNotFound = '9'
+ DiskTargetExists = '10'
+ SameDirectoryExpected = '11'
+ CouldNotCreateFile = '12'
+ CouldNotWriteFile = '13'
+ CouldNotRenameFile = '14'
+ NoSuchIdentifier = '15'
+ NotSupported = '16'
+ InternalError = '17'
+ ShuttingDown = '18'
+ NoSuchNodeIdentifier = '19' # Unused since 995
+ UrlParseError = '20'
+ ReferenceParseError = '21'
+ FileParseError = '22'
+ NotAFile = '23'
+ AccessDenied = '24'
+ DDADenied = '25'
+ CouldNotReadFile = '26'
+ ReferenceSignature = '27'
+ CanNotPeerWithSelf = '28'
+ PeerExists = '29'
+ OpennetDisabled = '30'
+ DarknetOnly = '31'
+
+#**********************************************************************
+# functions
+#**********************************************************************
+def newIdentifier():
+ """Returns a new unique identifier
+ @return: (str) uuid
+ """
+ return str(uuid.uuid4())
+
+def saveReadFile(fpath):
+ """Reads contents of a file in the savest manner possible
+ @param fpath: file to write
+ @return: contents of the file if successful, None otherwise
+ """
+ read = None
+ try:
+ fp = open(fpath, 'rb')
+ except: pass
+ else:
+ try:
+ read = fp.read()
+ except: pass
+ fp.close()
+ return read
+
+def saveRemoveFile(fpath):
+ """Savely removes a file
+ @param fpath: filepath of the file to remove or None
+ @return: True if the file was removed, False otherwise
+ """
+ if fpath is not None:
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ return True
+ return False
+
+
+def saveWriteFile(fpath, data):
+ """Writes data to a file i the savest manner possible
+ @param fpath: file to write
+ @param data: data to write to file
+ @return: True if successful, False otherwise
+ """
+ written = False
+ try:
+ fp = open(fpath, 'wb')
+ except: pass
+ else:
+ try:
+ fp.write(data)
+ written = True
+ except:
+ fp.Close()
+ else:
+ fp.close()
+ return written
+
+def startFreenet(cmdline):
+ """Starts freenet
+ @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
+ @return: (string) whatever freenet returns
+ """
+ #TODO: on windows it may be necessary to hide the comand window
+ p = subprocess.Popen(
+ args=cmdline,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ stdout, stderr = p.communicate()
+ return stdout
+
+
+def fcpBool(pythonBool):
+ """Converts a python bool to a fcp bool
+ @param pythonBool: (bool)
+ @return: (str) 'true' or 'false'
+ """
+ if pythonBool:
+ return 'true'
+ return 'false'
+
+def pythonBool(fcpBool):
+ """Converts a fcp bool to a python bool
+ @param pythonBool: 'true' or 'false'
+ @return: (bool) True or False
+ """
+ return fcpBool == 'true'
+
+#**********************************************************************
+# classes
+#**********************************************************************
+class FcpSocketError(Exception): pass
+class FcpProtocolError(Exception):
+ def __init__(self, msg):
+ """
+ @param msg: (Message) ProtocolError message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+
+ def __str__(self): return self.value
+
+
+
+class FcpUri(object):
+ """Wrapper class for freenet uris"""
+
+
+ KeySSK = 'SSK@'
+ KeyKSK = 'KSK@'
+ KeyCHK = 'CHK@'
+ KeyUSK = 'USK@'
+ KeySVK = 'SVK@'
+ KeyUnknown = ''
+ KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
+
+ ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
+ ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
+
+
+ def __init__(self, uri):
+ """
+ @param uri: uri to wrap
+ @param cvar ReUriPattern: pattern matching a freenet uri
+ @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
+
+ @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible
+
+
+ >>> uri = FcpUri('freenet:SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+ >>> uri.keyType() == FcpUri.KeySSK
+ True
+ >>> uri.split()
+ ('SSK@foo', 'bar')
+ >>> uri.fileName()
+ 'bar'
+
+ >>> uri = FcpUri('http://SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+
+ # uris not containing freenet keys are left unchanged
+ >>> uri = FcpUri('http://foo/bar')
+ >>> str(uri)
+ 'http://foo/bar'
+ >>> uri.keyType() == FcpUri.KeyUnknown
+ True
+ >>> uri.split()
+ ('http://foo/bar', '')
+ >>> uri.fileName()
+ 'http://foo/bar'
+
+ """
+ self._uri = uri
+
+ result = self.ReUriPattern.search(uri)
+ if result is not None:
+ self._uri = result.group(0)
+
+ def __str__(self):
+ return str(self._uri)
+
+ def __unicode__(self):
+ return unicode(self._uri)
+
+ def keyType(self):
+ """Retuns the key type of the uri
+ @return: one of the Key* consts
+ """
+ result = self.ReKeyPattern.search(self._uri)
+ if result is not None:
+ return result.group(0).upper()
+ return self.KeyUnknown
+
+ def split(self):
+ """Splits the uri
+ @return: tuple(freenet-key, file-name)
+ """
+ if self.keyType() != self.KeyUnknown:
+ head, sep, tail = self._uri.partition('/')
+ return head, tail
+ return self._uri, ''
+
+ def fileName(self):
+ """Returns the filename part of the uri
+ @return: str
+ """
+ head, tail = self.split()
+ if tail:
+ return tail
+ return self._uri
+
+
+class Message(object):
+ """Class wrapping a freenet message"""
+
+ Name = 'UMessage'
+
+
+ def __init__(self, name, data=None, **params):
+ """
+ @param name: messge name
+ @param data: data associated to the messge (not yet implemented)
+ @param params: {field-name: value, ...} of parameters of the message
+ @note: all params can be accessed as attributes of the class
+ """
+ self.data = data
+ self.name = name
+ self.params = params
+
+
+ def toString(self):
+ """Returns a string with the formated message, ready to be send"""
+
+ # TODO: "Data" not yet implemented
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append('%s=%s' % (param, value))
+ out.append('EndMessage\n')
+ return '\n'.join(out)
+
+
+ def pprint(self):
+ """Returns the message as nicely formated human readable string"""
+
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append(' %s=%s' % (param, value))
+ out.append('EndMessage')
+ out.append('')
+ return '\n'.join(out)
+
+ def __getitem__(self, name):
+ """Returns the message parameter 'name' """
+ return self.params[name]
+
+ def get(self, name, default=None):
+ """Returns the message parameter 'name' or 'default' """
+ return self.params.get(name, default)
+
+ def __setitem__(self, name, value):
+ """Sets the message parameter 'name' to 'value' """
+ self.params[name] = value
+
+
+class MessageSocketTimeout(Message):
+
+ Name = 'USocketTimeout'
+
+ def __init__(self):
+ Message.__init__(self, self.Name)
+
+
+
+#**************************************************************************
+# jobs
+#**************************************************************************
+class JobBase(object):
+ """Base class for jobs"""
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, identifier, message):
+ """
+ @param fcpClient: FcpClient() instance
+ @param identifier: (str) identifier of the job
+ @param message: (Message) to send to the node whne the job ist started
+ @ivar fcpClient: FcpClient() instance
+ @ivar fcpError: holding the error message if an error was encountered while running
+ the job, None otherwise
+ @ivar fcpIdentifier: identifier of the job
+ @ivar fcpMessage: initial message send to the node
+ @ivar fcpResult: if no error was encountered, holding the result of the job when complete
+ @ivar fcpTime: when the job is complete, holding the time the job took to complete
+ """
+
+ self.fcpClient = fcpClient # FcpClient() instance the job belongs to
+ self.fcpError = None # last error (either this is set or dcpResult)
+ self.fcpIdentifier = identifier #
+ self.fcpMessage = message # message send to node
+ self.fcpResult = None # job result
+ self.fcpTimeStart = 0 # time the job was started
+ self.fcpTimeStop = 0 # time the job was stopped
+ self.fcpStopped = False
+
+ def displayName(self):
+ """Returns the display name of the job
+ @return: (str) display name
+ """
+ return 'JobBase'
+
+ def start(self):
+ """Starts the job"""
+ self.fcpStopped = False
+ self.fcpTimeStart = time.time()
+ self.fcpClient.sendMessageEx(self.fcpMessage)
+
+ def error(self, msg):
+ """Called on job completion if an error was encounterd while runnng the job
+ @param msg: (Message) to pass to the job
+ """
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
+ self.fcpError = msg
+ self.fcpResult = None
+
+ def stop(self, msg):
+ """Called on job completion to stop the job
+ @param msg: (Message) to pass to the job
+ """
+ self.fcpStopped = True
+ self.fcpTimeStop = time.time()
+ self.fcpError = None
+ self.fcpResult = msg
+
+
+class JobClientHello(JobBase):
+ """Sed a ClientHello message to the node
+
+ @note: this must be the first message passed to the node. If everything
+ goes well, you will get a NodeHello in response.
+ """
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
+ """
+ @param name: (str) connection name or None, to use an arbitrary name
+ @param expectedVersion: (str) node version expected
+ """
+ message = Message(
+ Messages.ClientHello,
+ Name=name if name is not None else newIdentifier(),
+ ExpectedVersion=expectedVersion,
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+
+ def displayName(self):
+ return 'ClientHello'
+
+
+class JobListPeers(JobBase):
+ """Lists all known peers of the node
+ """
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ message = Message(
+ Messages.ListPeers,
+ WithMetadata=fcpBool(withMetaData),
+ WithVolatile=fcpBool(withVolantile),
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+
+
+ def displayName(self):
+ return 'ListPeers'
+
+ def handlePeer(self,msg):
+ """Handles the next peer send by the node in form of a 'Peer' message
+ while the job is running. Overwrite to process.
+ """
+
+
+
+class JobGetFileInfo(JobBase):
+ """Tries to retieve information about a file. If everything goes well
+
+ On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
+ Note, that both members may be '' (empty string)
+
+ """
+
+ _fcp_auto_remove_ = False
+
+
+ # idea is to provoke a GetFailed message and take mimetype and size from it
+ def __init__(self, fcpClient, uri, **params):
+ """
+ @param fcpClient: FcpClient() instance
+ @param uri: uri of the file to retrieve info for
+ @param params: additional parameters:
+ IgnoreDS='true' / 'false'
+ DSOnly='true' / 'false'
+ MaxRetries=-1 ...N
+ PriorityClass=Priority*
+
+ """
+ identifier = newIdentifier()
+ message = Message(
+ Messages.ClientGet,
+ Identifier=identifier,
+ URI=uri,
+ MaxSize='0',
+ ReturnType='none',
+ Verbosity='1',
+ **params
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def displayName(self):
+ return 'GetFileInfo'
+
+ def handleProgress(self, msg):
+ """Handles the next progress made. Overwrite to process.
+ """
+
+
+ def error(self, msg):
+ JobBase.error(self, msg)
+ if msg.name == Messages.GetFailed:
+ if msg['Code'] == FetchErrors.TooBig:
+ self.fcpError = None
+ self.fcpResult = (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
+ #else:
+ # raise ValueError('Unhandled message: %s' % msg.name)
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ if msg.name == Messages.DataFound:
+ self.fcpResult = (
+ msg.get('Metadata.ContentType', ''),
+ msg.get('DataLength', '')
+ )
+ else:
+ raise ValueError('Unhandled message: %s' % msg.name)
+
+
+
+#TODO: handle case where directories are registered multiple times
+class JobTestDDA(JobBase):
+ """Tests a directory for read / write accesss
+ """
+
+ _fcp_auto_remove_ = False
+
+ def __init__(self, fcpClient, directory, read=True, write=True):
+ if not os.path.isdir(directory):
+ raise ValueError('No such directory: %r' % directory)
+
+ message = Message(
+ Messages.TestDDARequest,
+ Directory=directory,
+ WantReadDirectory=fcpBool(read),
+ WantWriteDirectory=fcpBool(write),
+ )
+ JobBase.__init__(self, fcpClient, directory, message)
+ self.fcpTmpFile = None
+
+ def displayName(self):
+ return 'TestDDA'
+
+ def handleTestDDAReply(self, msg):
+ fpathWrite = msg.params.get('WriteFilename', None)
+ fpathRead = msg.params.get('ReadFilename', None)
+ readContent = ''
+ if fpathWrite is not None:
+ written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
+ if not written:
+ if os.path.isfile(fpathWrite):
+ os.remove(fpathWrite)
+ else:
+ self.fcpTmpFile = fpathWrite
+
+ if fpathRead is not None:
+ readContent = saveReadFile(fpathRead)
+ if readContent is None:
+ readContent = ''
+
+ self.fcpClient.sendMessage(
+ Messages.TestDDAResponse,
+ Directory=msg['Directory'],
+ ReadContent=readContent,
+ )
+
+
+ def error(self, msg):
+ JobBase.error(self, msg)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ saveRemoveFile(self.fcpTmpFile)
+ self.fcpTmpFile = None
+
+#**************************************************************************
+# fcp client
+#**************************************************************************
+class LogMessages:
+ """Message strings used for log infos"""
+ Connecting = 'Connecting to node...'
+ Connected = 'Connected to node'
+ ConnectionRetry = 'Connecting to node failed... retrying'
+ ConnectingFailed = 'Connecting to node failed'
+
+ ClientClose = 'Closing client'
+
+ MessageSend = 'Message send'
+ MessageReceived = 'Message received'
+
+ JobStart = 'Starting job: '
+ JobStop = 'Stopping job: '
+ JobsCompleted = 'All jobs completed'
+
+ KeyboardInterrupt = 'Keyboard interrupt'
+ SocketDead = 'Socket is dead'
+
+
+#TODO: no idea what happens on reconnect if socket died. What about running jobs?
+#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
+#TODO: no idea if to add support for pending jobs and queue management here
+class FcpClient(object):
+ """Fcp client implementation"""
+
+
+ def __init__(self,
+ name='',
+ errorHandler=None,
+ verbosity=logging.WARNING,
+ logMessages=LogMessages
+ ):
+ """
+ @param name: name of the client instance or '' (for debugging)
+ @param errorHandler: will be called if the socket conncetion to the node is dead
+ with two params: FcpSocketError + details. When the handler is called the client
+ is already closed.
+ @param verbosity: verbosity level for debugging
+ @param logMessages: LogMessages class containing messages
+ """
+
+ self._isConnected = False
+ self._jobs = {
+ 'all': {},
+ 'pending': [], # ???
+ 'running': [],
+ 'complete': [], # ???
+ }
+ self._errorHandler = errorHandler
+ self._log = logging.getLogger(NameClient + ':' + name)
+ self._logMessages = logMessages
+ self._lock = thread.allocate_lock()
+ self._socket = None
+
+ self.setVerbosity(verbosity)
+ atexit.register(self.close)
+
+ def close(self):
+ """Closes the client
+ @note: make shure to call close() when done with the client
+ """
+ self._log.info(self._logMessages.ClientClose)
+ if self._socket is not None:
+ self._socket.close()
+ self._socket = None
+
+
+ #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
+ def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
+ """Establishes the connection to a freenet node
+ @param host: (str) host of th node
+ @param port: (int) port of the node
+ @param repeat: (int) how many seconds try to connect before giving up
+ @param timeout: (int) how much time to wait before another attempt to connect
+ @return: (Message) NodeHello if successful,None otherwise
+ """
+ self._clientHello = None
+ self._log.info(self._logMessages.Connecting)
+
+ # poll untill freenet responds
+ time_elapsed = 0
+ while time_elapsed <= repeat:
+
+ # try to Connect socket
+ if self._socket is not None:
+ self.close()
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.settimeout(SocketTimeout)
+ try:
+ self._socket.connect((host, port))
+ except Exception, d:
+ pass
+ else:
+ self._log.info(self._logMessages.Connected)
+ job = JobClientHello(self)
+ self.jobAdd(job, synchron=True)
+ assert job.fcpError is None, 'Error should have been caught by handleMessage()'
+ return job.fcpResult
+
+ self._log.info(self._logMessages.ConnectionRetry)
+
+ # continue polling
+ time_elapsed += timeout
+ time.sleep(timeout)
+
+ self._log.info(self._logMessages.ConnectingFailed)
+ return None
+
+
+ def handleMessage(self, msg):
+ """Handles the next message from the freenet node
+ @param msg: Message() to handle
+ """
+ self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+
+ if msg.name == Messages.NodeHello:
+ #connectionIdentifier = msg['ConnectionIdentifier']
+ self.jobStop(JobIdentifiers.ClientHello, msg)
+
+ elif msg.name == Messages.ProtocolError:
+ code = msg['Code']
+ #if code == ProtocolErrors.NoLateClientHellos:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #elif code == ProtocolErrors.ClientHelloMustBeFirst:
+ # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: inform caller
+ raise FcpProtocolError(msg)
+ else:
+ self.jobStop(identifier, msg, error=True)
+
+ elif msg.name == Messages.Peer:
+ self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ elif msg.name == Messages.EndListPeers:
+ self.jobStop(IdentifierListPeers, msg)
+
+ elif msg.name == Messages.GetFailed:
+ self.jobStop(msg['Identifier'], msg, error=True)
+
+ elif msg.name == Messages.SimpleProgress:
+ self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+
+ elif msg.name == Messages.TestDDAReply:
+ self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+
+ elif msg.name == Messages.TestDDAComplete:
+ self.jobStop(msg['Directory'], msg)
+
+ elif msg.name == Messages.IdentifierCollision:
+ pass
+
+
+ def jobAdd(self, job, synchron=False):
+ """Adds a job to the client
+ @param job: (Job*) job to add
+ @param synchron: if True, wait untill the job is completed, if False return emidiately
+ """
+ self._lock.acquire(True)
+ try:
+ if job.fcpIdentifier in self._jobs['all']:
+ raise ValueError('Duplicate job: %r' % job.identifier)
+ self._jobs['all'][job.fcpIdentifier] = job
+ self._jobs['running'].append(job)
+ finally:
+ self._lock.release()
+
+ self._log.info(self._logMessages.JobStart + job.displayName())
+ job.start()
+ if synchron:
+ while not job.fcpStopped:
+ self.next()
+
+
+
+ def jobNotify(self, identifier, handler, msg):
+ """Notifies a job about an event while it is running
+ @param identifier: identifier of the job to notify
+ @param handler: (str) method of the job to call to handle the notification
+ @param msg: Message() to pass to the job
+ """
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ finally:
+ self._lock.release()
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ getattr(job, handler)(msg)
+
+
+ #TODO: quite unclear when to remove a job
+ def jobStop(self, identifier, msg, error=False):
+ """Stops a job
+ @param identifier: identifier of the job to stop
+ @param msg: Message() to pass to the job as result
+ @param error: set to True to indicate unsuccessful completion of the job, True otherwisse
+ """
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ if job is not None:
+ self._jobs['running'].remove(job)
+ if job._fcp_auto_remove_:
+ del self._jobs['all'][identifier]
+ else:
+ self._jobs['complete'].append(job)
+ finally:
+ self._lock.release()
+
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ self._log.info(self._logMessages.JobStop + job.displayName())
+ if error:
+ job.error(msg)
+ else:
+ job.stop(msg)
+
+
+ #TODO: some info when all jobs are completed
+ def next(self):
+ """Pumps the next message waiting
+ @note: use this method instead of run() to run the client step by step
+ """
+ msg = self.readMessage()
+ self.handleMessage(msg)
+ return msg
+
+ def readMessage(self):
+ """Reads the next message directly from the socket and dispatches it
+ @return: (Message) the next message read from the socket
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
+ """
+ msg = Message(None)
+ buf = []
+ while True:
+
+ try:
+ p = self._socket.recv(1)
+ if not p: raise ValueError('Socket is dead')
+ except socket.timeout, d: # no new messages in queue
+ msg = MessageSocketTimeout()
+ break
+ except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
+ raise FcpSocketError(d) #!!
+
+ if p == '\r': # ignore
+ continue
+
+ if p != '\n':
+ buf.append(p)
+ continue
+
+ line = ''.join(buf)
+ if line in ('End', "EndMessage"):
+ break
+ buf = []
+
+ if msg.name is None:
+ msg.name = line
+ elif line == 'Data':
+ n = int(msg.params['DataLength'])
+ try:
+ msg.data = self._socket.recv(n)
+ if not msg.data: raise ValueError('Socket is dead')
+ except Exception, d:
+ self._log.info(self._logMessages.SocketDead)
+ self.close()
+ if self._errorHandler is not None:
+ self._errorHandler(FcpSocketError, d)
+ raise FcpSocketError(d) #!!
+
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ if not sep:
+ # TODO: chek for invalid messages or not
+ pass
+
+ return msg
+
+
+ def run(self):
+ """Runs the client untill all jobs passed to it are completed
+ @note: use KeyboardInterrupt to stop prematurely
+ """
+
+ # TODO:
+ # x. push pending jobs
+ try:
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ self._log.info(self._logMessages.JobsCompleted)
+ break
+ finally:
+ self._lock.release()
+
+ self.next()
+ except KeyboardInterrupt:
+ self._log(self._logMessages.KeyboardInterrupt)
+ self.close()
+
+
+ def sendMessage(self, name, data=None, **params):
+ """Sends a message to freenet
+ @param name: name of the message to send
+ @param data: data to atatch to the message
+ @param params: {para-name: param-calue, ...} of parameters to pass along
+ with the message (see freenet protocol)
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
+ """
+ return self.sendMessageEx(Message(name, data=data, **params))
+
+
+ def sendMessageEx(self, msg):
+ """Sends a message to freenet
+ @param msg: (Message) message to send
+ @return: Message
+ @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ If an error handler is passed to the client it is called emidiately before the error
+ is raised.
+ """
+ rawMsg = msg.toString()
+ self._log.debug(self._logMessages.Mes...
[truncated message content] |