SF.net SVN: fclient: [4] trunk
Status: Pre-Alpha
Brought to you by:
jurner
From: <ju...@us...> - 2007-10-15 17:42:21
|
# Revision: 4  http://fclient.svn.sourceforge.net/fclient/?rev=4&view=rev
# Author:   jurner
# Date:     2007-10-15 10:42:24 -0700 (Mon, 15 Oct 2007)
#
# Log Message:
#   started implementing fcp20 client
#
# Added Paths:
#   trunk/fclient/
#   trunk/fclient/__init__.py                      (rev 4, empty)
#   trunk/fclient/fclient_lib/
#   trunk/fclient/fclient_lib/__init__.py          (rev 4, empty)
#   trunk/fclient/fclient_lib/fcp/
#   trunk/fclient/fclient_lib/fcp/__init__.py      (rev 4, empty)
#   trunk/fclient/fclient_lib/fcp/fcp20.py         (rev 4, 715 lines)
#
# Added: trunk/fclient/fclient_lib/fcp/fcp20.py  2007-10-15 17:42:24 UTC (rev 4)
# (reconstructed from the flattened diff; '+' markers removed)

import os
import socket
import time
import uuid

try:
    import thread
except ImportError:
    # the module was renamed to '_thread' in later Python versions
    import _thread as thread

#**************************************************************
# consts
#**************************************************************
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
DefaultFcpPort = 9481
try:
    DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
except ValueError:
    # FCP_PORT unset or not a number: keep the default port
    pass
SocketTimeout = 0.1
KeyTypes = ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@')


class JobIdentifiers:
    # fixed job identifiers
    # note that the client can only handle one job of these at a time
    ClientHello = 'ClientHello'
    ListPeers = 'ListPeers'


class Messages:
    """Names of the messages exchanged between client and node"""

    # client messages
    ClientHello = 'ClientHello'
    ListPeer = 'ListPeer'    # (since 1045)
    ListPeers = 'ListPeers'
    ListPeerNotes = 'ListPeerNotes'
    AddPeer = 'AddPeer'
    ModifyPeer = 'ModifyPeer'
    ModifyPeerNote = 'ModifyPeerNote'
    RemovePeer = 'RemovePeer'
    GetNode = 'GetNode'
    GetConfig = 'GetConfig'    # (since 1027)
    ModifyConfig = 'ModifyConfig'    # (since 1027)
    TestDDARequest = 'TestDDARequest'    # (since 1027)
    TestDDAResponse = 'TestDDAResponse'    # (since 1027)
    GenerateSSK = 'GenerateSSK'
    ClientPut = 'ClientPut'
    ClientPutDiskDir = 'ClientPutDiskDir'
    ClientPutComplexDir = 'ClientPutComplexDir'
    ClientGet = 'ClientGet'
    SubscribeUSK = 'SubscribeUSK'
    WatchGlobal = 'WatchGlobal'
    GetRequestStatus = 'GetRequestStatus'
    ListPersistentRequests = 'ListPersistentRequests'
    RemovePersistentRequest = 'RemovePersistentRequest'
    ModifyPersistentRequest = 'ModifyPersistentRequest'
    Shutdown = 'Shutdown'

    # node messages
    NodeHello = 'NodeHello'
    CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
    Peer = 'Peer'
    PeerNote = 'PeerNote'
    EndListPeers = 'EndListPeers'
    EndListPeerNotes = 'EndListPeerNotes'
    PeerRemoved = 'PeerRemoved'
    NodeData = 'NodeData'
    ConfigData = 'ConfigData'    # (since 1027)
    TestDDAReply = 'TestDDAReply'    # (since 1027)
    TestDDAComplete = 'TestDDAComplete'    # (since 1027)
    SSKKeypair = 'SSKKeypair'
    PersistentGet = 'PersistentGet'
    PersistentPut = 'PersistentPut'
    PersistentPutDir = 'PersistentPutDir'
    URIGenerated = 'URIGenerated'
    PutSuccessful = 'PutSuccessful'
    PutFetchable = 'PutFetchable'
    DataFound = 'DataFound'
    AllData = 'AllData'
    StartedCompression = 'StartedCompression'
    FinishedCompression = 'FinishedCompression'
    SimpleProgress = 'SimpleProgress'
    EndListPersistentRequests = 'EndListPersistentRequests'
    PersistentRequestRemoved = 'PersistentRequestRemoved'    # (since 1016)
    PersistentRequestModified = 'PersistentRequestModified'    # (since 1016)
    PutFailed = 'PutFailed'
    GetFailed = 'GetFailed'
    ProtocolError = 'ProtocolError'
    IdentifierCollision = 'IdentifierCollision'
    UnknownNodeIdentifier = 'UnknownNodeIdentifier'
    UnknownPeerNoteType = 'UnknownPeerNoteType'
    SubscribedUSKUpdate = 'SubscribedUSKUpdate'


