SF.net SVN: fclient: [4] trunk
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <ju...@us...> - 2007-10-15 17:42:21
|
Revision: 4
http://fclient.svn.sourceforge.net/fclient/?rev=4&view=rev
Author: jurner
Date: 2007-10-15 10:42:24 -0700 (Mon, 15 Oct 2007)
Log Message:
-----------
started implementing fcp20 client
Added Paths:
-----------
trunk/fclient/
trunk/fclient/__init__.py
trunk/fclient/fclient_lib/
trunk/fclient/fclient_lib/__init__.py
trunk/fclient/fclient_lib/fcp/
trunk/fclient/fclient_lib/fcp/__init__.py
trunk/fclient/fclient_lib/fcp/fcp20.py
Added: trunk/fclient/__init__.py
===================================================================
--- trunk/fclient/__init__.py (rev 0)
+++ trunk/fclient/__init__.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1 @@
+
Added: trunk/fclient/fclient_lib/__init__.py
===================================================================
--- trunk/fclient/fclient_lib/__init__.py (rev 0)
+++ trunk/fclient/fclient_lib/__init__.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1 @@
+
Added: trunk/fclient/fclient_lib/fcp/__init__.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/__init__.py (rev 0)
+++ trunk/fclient/fclient_lib/fcp/__init__.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1 @@
+
Added: trunk/fclient/fclient_lib/fcp/fcp20.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp20.py (rev 0)
+++ trunk/fclient/fclient_lib/fcp/fcp20.py 2007-10-15 17:42:24 UTC (rev 4)
@@ -0,0 +1,715 @@
+
+import os
+import socket
+import time
+import thread
+import uuid
+#**************************************************************
+# consts
+#**************************************************************
+DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
+DefaultFcpPort = 9481
+try:
+ DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
+except: pass
+SocketTimeout = 0.1
+KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@')
+
+class JobIdentifiers:
+ # fixed job identifiers
+ # note that the client can only handle one job of these at a time
+ ClientHello = 'ClientHello'
+ ListPeers = 'ListPeers'
+
+class Messages:
+
+ # client messages
+ ClientHello = 'ClientHello'
+ ListPeer = 'ListPeer' # (since 1045)
+ ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ AddPeer = 'AddPeer'
+ ModifyPeer = 'ModifyPeer'
+ ModifyPeerNote = 'ModifyPeerNote'
+ RemovePeer = 'RemovePeer'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig' # (since 1027)
+ ModifyConfig = 'ModifyConfig' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ GenerateSSK = 'GenerateSSK'
+ ClientPut = 'ClientPut'
+ ClientPutDiskDir = 'ClientPutDiskDir'
+ ClientPutComplexDir = 'ClientPutComplexDir'
+ ClientGet = 'ClientGet'
+ SubscribeUSK = 'SubscribeUSK'
+ WatchGlobal = 'WatchGlobal'
+ GetRequestStatus = 'GetRequestStatus'
+ ListPersistentRequests = 'ListPersistentRequests'
+ RemovePersistentRequest = 'RemovePersistentRequest'
+ ModifyPersistentRequest = 'ModifyPersistentRequest'
+ Shutdown = 'Shutdown'
+
+ # node messages
+ NodeHello = 'NodeHello'
+ CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
+ Peer = 'Peer'
+ PeerNote = 'PeerNote'
+ EndListPeers = 'EndListPeers'
+ EndListPeerNotes = 'EndListPeerNotes'
+ PeerRemoved = 'PeerRemoved'
+ NodeData = 'NodeData'
+ ConfigData = 'ConfigData' # (since 1027)
+ TestDDAReply = 'TestDDAReply' # (since 1027)
+ TestDDAComplete = 'TestDDAComplete' # (since 1027)
+ SSKKeypair = 'SSKKeypair'
+ PersistentGet = 'PersistentGet'
+ PersistentPut = 'PersistentPut'
+ PersistentPutDir = 'PersistentPutDir'
+ URIGenerated = 'URIGenerated'
+ PutSuccessful = 'PutSuccessful'
+ PutFetchable = 'PutFetchable'
+ DataFound = 'DataFound'
+ AllData = 'AllData'
+ StartedCompression = 'StartedCompression'
+ FinishedCompression = 'FinishedCompression'
+ SimpleProgress = 'SimpleProgress'
+ EndListPersistentRequests = 'EndListPersistentRequests'
+ PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
+ PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
+ PutFailed = 'PutFailed'
+ GetFailed = 'GetFailed'
+ ProtocolError = 'ProtocolError'
+ IdentifierCollision = 'IdentifierCollision'
+ UnknownNodeIdentifier = 'UnknownNodeIdentifier'
+ UnknownPeerNoteType = 'UnknownPeerNoteType'
+ SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+
+
+class Priorities:
+ Maximum = 0
+ Interactive = 1
+ SemiInteractive = 2
+ Updatable = 3
+ Bulk = 4
+ Prefetch = 5
+ Minimum = 6
+
+ PriorityMin = Minimum
+ PriorityDefault = Bulk
+
+
+# errors
+
+class FetchErrors:
+ MaxArchiveRecursionExceeded = '1'
+ UnknownSplitfileMetadata = '2'
+ UnknownMetadata = '3'
+ InvalidMetadata = '4'
+ ArchiveFailure = '5'
+ BlockDecodeError = '6'
+ MaxMetadataLevelsExceeded = '7'
+ MaxArchiveRestartsExceeded = '8'
+ MaxRecursionLevelExceeded = '9'
+ NotAnArchve = '10'
+ TooManyMetastrings = '11'
+ BucketError = '12'
+ DataNotFound = '13'
+ RouteNotFound = '14'
+ RejectedOverload = '15'
+ TooManyRedirects = '16'
+ InternalError = '17'
+ TransferFailed = '18'
+ SplitfileError = '19'
+ InvalidUri = '20'
+ TooBig = '21'
+ MetadataTooBig = '22'
+ TooManyBlocks = '23'
+ NotEnoughMetastrings = '24'
+ Canceled = '25'
+ ArchiveRestart = '26'
+ PermanentRedirect = '27'
+ NotAllDataFound = '28'
+
+
+class InsertErrors:
+ InvalidUri = '1'
+ BucketError = '2'
+ InternalError = '3'
+ RejectedOverload = '4'
+ RouteNotFound = '5'
+ FatalErrorInBlocks = '6'
+ TooManyRetriesInBlock = '7'
+ RouteReallyNotFound = '8'
+ Collision = '9'
+ Canceled = '10'
+
+
+class ProtocolErrors:
+ ClientHelloMustBeFirst = '1'
+ NoLateClientHellos = '2'
+ MessageParseError = '3'
+ UriParseError = '4'
+ MissingField = '5'
+ ErrorParsingNumber = '6'
+ InvalidMessage = '7'
+ InvalidField = '8'
+ FileNotFound = '9'
+ DiskTargetExists = '10'
+ SameDirectoryExpected = '11'
+ CouldNotCreateFile = '12'
+ CouldNotWriteFile = '13'
+ CouldNotRenameFile = '14'
+ NoSuchIdentifier = '15'
+ NotSupported = '16'
+ InternalError = '17'
+ ShuttingDown = '18'
+ NoSuchNodeIdentifier = '19' # Unused since 995
+ UrlParseError = '20'
+ ReferenceParseError = '21'
+ FileParseError = '22'
+ NotAFile = '23'
+ AccessDenied = '24'
+ DDADenied = '25'
+ CouldNotReadFile = '26'
+ ReferenceSignature = '27'
+ CanNotPeerWithSelf = '28'
+ PeerExists = '29'
+ OpennetDisabled = '30'
+ DarknetOnly = '31'
+
+#**********************************************************************
+# functions
+#**********************************************************************
+def newIdentifier():
+ return str(uuid.uuid4())
+
+def saveReadFile(fpath):
+ """Reads contents of a file in the savest manner possible
+ @param fpath: file to write
+ @return: contents of the file if successful, None otherwise
+ """
+ read = None
+ try:
+ fp = open(fpath, 'rb')
+ except: pass
+ else:
+ try:
+ read = fp.read()
+ except: pass
+ fp.close()
+ return read
+
+
+def saveWriteFile(fpath, data):
+ """Writes data to a file i the savest manner possible
+ @param fpath: file to write
+ @param data: data to write to file
+ @return: True if successful, False otherwise
+ """
+ written = False
+ try:
+ fp = open(fpath, 'wb')
+ except: pass
+ else:
+ try:
+ fp.write(data)
+ written = True
+ except:
+ fp.Close()
+ else:
+ fp.close()
+ return written
+
+def fcpBool(pythonBool):
+ """Converts a python bool to a fcp bool
+ @param pythonBool: (bool)
+ @return: (str) 'true' or 'false'
+ """
+ if pythonBool:
+ return 'true'
+ return 'false'
+
+def pythonBool(fcpBool):
+ """Converts a fcp bool to a python bool
+ @param pythonBool: 'true' or 'false'
+ @return: (bool) True or False
+ """
+ return fcpBool == 'true'
+
+#**********************************************************************
+# classes
+#**********************************************************************
+class FcpSocketError(Exception): pass
+class Message(object):
+ """Class wrapping a freenet message"""
+
+ Name = 'UMessage'
+
+
+ def __init__(self, name, data=None, **params):
+ """
+ @param name: messge name
+ @param data: data associated to the messge (not yet implemented)
+ @param params: {field-name: value, ...} of parameters of the message
+ @note: all params can be accessed as attributes of the class
+ """
+ self.data = data
+ self.name = name
+ self.params = params
+
+
+ def toString(self):
+ """Returns a string with the formated message, ready to be send"""
+
+ # TODO: "Data" not yet implemented
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append('%s=%s' % (param, value))
+ out.append('EndMessage\n')
+ return '\n'.join(out)
+
+
+ def pprint(self):
+ """Returns the message as nicely formated human readable string"""
+
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append(' %s=%s' % (param, value))
+ out.append('EndMessage')
+ out.append('')
+ return '\n'.join(out)
+
+ def __getitem__(self, name):
+ """Returns the message parameter 'name' """
+ return self.params[name]
+
+ def get(self, name, default=None):
+ """Returns the message parameter 'name' or 'default' """
+ return self.params.get(name, default)
+
+ def __setitem__(self, name, value):
+ """Sets the message parameter 'name' to 'value' """
+ self.params[name] = value
+
+
+class MessageSocketTimeout(Message):
+
+ Name = 'USocketTimeout'
+
+ def __init__(self):
+ Message.__init__(self, self.Name)
+
+#**************************************************************************
+# jobs
+#**************************************************************************
+#TODO: do somrthing that this class does not lock the queue
+class JobBase(object):
+ """Base class for jobs"""
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, identifier, message):
+
+ self.fcpClient = fcpClient
+ self.fcpIdentifier = identifier
+ self.fcpMessage = message
+ self.fcpResult = None
+ self.fcpTime = 0
+
+ def start(self):
+ self.fcpTime = time.time()
+ self.fcpClient.sendMessageEx(self.fcpMessage)
+
+ def stop(self, result):
+ self.fcpTime = time.time() - self.fcpTime
+ self.fcpResult = result
+
+
+class JobNodeHello(JobBase):
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, expectedVersion='2.0'):
+ message = Message(
+ Messages.ClientHello,
+ Name=newIdentifier(),
+ ExpectedVersion=expectedVersion,
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+
+
+
+class JobListPeers(JobBase):
+
+ _fcp_auto_remove_ = True
+
+ def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ message = Message(
+ Messages.ListPeers,
+ WithMetadata='true' if withMetaData else 'false',
+ WithVolatile='true' if withVolantile else 'false',
+ )
+ JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+
+
+ def handlePeer(self,msg):
+ pass
+
+
+
+class JobFileInfo(JobBase):
+
+ _fcp_auto_remove_ = False
+
+ def __init__(self, fcpClient, uri, **params):
+ """
+ @param fcpClient: FcpClient() instance
+ @param uri: uri of the file to retrieve info for
+ @param params: additional parameters:
+ IgnoreDS='true' / 'false'
+ DSOnly='true' / 'false'
+ MaxRetries=-1 ...N
+ PriorityClass=Priority*
+
+ """
+ identifier = newIdentifier()
+ message = Message(
+ Messages.ClientGet,
+ Identifier=identifier,
+ URI=uri,
+ MaxSize='ase0',
+ ReturnType='none',
+ Verbosity='1',
+ **params
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleProgress(self, msg):
+ pass
+
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ error = result = None
+ if msg.name == Messages.GetFailed:
+ if msg['Code'] == FetchErrors.TooBig:
+ result = (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
+ else:
+ error, result = msg['Code'], msg
+
+ elif msg.name == Messages.DataFound:
+ result = (
+ msg.get('Metadata.ContentType', ''),
+ msg.get('DataLength', '')
+ )
+
+ elif msg.name == Messages.ProtocolError:
+ error, result = msg['Code'], msg
+
+ else:
+ raise ValueError('Unhandled message: %s' % msg.name)
+
+ self.fcpResult = error, result
+
+
+#TODO: handle case where directories are registered multiple times
+class JobTestDDA(JobBase):
+
+ _fcp_auto_remove_ = False
+
+ def __init__(self, fcpClient, directory, read=True, write=True):
+ message = Message(
+ Messages.TestDDARequest,
+ Directory=directory,
+ WantReadDirectory=fcpBool(read),
+ WantWriteDirectory=fcpBool(write),
+ )
+ JobBase.__init__(self, fcpClient, directory, message)
+ self.fcpTmpFile = None
+
+
+ def handleTestDDAReply(self, msg):
+ fpathWrite = msg.params.get('WriteFilename', None)
+ fpathRead = msg.params.get('ReadFilename', None)
+ readContent = ''
+ if fpathWrite is not None:
+ written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
+ if not written:
+ if os.path.isfile(fpathWrite):
+ os.remove(fpathWrite)
+ else:
+ self.fcpTmpFile = fpathWrite
+
+ if fpathRead is not None:
+ readContent = saveReadFile(fpathRead)
+ if readContent is None:
+ readContent = ''
+
+ self.fcpClient.sendMessage(
+ Messages.TestDDAResponse,
+ Directory=msg['Directory'],
+ ReadContent=readContent,
+ )
+
+ def stop(self, msg):
+ JobBase.stop(self, msg)
+ if self.fcpTmpFile is not None:
+ if os.path.isfile(self.fcpTmpFile):
+ os.remove(self.fcpTmpFile)
+
+#**************************************************************************
+# fcp client
+#**************************************************************************
+class FcpClient(object):
+
+ def __init__(self):
+
+ self._isConnected = False
+ self._jobs = {
+ 'all': {},
+ 'pending': [],
+ 'running': [],
+ 'complete': [],
+ }
+ self._lock = thread.allocate_lock()
+ self._socket = None
+
+
+
+ def close(self):
+ if self._socket is not None:
+ self._socket.close()
+ self._socket = None
+
+
+ def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
+ # poll untill freenet responds
+ time_elapsed = 0
+ while time_elapsed <= repeat:
+
+ # try to Connect socket
+ self.close()
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.settimeout(SocketTimeout)
+ try:
+ self._socket.connect((host, port))
+ except Exception, d:
+ pass
+ else:
+ #self._isConnected = True
+ return True
+
+ # continue polling
+ time_elapsed += timeout
+ time.sleep(timeout)
+
+ return False
+
+
+ #def __nonzero__(self):
+ # return self._isConnected
+
+ def addJob(self, job):
+ self._lock.acquire(True)
+ try:
+ if job.fcpIdentifier in self._jobs['all']:
+ raise ValueError('Duplicate job: %r' % job.identifier)
+ self._jobs['all'][job.fcpIdentifier] = job
+ self._jobs['running'].append(job)
+ finally:
+ self._lock.release()
+ job.start()
+
+ def finishJob(self, identifier, msg):
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ if job is not None:
+ self._jobs['running'].remove(job)
+ if job._fcp_auto_remove_:
+ del self._jobs['all'][identifier]
+ else:
+ self._jobs['complete'].append(job)
+ finally:
+ self._lock.release()
+
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ job.stop(msg)
+
+
+ def notifyJob(self, identifier, handler, msg):
+ self._lock.acquire(True)
+ try:
+ job = self._jobs['all'].get(identifier, None)
+ finally:
+ self._lock.release()
+ if job is None:
+ raise ValueError('No such job: %r' % identifier)
+ getattr(job, handler)(msg)
+
+
+ def run(self):
+
+ # TODO:
+ # x. push pending jobs
+ # x. on error stop this thingy
+
+ n = 0
+ while True:
+ if not self._lock.acquire(False):
+ continue
+
+ try:
+ if not self._jobs['pending'] and not self._jobs['running']:
+ break
+ finally:
+ self._lock.release()
+
+ msg = self.readMessage()
+ self.handleMessage(msg)
+
+
+ n += 1
+ if n > 50: break
+
+
+ def next(self):
+ msg = self.readMessage()
+ self.handleMessage(msg)
+
+
+ def handleMessage(self, msg):
+
+ print msg.pprint()
+
+ if msg.name == Messages.NodeHello:
+ #connectionIdentifier = msg['ConnectionIdentifier']
+ self.finishJob(JobIdentifiers.ClientHello, msg)
+
+ elif msg.name == Messages.ProtocolError:
+ code = msg['Code']
+
+ if code == ProtocolErrors.NoLateClientHellos:
+ self.finishJob(JobIdentifiers.ClientHello, msg)
+
+ else:
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ pass # raise ???
+ else:
+ self.finishJob(identifier, msg)
+
+ elif msg.name == Messages.Peer:
+ self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ elif msg.name == Messages.EndListPeers:
+ self.finishJob(IdentifierListPeers, msg)
+
+ elif msg.name == Messages.GetFailed:
+ self.finishJob(msg['Identifier'], msg)
+
+ elif msg.name == Messages.SimpleProgress:
+ self.notifyJob(msg['Identifier'], 'handleProgress', msg)
+
+ elif msg.name == Messages.TestDDAReply:
+ self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg)
+
+ elif msg.name == Messages.TestDDAComplete:
+ self.finishJob(msg['Directory'], msg)
+
+ elif msg.name == Messages.IdentifierCollision:
+ pass
+
+
+ def readMessage(self):
+ """Reads the next message directly from the socket and dispatches it
+ @return: valid or invalid Message()
+ """
+ msg = Message(None)
+ buf = []
+ while True:
+
+ try:
+ p = self._socket.recv(1)
+ if not p: raise ValueError('Socket is dead')
+ except socket.timeout, d: # no new messages in queue
+ msg = MessageSocketTimeout()
+ break
+ except Exception, d:
+ raise FcpSocketError(d) #!!
+
+ if p == '\r': # ignore
+ continue
+
+ if p != '\n':
+ buf.append(p)
+ continue
+
+ line = ''.join(buf)
+ if line in ('End', "EndMessage"):
+ break
+ buf = []
+
+ if msg.name is None:
+ msg.name = line
+ elif line == 'Data':
+ n = int(msg.params['DataLength'])
+ try:
+ msg.data = self._socket.recv(n)
+ except Exception, d:
+ raise FcpSocketError(d) #!!
+
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ if not sep:
+ # TODO: chek for invalid messages or not
+ pass
+
+ return msg
+
+
+
+ def sendMessage(self, name, data=None, **params):
+ """Sends a message to freenet
+ @param name: name of the message to send
+ @param data: data to atatch to the message
+ @param params: {para-name: param-calue, ...} of parameters to pass along
+ with the message (see freenet protocol)
+ """
+ return self.sendMessageEx(Message(name, data=data, **params))
+
+
+ def sendMessageEx(self, msg):
+ """Sends a message to freenet
+ @param msg: (Message) message to send
+ @return: Message
+ """
+ #self.log.info('SendMessage\n' + msg.pprint())
+ rawMsg = msg.toString()
+ try:
+ self._socket.sendall(rawMsg)
+ except Exception, d:
+ raise FcpSocketError(d)
+ #TODO: allow for an error handler to handle
+ return msg
+
+#*****************************************************************************
+#
+#*****************************************************************************
+if __name__ == '__main__':
+ c = FcpClient()
+ if c.connect():
+ job1 = JobNodeHello(c)
+ c.addJob(job1)
+
+ c.run()
+ print '---------------------------'
+ print job1.fcpResult.pprint()
+
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|