SF.net SVN: fclient: [12] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-25 14:04:41
|
Revision: 12
http://fclient.svn.sourceforge.net/fclient/?rev=12&view=rev
Author: jUrner
Date: 2007-10-25 05:59:16 -0700 (Thu, 25 Oct 2007)
Log Message:
-----------
bit more work on protocol
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-25 12:59:16 UTC (rev 12)
@@ -1,5 +1,16 @@
+'''Freenet client protocol 2.0 implementation
+'''
+#NOTE:
+#
+# downloading data to disk is not supported at the moment. TestDDA code is quite unwritable
+# and as far as I can see there are plans to get rid of it. So wait...
+#
+#
+
+
import atexit
+import base64
import logging
import os
import re
@@ -187,7 +198,7 @@
"""Returns a new unique identifier
@return: (str) uuid
"""
- return str(uuid.uuid4())
+ return 'fclient::' + str(uuid.uuid4())
def pythonBool(fcpBool):
@@ -251,7 +262,7 @@
@param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
@return: (string) whatever freenet returns
"""
- #TODO: on windows it may be necessary to hide the comand window
+ #TODO: on windows it may be necessary to hide the command window
p = subprocess.Popen(
args=cmdline,
shell=True,
@@ -368,10 +379,10 @@
ModifyPeerNote = 'ModifyPeerNote'
RemovePeer = 'RemovePeer'
GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
+ GetConfig = 'GetConfig' # (since 1027)
ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
GenerateSSK = 'GenerateSSK'
ClientPut = 'ClientPut'
ClientPutDiskDir = 'ClientPutDiskDir'
@@ -433,7 +444,7 @@
self.params = params
def toString(self):
- """Returns a string with the formated message, ready to be send"""
+ """Returns the message as formated string ready to be send"""
# TODO: "Data" not yet implemented
out = [self.name, ]
for param, value in self.params.items():
@@ -447,7 +458,6 @@
for param, value in self.params.items():
out.append('>> %s=%s' % (param, value))
out.append('>>EndMessage')
- out.append('')
return '\n'.join(out)
def __getitem__(self, name):
@@ -508,6 +518,8 @@
self.jobTimeStart = time.time()
self.jobClient.sendMessageEx(self.jobMessage)
+
+ # XXX
def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
@param flagError: True if an error was encountered, False otherwise
@@ -569,13 +581,18 @@
"""
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ """
+ @param withMetaData: include meta data for each peer?
+ @param withVolantile: include volatile data for each peer?
+ @ivar jobResult: on job completion, will be a list containing all peers as one 'Peer' message for each peer
+ """
message = Message(
Message.ListPeers,
WithMetadata=fcpBool(withMetaData),
WithVolatile=fcpBool(withVolantile),
)
JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
-
+
def handleMessage(self,msg):
if msg.name == Message.EndListPeers:
@@ -585,17 +602,72 @@
else:
raise ValueError('Unexpected message: %s' % msg.name)
+
def handlePeer(self, msg):
+ if self.jobResult is None:
+ self.jobResult = [msg, ]
+ else:
+ self.jobResult.append(msg)
return True
def handleEndListPeers(self, msg):
self.jobTimeStop = time.time()
- self.jobResult = msg
+ if self.jobResult is None:
+ self.jobResult = []
self.jobClient.jobRemove(self.jobIdentifier)
return True
+
+class JobListPeerNotes(JobBase):
+ """Lists all notes associated to a peer of the node
+ """
+
+ def __init__(self, fcpClient, identifier):
+ """
+ @param identifier: identifier of the peer to list notes for (peer identity)
+ @ivar jobResult: on job completion will be a list containing all notes associated to the peer
+
+ @note: notes are only available for darknet peers (opennet == false)
+ """
+
+ message = Message(
+ Message.ListPeerNotes,
+ NodeIdentifier=identifier
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleMessage(self,msg):
+ if msg.name == Message.EndListPeerNotes:
+ return self.handleEndListPeerNotes(msg)
+ elif msg.name == Message.PeerNote:
+ return self.handlePeerNote(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handlePeerNote(self, msg):
+ note = msg.get('NoteText', '')
+ if note:
+ note = base64.decodestring(note)
+ if self.jobResult is None:
+ self.jobResult = [note, ]
+ else:
+ self.jobResult.append(note)
+ return True
+
+
+ def handleEndListPeerNotes(self, msg):
+ self.jobTimeStop = time.time()
+ if self.jobResult is None:
+ self.jobResult = []
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+
+
#TODO: identifier collisions are not yet handled
class JobGetFileInfo(JobBase):
"""Tries to retieve information about a file. If everything goes well
@@ -735,7 +807,13 @@
"""
- def __init__(self, fcpClient, directory, read=True, write=True):
+ def __init__(self, fcpClient, directory, read=False, write=False):
+ """
+
+ @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed)
+ """
+
+
if not os.path.isdir(directory):
raise ValueError('No such directory: %r' % directory)
@@ -747,8 +825,7 @@
)
JobBase.__init__(self, fcpClient, directory, message)
self.jobTmpFile = None
-
-
+
def handleMessage(self, msg):
if msg.name == Message.TestDDAReply:
@@ -786,13 +863,50 @@
def handleTestDDAComplete(self, msg):
self.jobTimeStop = time.time()
- self.jobResult = msg
+ self.jobResult = (
+ pythonBool(msg.get('ReadDirectoryAllowed', 'false')),
+ pythonBool(msg.get('WriteDirectoryAllowed', 'false')),
+ )
saveRemoveFile(self.jobTmpFile)
self.jobTmpFile = None
self.jobClient.jobRemove(self.jobIdentifier)
return True
+
+class JobGenerateSSK(JobBase):
+ """Job to generate a SSK key pair
+ """
+
+
+ def __init__(self, fcpClient):
+ """
+ @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated
+ SSK key
+ """
+
+ identifier = newIdentifier()
+ message = Message(
+ Message.GenerateSSK,
+ Identifier=identifier,
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleMessage(self, msg):
+ if msg.name == Message.SSKKeypair:
+ return self.handleSSKKeypair(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+
+ def handleSSKKeypair(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = (msg['InsertURI'], msg['RequestURI'])
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
#**************************************************************************
# fcp client
#**************************************************************************
@@ -840,20 +954,19 @@
with two params: SocketError + details. When the handler is called the client
is already closed.
@param verbosity: verbosity level for debugging
- @param logMessages: LogMessages class containing messages
+ @param logMessages: LogMessages class containing message strings
"""
self._isConnected = False
self._jobs = {
- #TODO: check if JobList is still required
- 'JobMapping': {},
- 'JobList': [],
+ 'Jobs': {},
+ 'PendingJobs': [],
'RegisteredDirectories': [],
}
- self._errorHandler = errorHandler
+ self._errorHandler = errorHandler #TODO: check!
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock() # lovk when resources are accessed
+ self._lock = thread.allocate_lock() # lock when resources are accessed
self._socket = None
self.setVerbosity(verbosity)
@@ -933,7 +1046,7 @@
if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif code == ProtocolError.Shutdown:
+ elif code == ProtocolError.ShuttingDown:
if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
# ########################################
@@ -956,6 +1069,11 @@
identifier = msg['Directory']
elif msg.name == Message.TestDDAComplete:
identifier = msg['Directory']
+ elif msg.name == Message.PeerNote:
+ identifier = msg['NodeIdentifier']
+ elif msg.name == Message.EndListPeerNotes:
+ identifier = msg['NodeIdentifier']
+
else:
identifier = msg.get('Identifier', None)
@@ -971,6 +1089,13 @@
elif msg.name == Message.Peer:
return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
+ #elif msg.name == Message.PeerNote:
+ # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
+
+ #elif msg.name == Message.EndListPeerNotes:
+ # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
+
+
# more here.....
else:
@@ -993,9 +1118,12 @@
"""
self._lock.acquire(True)
try:
- result = bool(self._jobs['JobList'])
+ result = self._jobs['Jobs'] or self._jobs['PendingJobs']
finally:
self._lock.release()
+
+
+
return result
@@ -1006,20 +1134,16 @@
"""
self._lock.acquire(True)
try:
- if job.jobIdentifier in self._jobs['JobMapping']:
- raise ValueError('Job with that identifier already present')
-
- if job.jobIdentifier in self._jobs['JobMapping']:
+ if job.jobIdentifier in self._jobs['Jobs']:
raise ValueError('Duplicate job: %r' % job.jobIdentifier)
- self._jobs['JobMapping'][job.jobIdentifier] = job
- self._jobs['JobList'].append(job)
+ self._jobs['Jobs'][job.jobIdentifier] = job
finally:
self._lock.release()
self._log.info(self._logMessages.JobStart + job.jobMessage.name)
job.handleStart()
if synchron:
- while job.jobResult is None:
+ while self.jobGet(job.jobIdentifier):
self.next()
@@ -1042,7 +1166,7 @@
"""
self._lock.acquire(True)
try:
- result = self._jobs['JobMapping'].get(identifier, None)
+ result = self._jobs['Jobs'].get(identifier, None)
finally:
self._lock.release()
return result
@@ -1055,7 +1179,7 @@
"""
self._lock.acquire(True)
try:
- result = identifier in self._jobs['JobMapping']
+ result = identifier in self._jobs['Jobs']
finally:
self._lock.release()
return result
@@ -1068,10 +1192,9 @@
"""
self._lock.acquire(True)
try:
- job = self._jobs['JobMapping'].get(identifier, None)
+ job = self._jobs['Jobs'].get(identifier, None)
if job is not None:
- self._jobs['JobList'].remove(job)
- del self._jobs['JobMapping'][identifier]
+ del self._jobs['Jobs'][identifier]
finally:
self._lock.release()
if job is None:
@@ -1209,17 +1332,86 @@
self._log.setLevel(verbosity)
+
########################################################
##
########################################################
def getFileInfo(self, job):
pass
- def getFile(self, job):
- pass
+
+
+ #########################################################
+ ## boilerplate code to tackle TestDDA
+ ##
+ ## ...but I don't trust it ;-) I was not yet able to wrap my head around
+ ## jobAdd(synchron=True) enough to know whether it is safe (thread, deadlock) or not.
+ ##
+ ## Another problem is that there is no way to know when a directory is no longer
+ ## needed. And I don't want to write code in a Gui to tackle a problem that will hopefully
+ ## go away in the near future.
+ ##
+ ## see: https://bugs.freenetproject.org/view.php?id=1753
+ ##
+ #########################################################
+ def testWriteAccess(self, directory):
+ canRead, canWrite = False, False
+ result = self._jobs.get('RegisteredDirectories', None)
+ if result is not None:
+ canRead, canWrite = result
+ if not canWrite:
+ job = JobTestDDA(directory, read=canRead, write=True)
+ self.addJob(job, synchron=True)
+ canWrite = job.jobResult[1]
+ self._jobs['RegisteredDirectories'] = (canRead, canWrite)
+ return canWrite
+
+ def testReadAccess(self, directory):
+ canRead, canWrite = False, False
+ result = self._jobs.get('RegisteredDirectories', None)
+ if result is not None:
+ canRead, canWrite = result
+ if not canRead:
+ job = JobTestDDA(directory, read=True, write=canWrite)
+ self.addJob(job, synchron=True)
+ canRead = job.jobResult[0]
+ self._jobs['RegisteredDirectories'] = (canRead, canWrite)
+ return canRead
+
+
+ def downloadFile(self, directory, job):
+ if not os.path.isdir(directory):
+ raise IOError('No such directory')
+
+ self._jobs['PendingJobs'].append(job)
+ try:
+ result = self.testWriteAccess(directory)
+ if result:
+ self.addJob(job)
+ finally:
+ self._jobs['PendingJobs'].remove(job)
+ return result
+
+ def uploadFile(self, directory, job):
+ if not os.path.isdir(directory):
+ raise IOError('No such directory')
+
+ self._jobs['PendingJobs'].append(job)
+ try:
+ result = self.testReadAccess(directory)
+ if result:
+ self.addJob(job)
+ finally:
+ self._jobs['PendingJobs'].remove(job)
+ return result
+
+
+
+
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1239,7 +1431,16 @@
#foo()
+
def foo():
+ job = JobGenerateSSK(c)
+ c.jobAdd(job, synchron=True)
+ print job.jobResult
+ foo()
+
+
+
+ def foo():
d = os.path.dirname(os.path.abspath(__file__))
job2 = JobTestDDA(c, d)
c.jobAdd(job2)
@@ -1250,12 +1451,21 @@
#foo()
def foo():
- job2 = JobListPeers(c)
- c.jobAdd(job2)
+ job = JobListPeers(c)
+ c.jobAdd(job)
c.run()
print '---------------------------'
- print job2.jobResult
+ print job.jobResult
print '---------------------------'
+
+
+ for peer in job.jobResult:
+ if not pythonBool(peer['opennet']):
+ job = JobListPeerNotes(c, peer['identity'])
+ c.jobAdd(job, synchron=True)
+ print '>>', job.jobResult
+ #.get('NoteText')
+
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|