class Priorities:
    """Priority classes of requests (lower number == higher priority)"""
    Maximum = 0
    Interactive = 1
    SemiInteractive = 2
    Updatable = 3
    Bulk = 4
    Prefetch = 5
    Minimum = 6

    PriorityMin = Minimum
    PriorityDefault = Bulk


# errors

class FetchErrors:
    """Codes carried in the 'Code' field of GetFailed messages"""
    MaxArchiveRecursionExceeded = '1'
    UnknownSplitfileMetadata = '2'
    UnknownMetadata = '3'
    InvalidMetadata = '4'
    ArchiveFailure = '5'
    BlockDecodeError = '6'
    MaxMetadataLevelsExceeded = '7'
    MaxArchiveRestartsExceeded = '8'
    MaxRecursionLevelExceeded = '9'
    NotAnArchve = '10'
    # correctly spelled alias; the misspelled name is kept for existing callers
    NotAnArchive = NotAnArchve
    TooManyMetastrings = '11'
    BucketError = '12'
    DataNotFound = '13'
    RouteNotFound = '14'
    RejectedOverload = '15'
    TooManyRedirects = '16'
    InternalError = '17'
    TransferFailed = '18'
    SplitfileError = '19'
    InvalidUri = '20'
    TooBig = '21'
    MetadataTooBig = '22'
    TooManyBlocks = '23'
    NotEnoughMetastrings = '24'
    Canceled = '25'
    ArchiveRestart = '26'
    PermanentRedirect = '27'
    NotAllDataFound = '28'


class InsertErrors:
    """Codes carried in the 'Code' field of PutFailed messages"""
    InvalidUri = '1'
    BucketError = '2'
    InternalError = '3'
    RejectedOverload = '4'
    RouteNotFound = '5'
    FatalErrorInBlocks = '6'
    TooManyRetriesInBlock = '7'
    RouteReallyNotFound = '8'
    Collision = '9'
    Canceled = '10'


class ProtocolErrors:
    """Codes carried in the 'Code' field of ProtocolError messages"""
    ClientHelloMustBeFirst = '1'
    NoLateClientHellos = '2'
    MessageParseError = '3'
    UriParseError = '4'
    MissingField = '5'
    ErrorParsingNumber = '6'
    InvalidMessage = '7'
    InvalidField = '8'
    FileNotFound = '9'
    DiskTargetExists = '10'
    SameDirectoryExpected = '11'
    CouldNotCreateFile = '12'
    CouldNotWriteFile = '13'
    CouldNotRenameFile = '14'
    NoSuchIdentifier = '15'
    NotSupported = '16'
    InternalError = '17'
    ShuttingDown = '18'
    NoSuchNodeIdentifier = '19'    # Unused since 995
    UrlParseError = '20'
    ReferenceParseError = '21'
    FileParseError = '22'
    NotAFile = '23'
    AccessDenied = '24'
    DDADenied = '25'
    CouldNotReadFile = '26'
    ReferenceSignature = '27'
    CanNotPeerWithSelf = '28'
    PeerExists = '29'
    OpennetDisabled = '30'
    DarknetOnly = '31'

#**********************************************************************
# functions
#**********************************************************************
def newIdentifier():
    """Returns a new unique identifier (a random uuid as string)"""
    return str(uuid.uuid4())


def saveReadFile(fpath):
    """Reads contents of a file in the safest manner possible
    @param fpath: file to read
    @return: contents of the file if successful, None otherwise
    @note: the function is best effort, errors are reported as None, never raised
    """
    try:
        fp = open(fpath, 'rb')
    except Exception:
        return None
    try:
        try:
            return fp.read()
        except Exception:
            return None
    finally:
        fp.close()


def saveWriteFile(fpath, data):
    """Writes data to a file in the safest manner possible
    @param fpath: file to write
    @param data: data to write to file
    @return: True if successful, False otherwise
    @note: the function is best effort, errors are reported as False, never raised
    """
    try:
        fp = open(fpath, 'wb')
    except Exception:
        return False
    try:
        try:
            fp.write(data)
            # flush inside the guarded region so a failing flush counts as a failed write
            fp.flush()
        finally:
            fp.close()
    except Exception:
        return False
    return True


def fcpBool(pythonBool):
    """Converts a python bool to a fcp bool
    @param pythonBool: (bool)
    @return: (str) 'true' or 'false'
    """
    if pythonBool:
        return 'true'
    return 'false'


def pythonBool(fcpBool):
    """Converts a fcp bool to a python bool
    @param fcpBool: (str) 'true' or 'false'
    @return: (bool) True if the string is exactly 'true', False for anything else
    """
    return fcpBool == 'true'

#**********************************************************************
# classes
#**********************************************************************
class FcpSocketError(Exception):
    """Raised when reading from or writing to the node socket fails"""
    pass


class Message(object):
    """Class wrapping a freenet message"""

    Name = 'UMessage'

    def __init__(self, name, data=None, **params):
        """
        @param name: message name
        @param data: data associated to the message (not yet implemented)
        @param params: {field-name: value, ...} of parameters of the message
        @note: params can be accessed via item access (msg['Field']) or get()
        """
        self.data = data
        self.name = name
        self.params = params

    def toString(self):
        """Returns a string with the formatted message, ready to be sent"""
        # TODO: "Data" not yet implemented
        out = [self.name, ]
        for param, value in self.params.items():
            out.append('%s=%s' % (param, value))
        out.append('EndMessage\n')
        return '\n'.join(out)

    def pprint(self):
        """Returns the message as nicely formatted human readable string"""
        out = [self.name, ]
        for param, value in self.params.items():
            out.append(' %s=%s' % (param, value))
        out.append('EndMessage')
        out.append('')
        return '\n'.join(out)

    def __getitem__(self, name):
        """Returns the message parameter 'name' (raises KeyError if missing)"""
        return self.params[name]

    def get(self, name, default=None):
        """Returns the message parameter 'name' or 'default' """
        return self.params.get(name, default)

    def __setitem__(self, name, value):
        """Sets the message parameter 'name' to 'value' """
        self.params[name] = value


class MessageSocketTimeout(Message):
    """Pseudo message standing in for 'no message arrived before the socket timed out'"""

    Name = 'USocketTimeout'

    def __init__(self):
        Message.__init__(self, self.Name)

#**************************************************************************
# jobs
#**************************************************************************
#TODO: do something that this class does not lock the queue
class JobBase(object):
    """Base class for jobs

    A job wraps one request message. The client calls start() when the job is
    added and stop() with the message that finishes it.
    """

    # if True the client forgets the job as soon as it is finished
    _fcp_auto_remove_ = True

    def __init__(self, fcpClient, identifier, message):
        """
        @param fcpClient: FcpClient() instance the job is run by
        @param identifier: identifier the client looks the job up by
        @param message: (Message) message to send when the job is started
        """
        self.fcpClient = fcpClient
        self.fcpIdentifier = identifier
        self.fcpMessage = message
        self.fcpResult = None
        self.fcpTime = 0

    def start(self):
        """Records the start time and sends the message of the job"""
        self.fcpTime = time.time()
        self.fcpClient.sendMessageEx(self.fcpMessage)

    def stop(self, result):
        """Stores the result and turns fcpTime into the duration of the job
        @param result: (Message) message that finished the job
        """
        self.fcpTime = time.time() - self.fcpTime
        self.fcpResult = result


class JobNodeHello(JobBase):
    """Job sending ClientHello to the node"""

    _fcp_auto_remove_ = True

    def __init__(self, fcpClient, expectedVersion='2.0'):
        """
        @param fcpClient: FcpClient() instance
        @param expectedVersion: value of the 'ExpectedVersion' field
        """
        message = Message(
            Messages.ClientHello,
            Name=newIdentifier(),
            ExpectedVersion=expectedVersion,
        )
        JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)


class JobListPeers(JobBase):
    """Job sending ListPeers to the node"""

    _fcp_auto_remove_ = True

    def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
        """
        @param fcpClient: FcpClient() instance
        @param withMetaData: (bool) value of the 'WithMetadata' field
        @param withVolantile: (bool) value of the 'WithVolatile' field
        """
        message = Message(
            Messages.ListPeers,
            WithMetadata=fcpBool(withMetaData),
            WithVolatile=fcpBool(withVolantile),
        )
        JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)

    def handlePeer(self, msg):
        """Called for each Peer message. Does nothing by default"""
        pass


class JobFileInfo(JobBase):
    """Job retrieving content type and size of a file without fetching the file"""

    _fcp_auto_remove_ = False

    def __init__(self, fcpClient, uri, **params):
        """
        @param fcpClient: FcpClient() instance
        @param uri: uri of the file to retrieve info for
        @param params: additional parameters:
            IgnoreDS='true' / 'false'
            DSOnly='true' / 'false'
            MaxRetries=-1 ...N
            PriorityClass=Priority*
        """
        identifier = newIdentifier()
        message = Message(
            Messages.ClientGet,
            Identifier=identifier,
            URI=uri,
            # NOTE(review): was 'ase0', which is not a number. stop() treats
            # GetFailed / TooBig as the regular outcome, so a size limit of
            # zero looks intended -- confirm against the FCP spec
            MaxSize='0',
            ReturnType='none',
            Verbosity='1',
            **params
        )
        JobBase.__init__(self, fcpClient, identifier, message)

    def handleProgress(self, msg):
        """Called for each SimpleProgress message. Does nothing by default"""
        pass

    def stop(self, msg):
        """Finishes the job and sets fcpResult to a tuple (error, result)
        @param msg: (Message) GetFailed, DataFound or ProtocolError
        @note: on success error is None and result is (content-type, size).
            On failure error is the error code and result the message
        @raise ValueError: if the message is none of the messages handled
        """
        JobBase.stop(self, msg)
        error = result = None
        if msg.name == Messages.GetFailed:
            if msg['Code'] == FetchErrors.TooBig:
                result = (
                    msg.get('ExpectedMetadata.ContentType', ''),
                    msg.get('ExpectedDataLength', ''),
                )
            else:
                error, result = msg['Code'], msg

        elif msg.name == Messages.DataFound:
            result = (
                msg.get('Metadata.ContentType', ''),
                msg.get('DataLength', ''),
            )

        elif msg.name == Messages.ProtocolError:
            error, result = msg['Code'], msg

        else:
            raise ValueError('Unhandled message: %s' % msg.name)

        self.fcpResult = error, result

#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
    """Job testing direct disk access to a directory

    The directory is used as identifier of the job.
    """

    _fcp_auto_remove_ = False

    def __init__(self, fcpClient, directory, read=True, write=True):
        """
        @param fcpClient: FcpClient() instance
        @param directory: directory to test
        @param read: (bool) value of the 'WantReadDirectory' field
        @param write: (bool) value of the 'WantWriteDirectory' field
        """
        message = Message(
            Messages.TestDDARequest,
            Directory=directory,
            WantReadDirectory=fcpBool(read),
            WantWriteDirectory=fcpBool(write),
        )
        JobBase.__init__(self, fcpClient, directory, message)
        self.fcpTmpFile = None

    def handleTestDDAReply(self, msg):
        """Handles TestDDAReply: writes / reads the requested files and responds
        @param msg: (Message) TestDDAReply
        """
        fpathWrite = msg.params.get('WriteFilename', None)
        fpathRead = msg.params.get('ReadFilename', None)
        readContent = ''
        if fpathWrite is not None:
            written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
            if not written:
                # do not leave a half written file behind
                if os.path.isfile(fpathWrite):
                    os.remove(fpathWrite)
            else:
                # remember the file, it is removed when the job is stopped
                self.fcpTmpFile = fpathWrite

        if fpathRead is not None:
            readContent = saveReadFile(fpathRead)
            if readContent is None:
                readContent = ''

        self.fcpClient.sendMessage(
            Messages.TestDDAResponse,
            Directory=msg['Directory'],
            ReadContent=readContent,
        )

    def stop(self, msg):
        """Finishes the job and removes the temporary file written, if any
        @param msg: (Message) message that finished the job
        """
        JobBase.stop(self, msg)
        if self.fcpTmpFile is not None:
            if os.path.isfile(self.fcpTmpFile):
                os.remove(self.fcpTmpFile)

#**************************************************************************
# fcp client
#**************************************************************************
class FcpClient(object):
    """Client talking to a freenet node via a socket

    Jobs are registered via addJob(). Messages read from the node are
    dispatched to the job they belong to in handleMessage().
    """

    def __init__(self):
        self._isConnected = False
        # 'all' maps identifier --> job, the lists keep track of the job state
        self._jobs = {
            'all': {},
            'pending': [],
            'running': [],
            'complete': [],
        }
        self._lock = thread.allocate_lock()
        self._socket = None

    def close(self):
        """Closes the socket if it is open"""
        if self._socket is not None:
            self._socket.close()
            self._socket = None

    def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
        """Polls the node until a connection is established
        @param host: host the node is running on
        @param port: port the node is listening on
        @param repeat: (seconds) give up as soon as the accumulated pauses exceed this
        @param timeout: (seconds) pause between two attempts
        @return: True if the connection was established, False otherwise
        """
        # poll until freenet responds
        time_elapsed = 0
        while time_elapsed <= repeat:

            # try to connect socket
            self.close()
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._socket.settimeout(SocketTimeout)
            try:
                self._socket.connect((host, port))
            except socket.error:
                # node not (yet) reachable. Errors not related to the
                # network (like a malformed port) are not swallowed
                pass
            else:
                #self._isConnected = True
                return True

            # continue polling
            time_elapsed += timeout
            time.sleep(timeout)

        # giving up: do not keep the unconnected socket around
        self.close()
        return False

    #def __nonzero__(self):
    #    return self._isConnected

    def addJob(self, job):
        """Registers a job and starts it
        @param job: job to add
        @raise ValueError: if a job with the same identifier is already registered
        """
        self._lock.acquire(True)
        try:
            if job.fcpIdentifier in self._jobs['all']:
                raise ValueError('Duplicate job: %r' % job.fcpIdentifier)
            self._jobs['all'][job.fcpIdentifier] = job
            self._jobs['running'].append(job)
        finally:
            self._lock.release()
        # started outside the lock, start() sends a message to the node
        job.start()

    def finishJob(self, identifier, msg):
        """Marks a job as finished and passes the message to its stop() method
        @param identifier: identifier of the job
        @param msg: (Message) message that finished the job
        @raise ValueError: if no job with the identifier is registered
        """
        self._lock.acquire(True)
        try:
            job = self._jobs['all'].get(identifier, None)
            if job is not None:
                self._jobs['running'].remove(job)
                if job._fcp_auto_remove_:
                    del self._jobs['all'][identifier]
                else:
                    self._jobs['complete'].append(job)
        finally:
            self._lock.release()

        if job is None:
            raise ValueError('No such job: %r' % identifier)
        job.stop(msg)

    def notifyJob(self, identifier, handler, msg):
        """Passes a message to a handler method of a job
        @param identifier: identifier of the job
        @param handler: (str) name of the method of the job to call
        @param msg: (Message) message to pass to the handler
        @raise ValueError: if no job with the identifier is registered
        """
        self._lock.acquire(True)
        try:
            job = self._jobs['all'].get(identifier, None)
        finally:
            self._lock.release()
        if job is None:
            raise ValueError('No such job: %r' % identifier)
        getattr(job, handler)(msg)

    def run(self):
        """Reads and handles messages until no pending or running jobs are left"""

        # TODO:
        # x. push pending jobs
        # x. on error stop this thingy

        n = 0
        while True:
            # block until the lock is free instead of spinning on it
            self._lock.acquire(True)
            try:
                idle = not self._jobs['pending'] and not self._jobs['running']
            finally:
                self._lock.release()
            if idle:
                break

            self.next()

            # safety cap so the loop can not run forever
            n += 1
            if n > 50:
                break

    def next(self):
        """Reads the next message from the socket and handles it"""
        msg = self.readMessage()
        self.handleMessage(msg)

    def handleMessage(self, msg):
        """Dispatches a message to the job it belongs to
        @param msg: (Message) message to handle
        @note: messages not known to the client are ignored
        """
        print(msg.pprint())

        if msg.name == Messages.NodeHello:
            #connectionIdentifier = msg['ConnectionIdentifier']
            self.finishJob(JobIdentifiers.ClientHello, msg)

        elif msg.name == Messages.ProtocolError:
            code = msg['Code']
            if code == ProtocolErrors.NoLateClientHellos:
                self.finishJob(JobIdentifiers.ClientHello, msg)
            else:
                identifier = msg.get('Identifier', None)
                if identifier is None:
                    pass    # raise ???
                else:
                    self.finishJob(identifier, msg)

        elif msg.name == Messages.Peer:
            self.notifyJob(JobIdentifiers.ListPeers, 'handlePeer', msg)

        elif msg.name == Messages.EndListPeers:
            self.finishJob(JobIdentifiers.ListPeers, msg)

        elif msg.name == Messages.GetFailed:
            self.finishJob(msg['Identifier'], msg)

        elif msg.name == Messages.DataFound:
            # JobFileInfo.stop() handles DataFound, so dispatch it like GetFailed
            self.finishJob(msg['Identifier'], msg)

        elif msg.name == Messages.SimpleProgress:
            self.notifyJob(msg['Identifier'], 'handleProgress', msg)

        elif msg.name == Messages.TestDDAReply:
            self.notifyJob(msg['Directory'], 'handleTestDDAReply', msg)

        elif msg.name == Messages.TestDDAComplete:
            self.finishJob(msg['Directory'], msg)

        elif msg.name == Messages.IdentifierCollision:
            pass

    def readMessage(self):
        """Reads the next message directly from the socket
        @return: (Message) the message read or MessageSocketTimeout() if the
            socket timed out
        @raise FcpSocketError: if reading from the socket fails
        @note: NOTE(review): a timeout in the middle of a message discards the
            part of the message already read -- known issue, not addressed here
        """
        msg = Message(None)
        buf = []
        while True:

            try:
                p = self._socket.recv(1)
                if not p:
                    raise ValueError('Socket is dead')
            except socket.timeout:    # no new messages in queue
                msg = MessageSocketTimeout()
                break
            except Exception as d:
                raise FcpSocketError(d)    #!!

            if p == '\r':    # ignore
                continue

            if p != '\n':
                buf.append(p)
                continue

            line = ''.join(buf)
            if line in ('End', "EndMessage"):
                break
            buf = []

            if msg.name is None:
                msg.name = line
            elif line == 'Data':
                # recv() may return less than requested: read until complete
                remaining = int(msg.params['DataLength'])
                chunks = []
                try:
                    while remaining > 0:
                        chunk = self._socket.recv(remaining)
                        if not chunk:
                            raise ValueError('Socket is dead')
                        chunks.append(chunk)
                        remaining -= len(chunk)
                except Exception as d:
                    raise FcpSocketError(d)    #!!
                msg.data = ''.join(chunks)
                # NOTE(review): presumably the data terminates the message (no
                # 'EndMessage' follows) -- confirm against the FCP spec
                break
            else:
                head, sep, tail = line.partition('=')
                msg.params[head] = tail
                if not sep:
                    # TODO: check for invalid messages or not
                    pass

        return msg

    def sendMessage(self, name, data=None, **params):
        """Sends a message to freenet
        @param name: name of the message to send
        @param data: data to attach to the message
        @param params: {param-name: param-value, ...} of parameters to pass along
            with the message (see freenet protocol)
        @return: (Message) the message sent
        """
        return self.sendMessageEx(Message(name, data=data, **params))

    def sendMessageEx(self, msg):
        """Sends a message to freenet
        @param msg: (Message) message to send
        @return: Message
        @raise FcpSocketError: if writing to the socket fails
        """
        #self.log.info('SendMessage\n' + msg.pprint())
        rawMsg = msg.toString()
        try:
            self._socket.sendall(rawMsg)
        except Exception as d:
            raise FcpSocketError(d)
            #TODO: allow for an error handler to handle
        return msg

#*****************************************************************************
#
#*****************************************************************************
if __name__ == '__main__':
    c = FcpClient()
    if c.connect():
        try:
            job1 = JobNodeHello(c)
            c.addJob(job1)

            c.run()
            print('---------------------------')
            # the result is None if the node did not answer the hello
            if job1.fcpResult is not None:
                print(job1.fcpResult.pprint())
        finally:
            c.close()

# This was sent by the SourceForge.net collaborative development platform,
# the world's largest Open Source development site.