SF.net SVN: fclient: [12] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
From: <jU...@us...> - 2007-10-25 14:04:41
|
Revision: 12 http://fclient.svn.sourceforge.net/fclient/?rev=12&view=rev Author: jUrner Date: 2007-10-25 05:59:16 -0700 (Thu, 25 Oct 2007) Log Message: ----------- bit more work on protocol Modified Paths: -------------- trunk/fclient/fclient_lib/fcp/fcp2_0.py Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py =================================================================== --- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11) +++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-25 12:59:16 UTC (rev 12) @@ -1,5 +1,16 @@ +'''Freenet client protocol 2.0 implementation +''' +#NOTE: +# +# downloading data to disk is not supported at the moment. TestDDA code is quite unwritable +# and as far as I can see there are plans to get rid of it. So wait... +# +# + + import atexit +import base64 import logging import os import re @@ -187,7 +198,7 @@ """Returns a new unique identifier @return: (str) uuid """ - return str(uuid.uuid4()) + return 'fclient::' + str(uuid.uuid4()) def pythonBool(fcpBool): @@ -251,7 +262,7 @@ @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat') @return: (string) whatever freenet returns """ - #TODO: on windows it may be necessary to hide the comand window + #TODO: on windows it may be necessary to hide the command window p = subprocess.Popen( args=cmdline, shell=True, @@ -368,10 +379,10 @@ ModifyPeerNote = 'ModifyPeerNote' RemovePeer = 'RemovePeer' GetNode = 'GetNode' - GetConfig = 'GetConfig' # (since 1027) + GetConfig = 'GetConfig' # (since 1027) ModifyConfig = 'ModifyConfig' # (since 1027) - TestDDARequest = 'TestDDARequest' # (since 1027) - TestDDAResponse = 'TestDDAResponse' # (since 1027) + TestDDARequest = 'TestDDARequest' # (since 1027) + TestDDAResponse = 'TestDDAResponse' # (since 1027) GenerateSSK = 'GenerateSSK' ClientPut = 'ClientPut' ClientPutDiskDir = 'ClientPutDiskDir' @@ -433,7 +444,7 @@ self.params = params def toString(self): - """Returns a string with the 
formated message, ready to be send""" + """Returns the message as formated string ready to be send""" # TODO: "Data" not yet implemented out = [self.name, ] for param, value in self.params.items(): @@ -447,7 +458,6 @@ for param, value in self.params.items(): out.append('>> %s=%s' % (param, value)) out.append('>>EndMessage') - out.append('') return '\n'.join(out) def __getitem__(self, name): @@ -508,6 +518,8 @@ self.jobTimeStart = time.time() self.jobClient.sendMessageEx(self.jobMessage) + + # XXX def handleStop(self, flagError, msg): """Called on job completion to stop the job @param flagError: True if an error was encountered, False otherwise @@ -569,13 +581,18 @@ """ def __init__(self, fcpClient, withMetaData=False, withVolantile=False): + """ + @param withMetaData: include meta data for each peer? + @param withVolantile: include volantile data for each peer? + @ivar jobResult: on job completion, will be a list containing all perrs as one 'Peer' message for each peer + """ message = Message( Message.ListPeers, WithMetadata=fcpBool(withMetaData), WithVolatile=fcpBool(withVolantile), ) JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message) - + def handleMessage(self,msg): if msg.name == Message.EndListPeers: @@ -585,17 +602,72 @@ else: raise ValueError('Unexpected message: %s' % msg.name) + def handlePeer(self, msg): + if self.jobResult is None: + self.jobResult = [msg, ] + else: + self.jobResult.append(msg) return True def handleEndListPeers(self, msg): self.jobTimeStop = time.time() - self.jobResult = msg + if self.jobResult is None: + self.jobResult = [] self.jobClient.jobRemove(self.jobIdentifier) return True + +class JobListPeerNotes(JobBase): + """Lists all notes associated to a peer of the node + """ + + def __init__(self, fcpClient, identifier): + """ + @param identifier: identifier of the peer to list notes for (peer identity) + @ivar jobResult: on job completion will be a list containing all notes associated to the peer + + @note: notes 
are only available for darknet peers (opennet == false) + """ + + message = Message( + Message.ListPeerNotes, + NodeIdentifier=identifier + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def handleMessage(self,msg): + if msg.name == Message.EndListPeerNotes: + return self.handleEndListPeerNotes(msg) + elif msg.name == Message.PeerNote: + return self.handlePeerNote(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + def handlePeerNote(self, msg): + note = msg.get('NoteText', '') + if note: + note = base64.decodestring(note) + if self.jobResult is None: + self.jobResult = [note, ] + else: + self.jobResult.append(note) + return True + + + def handleEndListPeerNotes(self, msg): + self.jobTimeStop = time.time() + if self.jobResult is None: + self.jobResult = [] + self.jobClient.jobRemove(self.jobIdentifier) + return True + + + + #TODO: identifier collisions are not yet handled class JobGetFileInfo(JobBase): """Tries to retieve information about a file. If everything goes well @@ -735,7 +807,13 @@ """ - def __init__(self, fcpClient, directory, read=True, write=True): + def __init__(self, fcpClient, directory, read=False, write=False): + """ + + @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed) + """ + + if not os.path.isdir(directory): raise ValueError('No such directory: %r' % directory) @@ -747,8 +825,7 @@ ) JobBase.__init__(self, fcpClient, directory, message) self.jobTmpFile = None - - + def handleMessage(self, msg): if msg.name == Message.TestDDAReply: @@ -786,13 +863,50 @@ def handleTestDDAComplete(self, msg): self.jobTimeStop = time.time() - self.jobResult = msg + self.jobResult = ( + pythonBool(msg.get('ReadDirectoryAllowed', 'false')), + pythonBool(msg.get('WriteDirectoryAllowed', 'false')), + ) saveRemoveFile(self.jobTmpFile) self.jobTmpFile = None self.jobClient.jobRemove(self.jobIdentifier) return True + +class JobGenerateSSK(JobBase): + """Job to generate a SSK 
key pair + """ + + + def __init__(self, fcpClient): + """ + @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated + SSK key + """ + + identifier = newIdentifier() + message = Message( + Message.GenerateSSK, + Identifier=identifier, + ) + JobBase.__init__(self, fcpClient, identifier, message) + + + def handleMessage(self, msg): + if msg.name == Message.SSKKeypair: + return self.handleSSKKeypair(msg) + else: + raise ValueError('Unexpected message: %s' % msg.name) + + + def handleSSKKeypair(self, msg): + self.jobTimeStop = time.time() + self.jobResult = (msg['InsertURI'], msg['RequestURI']) + self.jobClient.jobRemove(self.jobIdentifier) + return True + + #************************************************************************** # fcp client #************************************************************************** @@ -840,20 +954,19 @@ with two params: SocketError + details. When the handler is called the client is already closed. @param verbosity: verbosity level for debugging - @param logMessages: LogMessages class containing messages + @param logMessages: LogMessages class containing message strings """ self._isConnected = False self._jobs = { - #TODO: check if JobList is still required - 'JobMapping': {}, - 'JobList': [], + 'Jobs': {}, + 'PendingJobs': [], 'RegisteredDirectories': [], } - self._errorHandler = errorHandler + self._errorHandler = errorHandler #TODO: check! 
self._log = logging.getLogger(name) self._logMessages = logMessages - self._lock = thread.allocate_lock() # lovk when resources are accessed + self._lock = thread.allocate_lock() # lock when resources are accessed self._socket = None self.setVerbosity(verbosity) @@ -933,7 +1046,7 @@ if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst: return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg) - elif code == ProtocolError.Shutdown: + elif code == ProtocolError.ShuttingDown: if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg): # ######################################## @@ -956,6 +1069,11 @@ identifier = msg['Directory'] elif msg.name == Message.TestDDAComplete: identifier = msg['Directory'] + elif msg.name == Message.PeerNote: + identifier = msg['NodeIdentifier'] + elif msg.name == Message.EndListPeerNotes: + identifier = msg['NodeIdentifier'] + else: identifier = msg.get('Identifier', None) @@ -971,6 +1089,13 @@ elif msg.name == Message.Peer: return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg) + #elif msg.name == Message.PeerNote: + # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) + + #elif msg.name == Message.EndListPeerNotes: + # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg) + + # more here..... 
else: @@ -993,9 +1118,12 @@ """ self._lock.acquire(True) try: - result = bool(self._jobs['JobList']) + result = self._jobs['Jobs'] or self._jobs['PendingJobs'] finally: self._lock.release() + + + return result @@ -1006,20 +1134,16 @@ """ self._lock.acquire(True) try: - if job.jobIdentifier in self._jobs['JobMapping']: - raise ValueError('Job with that identifier already present') - - if job.jobIdentifier in self._jobs['JobMapping']: + if job.jobIdentifier in self._jobs['Jobs']: raise ValueError('Duplicate job: %r' % job.jobIdentifier) - self._jobs['JobMapping'][job.jobIdentifier] = job - self._jobs['JobList'].append(job) + self._jobs['Jobs'][job.jobIdentifier] = job finally: self._lock.release() self._log.info(self._logMessages.JobStart + job.jobMessage.name) job.handleStart() if synchron: - while job.jobResult is None: + while self.jobGet(job.jobIdentifier): self.next() @@ -1042,7 +1166,7 @@ """ self._lock.acquire(True) try: - result = self._jobs['JobMapping'].get(identifier, None) + result = self._jobs['Jobs'].get(identifier, None) finally: self._lock.release() return result @@ -1055,7 +1179,7 @@ """ self._lock.acquire(True) try: - result = identifier in self._jobs['JobMapping'] + result = identifier in self._jobs['Jobs'] finally: self._lock.release() return result @@ -1068,10 +1192,9 @@ """ self._lock.acquire(True) try: - job = self._jobs['JobMapping'].get(identifier, None) + job = self._jobs['Jobs'].get(identifier, None) if job is not None: - self._jobs['JobList'].remove(job) - del self._jobs['JobMapping'][identifier] + del self._jobs['Jobs'][identifier] finally: self._lock.release() if job is None: @@ -1209,17 +1332,86 @@ self._log.setLevel(verbosity) + ######################################################## ## ######################################################## def getFileInfo(self, job): pass - def getFile(self, job): - pass + + + ######################################################### + ## boilerplate code to tackle TestDDA + ## + ## ...but I don't 
trust it ;-) I was not yet able to wrap my head around + ## jobAdd(synchron=True) enough to know whether it is safe (thread, deadlock) or not. + ## + ## Another problem is that there is no way to know when a directory is no longer + ## needed. And I don't want to write code in a Gui to tackle a problem that will hopefully + ## go away in the near future. + ## + ## see: https://bugs.freenetproject.org/view.php?id=1753 + ## + ######################################################### + def testWriteAccess(self, directory): + canRead, canWrite = False, False + result = self._jobs.get('RegisteredDirectories', None) + if result is not None: + canRead, canWrite = result + if not canWrite: + job = JobTestDDA(directory, read=canRead, write=True) + self.addJob(job, synchron=True) + canWrite = job.jobResult[1] + self._jobs['RegisteredDirectories'] = (canRead, canWrite) + return canWrite + + def testReadAccess(self, directory): + canRead, canWrite = False, False + result = self._jobs.get('RegisteredDirectories', None) + if result is not None: + canRead, canWrite = result + if not canRead: + job = JobTestDDA(directory, read=True, write=canWrite) + self.addJob(job, synchron=True) + canRead = job.jobResult[0] + self._jobs['RegisteredDirectories'] = (canRead, canWrite) + return canRead + + + def downloadFile(self, directory, job): + if not os.path.isdir(directory): + raise IOError('No such directory') + + self._jobs['PendingJobs'].append(job) + try: + result = self.testWriteAccess(directory) + if result: + self.addJob(job) + finally: + self._jobs['PendingJobs'].remove(job) + return result + + def uploadFile(self, directory, job): + if not os.path.isdir(directory): + raise IOError('No such directory') + + self._jobs['PendingJobs'].append(job) + try: + result = self.testReadAccess(directory) + if result: + self.addJob(job) + finally: + self._jobs['PendingJobs'].remove(job) + return result + + + + + #***************************************************************************** # 
#***************************************************************************** @@ -1239,7 +1431,16 @@ #foo() + def foo(): + job = JobGenerateSSK(c) + c.jobAdd(job, synchron=True) + print job.jobResult + foo() + + + + def foo(): d = os.path.dirname(os.path.abspath(__file__)) job2 = JobTestDDA(c, d) c.jobAdd(job2) @@ -1250,12 +1451,21 @@ #foo() def foo(): - job2 = JobListPeers(c) - c.jobAdd(job2) + job = JobListPeers(c) + c.jobAdd(job) c.run() print '---------------------------' - print job2.jobResult + print job.jobResult print '---------------------------' + + + for peer in job.jobResult: + if not pythonBool(peer['opennet']): + job = JobListPeerNotes(c, peer['identity']) + c.jobAdd(job, synchron=True) + print '>>', job.jobResult + #.get('NoteText') + #foo() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |