SF.net SVN: fclient: [101] trunk/sandbox/fcp/fcp2_0_client.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <ju...@us...> - 2008-02-01 14:46:05
|
Revision: 101
http://fclient.svn.sourceforge.net/fclient/?rev=101&view=rev
Author: jurner
Date: 2008-02-01 06:46:08 -0800 (Fri, 01 Feb 2008)
Log Message:
-----------
added plugin related messages
experimental Upload class to tackle single / multiple item uploads
Modified Paths:
--------------
trunk/sandbox/fcp/fcp2_0_client.py
Modified: trunk/sandbox/fcp/fcp2_0_client.py
===================================================================
--- trunk/sandbox/fcp/fcp2_0_client.py 2008-02-01 14:44:41 UTC (rev 100)
+++ trunk/sandbox/fcp/fcp2_0_client.py 2008-02-01 14:46:08 UTC (rev 101)
@@ -23,28 +23,64 @@
# problem of knowing when the node has actually registered a request. The node does not send
# an EndListPersistentRequests on connect so it is impossible to tell when or if to restore one of
# the pending requests we stored.
+#
+#FIX: None
#---------------------------------------------------------------------------------------------------------------------------------------------
# [0001893: CloseConnectionDuplicateClientName bug or feature?]
#
# CloseConnectionDuplicateClientName
# currently fcp takes down our connection if another client (...) uses the same connection name.
+#
+#FIX: None
#----------------------------------------------------------------------------------------------------------------------------------------------
# [0001888: explicite connection locale]
#
# Many strings are already translated by freenet, but there is no way to tell the node which language
# to use to talk to a client. Maybe a good idea, maybe not.
+#
+#FIX: None
#-----------------------------------------------------------------------------------------------------------------------------------------------
# [0001781: unregister directories registered via TestDDARequest]
#
# With current state of the art DDA handling it is not possible to unregister directories (may pile
# up in memory over time).
+#
+#FIX: None
#------------------------------------------------------------------------------------------------------------------------------------------------
# [0002019: Socket dies if first message is not ClientHello]
#
# minor one
+#
+#FIX: None
+#------------------------------------------------------------------------------------------------------------------------------------------------
+# [0001965: Persistence vs PersistenceType]
+#
+# PersistentGet passes persistence field as "PersistenceType", PersistentPut as "Persistence"
+# already fixed this here in the client
+#
+# FIX: implemented in client
#-------------------------------------------------------------------------------------------------------------------------------------------------
+# [0002015: Drop the global queue]
+#
+# this one is somewhat related to [0001931: Send EndListPersistentRequests following client connect]
+#
+# We never use or watch the global queue. It is too dangerous. But problems remain when it comes
+# to restoring persistent requests. Are we sure these are our requests? Worst case is a client with a colliding
+# connection name flooding our client with an unknown number of left overs.
+#
+#FIX: None (that is, this case is handled as safely as possible - apart from possible slowdowns - but no
+# guarantee that no unknown request may slip through)
+#-------------------------------------------------------------------------------------------------------------------------------------------------
+# [0001894: HandleCollision field in ClientGet]
+#
+# minor one. When downloading a file, filename collisions may occur. Fcp does not handle these very well
+# It checks if the tempfile (filename ?) can be created newly when the request is started. IIRC In the final
+# rename of the tempfile to filename no check is done and filename will get overwritten.
+#
+#FIX: we handle collisions in the client as safely as possible. But no guarantee either when a colliding file
+# (...) finds its way into the download directory while downloading another.
+#------------------------------------------------------------------------------------------------------------------------------------------------
-
import atexit
import cPickle
import logging
@@ -143,13 +179,16 @@
except ValueError:
DefaultFcpPort = 9481
+
+
#TODO: check if required
# suggested by Mathew Toseland to use about 32k for mimeType requests
# basic sizes of keys are: 1k for SSks and 32k for CHKs
# without MaxSize DataFound will have DataLength set to 0 (?!)
MaxSizeKeyInfo = '32000'
SocketTimeout = 0.1
- Version = '2.0'
+ ExpectedFcpVersion = 2.0
+ ExpectedNodeBuild = 1107
from fcp2_0_config import Config
from fcp2_0_consts import (
@@ -159,6 +198,7 @@
FetchError,
FilenameCollision,
InsertError,
+ KeyType,
LogMessages,
PeerNodeStatus,
PeerNoteType,
@@ -207,8 +247,14 @@
'PeerNote',
+ # plugins
+ # NOTE: every name needs its own trailing comma - adjacent string literals are
+ # silently concatenated into one entry otherwise
+ 'PluginInfo',
+ 'PluginInfoFailed',
+ 'PluginMessage',
+ 'PluginMessageFailed',
+
# others
- 'SSKKeypair',
+ 'KeypairGenerated',
###############################
@@ -232,10 +278,11 @@
"""
self._connectionName = self.setConnectionName(connectionName)
self._ddaTests = [] # currently running DDA tests (request0, ... requestN)
- self._sskRequests = {} # currently pending ssk requests (sskIdentifier --> request)
+ self._sskRequests = [] # currently pending ssk requests (identifier1... identiferN)
self._requests = {} # currently running requests (requestIdentifier --> request)
self._log = logging.getLogger(self.__class__.__name__)
+ self._nodeHelloMessage = None
self._socket = None
self.events = self.Events()
@@ -359,8 +406,11 @@
# fix some Fcp inconsistencies ClientGet vs. PersistentGet
if msg.name == self.Message.MessagePersistentGet:
del msg.params['Started']
+ #FIX: [0001965: Persistence vs PersistenceType]
if 'PersistenceType' in msg.params:
msg['Persistence'] = msg.params.pop('PersistenceType')
+ elif msg.name == self.Message.MessagePersistentPut:
+ del msg.params['Started']
return msg
@@ -385,9 +435,9 @@
self._ddaTests = []
self._requests = {}
- self._sskRequests = {}
+ self._sskRequests = []
-
+
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, duration=20, timeout=0.5):
"""Iterator to establish a connection to a freenet node
@param host: (str) host of the node
@@ -428,18 +478,31 @@
self.sendMessage(
self.Message.MessageClientHello,
Name=self._connectionName,
- ExpectedVersion=self.Version,
+ ExpectedVersion=self.ExpectedFcpVersion,
)
while timeElapsed <= duration:
msg = self.next(dispatch=False)
+
if msg.name == self.Message.MessageClientSocketTimeout:
timeElapsed += self.SocketTimeout
yield None
+
elif msg.name == self.Message.MessageNodeHello:
+ self._nodeHelloMessage = msg
self._log.debug(self.LogMessages.MessageReceived + msg.pprint())
- self.events.ClientConnected(msg)
- yield msg
+ # check if version is ok
+ if self.versionCheckNodeHello(msg):
+ self.events.ClientConnected(msg)
+ else:
+ self.close()
+ msg = self.Message(
+ self.Message.MessageClientDisconnected,
+ DisconnectReason=self.DisconnectReason.VersionMissmatch
+ )
+ self.events.ClientDisconnected(msg)
+ yield self._nodeHelloMessage
raise StopIteration
+
else:
self._log.debug(self.LogMessages.MessageReceived + msg.pprint())
break
@@ -497,6 +560,18 @@
stdout, stderr = p.communicate()
return stdout
+
+    def versionCheckNodeHello(self, nodeHelloMessage):
+        """Performs a version check of the client against the specified NodeHello message
+        @param nodeHelloMessage: NodeHello message to check ('FCPVersion' and 'Build' fields are examined)
+        @return: (bool) True if version is ok, False otherwise
+        @note: if this check returns False the client will immediately disconnect in L{connect()}
+        and triggers a ClientDisconnected event. Overwrite to adjust
+        @note: a message with missing or non numeric 'FCPVersion' / 'Build' fields is treated as
+        version mismatch
+        """
+        # coerce explicitly before comparing. If the fields arrive as strings (TODO: confirm what
+        # the message class hands out) Python 2 orders any str above any number, so the bare
+        # comparison would succeed no matter what the node reports
+        try:
+            fcpVersion = float(nodeHelloMessage['FCPVersion'])
+            nodeBuild = int(nodeHelloMessage['Build'])
+        except (KeyError, TypeError, ValueError):
+            return False
+        return fcpVersion >= self.ExpectedFcpVersion and nodeBuild >= self.ExpectedNodeBuild
+
#########################################################
##
## runtime related methods
@@ -553,7 +628,7 @@
self.Message.MessageClientDisconnected,
DisconnectReason=DisconnectReason.Shutdown,
)
- self.events.ClientDisconnect(msg)
+ self.events.ClientDisconnected(msg)
return True
@@ -564,12 +639,8 @@
ddaRequestMsg['WantWriteDirectory'] = True
directory = os.path.dirname(initialRequest['Filename'])
else:
-
- #TODO: determine directory for other cases
- raise RuntimeError(NotImplemented)
-
ddaRequestMsg['WantReadDirectory'] = True
- directory = None
+ directory = os.path.dirname(initialRequest['Filename'])
ddaRequestMsg['Directory'] = directory
@@ -607,6 +678,30 @@
initialRequest['FcFilenameCollision'] &= ~self.FilenameCollision.CollisionHandled
+ # handle plugin related request failures
+ elif code == self.ProtocolError.NoSuchPlugin:
+ if initialRequest.name == self.Message.MessagePluginInfo:
+ del self._requests[requestIdentifier]
+ self.events.PluginInfoFailed(initialRequest)
+ return True
+ elif initialRequest.name == self.Message.MessageFCPPluginMessage:
+ del self._requests[requestIdentifier]
+ self.events.PluginMessageFailed(initialRequest)
+ return True
+
+ elif code == self.ProtocolError.AccessDenied:
+ if initialRequest.name == self.Message.MessagePluginInfo:
+ del self._requests[requestIdentifier]
+ self.events.PluginInfoFailed(initialRequest)
+ return True
+ # TODO: just a guess that FCPPluginMessage can trigger an AccessDenied error
+ elif initialRequest.name == self.Message.MessageFCPPluginMessage:
+ del self._requests[requestIdentifier]
+ self.events.PluginMessageFailed(initialRequest)
+ return True
+
+
+
# only requests should get through to here
# NOTE: Fcp already removed the request
@@ -776,10 +871,6 @@
elif msg.name == self.Message.MessagePersistentGet:
- # NOTE:
- # Fcp does no good job in handling persistent requests and identifiers. Already dropped some
- # notes and reports regarding this. See freenet-tech mailing list [Fcp notes and issues]
-
# unknown request... try to restore it
if initialRequest is None:
restoredRequest = self._restorePersistentRequestFromNode(msg)
@@ -793,7 +884,10 @@
)
return True
+ # determine initial message name
restoredRequest.name = self.Message.MessageClientGet
+
+ # restore request
self._requests[requestIdentifier] = restoredRequest
restoredRequest['FcStatus'] = self.Message.StatusStarted
self.events.RequestRestored(restoredRequest)
@@ -810,7 +904,48 @@
return True
+ elif msg.name == self.Message.MessagePersistentPut:
+
+ # unknown request... try to restore it
+ if initialRequest is None:
+ restoredRequest = self._restorePersistentRequestFromNode(msg)
+
+ # not one of our requests... so cancel it
+ if restoredRequest is None or CancelPersistentRequests:
+ self.sendMessage(
+ self.Message.MessageRemovePersistentRequest,
+ Identifier=msg['Identifier'],
+ Global=msg['Global'],
+ )
+ return True
+
+ # determine initial message name
+ if restoredRequest['FcSubType'] == self.Message.SubTypePut:
+ restoredRequest.name = self.Message.MessageClientPut
+ elif restoredRequest['FcSubType'] == self.Message.SubTypePutDiskDir:
+ restoredRequest.name = self.Message.MessageClientPutDiskDir
+ elif restoredRequest['FcSubType'] == self.Message.SubTypePutComplexDir:
+ restoredRequest.name = self.Message.MessageClientPutComplexDir
+
+ # restore request
+ self._requests[requestIdentifier] = restoredRequest
+ restoredRequest['FcStatus'] = self.Message.StatusStarted
+ self.events.RequestRestored(restoredRequest)
+ return True
+
+ # known request... filter out multiple PersistentPuts
+ if initialRequest['FcStatus'] == self.Message.StatusPending:
+ initialRequest['FcStatus'] = self.Message.StatusStarted
+
+ #TODO: update initialRequest with params from PersistentPut?
+ #TODO: update initialRequest with params from PersistentPut?
+
+ self.events.RequestStarted(initialRequest)
+ return True
+
+ return True
+
elif msg.name == self.Message.MessagePersistentRequestModified:
if initialRequest is None:
return False
@@ -925,21 +1060,49 @@
####################################################
##
+ ## plugins
+ ##
+ ####################################################
+ elif msg.name == self.Message.MessagePluginInfo:
+ if initialRequest is None:
+ return False
+
+ del self._requests[requestIdentifier]
+ self.events.PluginInfo(msg)
+ return True
+
+ elif msg.name == self.Message.MessageFCPPluginReply:
+ if initialRequest is None:
+ return False
+
+ del self._requests[requestIdentifier]
+ self.events.PluginMessage(msg)
+ return True
+
+ ####################################################
+ ##
## others
##
####################################################
elif msg.name == self.Message.MessageSSKKeypair:
- if initialRequest is None:
- self.events.SSKKeypair(msg)
- return True
+ if requestIdentifier not in self._sskRequests:
+ return False
- initialRequest['Uri'] = msg['InsertUri']
- initialRequest['FcRequestUri'] = msg['RequestUri']
- self.sendMessageEx(initialRequest)
+ self._sskRequests.remove(requestIdentifier)
+ #TODO:no idea if the node may pass uris with prefixes like 'freenet:'... strip it anyways
+ insertURI = self.Uri(msg['InsertURI']).uri
+ requestURI = self.Uri(msg['RequestURI']).uri
+ keyType = consts.KeyType.USK if requestIdentifier.startswith(consts.KeyType.USK) else consts.KeyType.SSK
+ if keyType == consts.KeyType.USK:
+     # convert each uri from its own SSK counterpart. Deriving the insert uri from the
+     # request uri would discard the insert uri the node generated
+     insertURI = insertURI.replace(consts.KeyType.SSK, consts.KeyType.USK, 1)
+     requestURI = requestURI.replace(consts.KeyType.SSK, consts.KeyType.USK, 1)
+
+ msg['InsertURI'] = insertURI
+ msg['RequestURI'] = requestURI
+ msg['FcKeyType'] = keyType
+ self.events.KeypairGenerated(msg)
return True
-
-
elif msg.name == self.Message.MessageCloseConnectionDuplicateClientName:
msg = self.Message(
@@ -1273,6 +1436,38 @@
## ClientPut related methods
##
########################################################
+    def clientPutUpload(self, upload, userData=None, persistentUserData=''):
+        """Registers and sends the put request described by an upload object
+        @param upload: object providing getMessage(), keyType and publicKey
+        @param userData: passed on to the request registration
+        @param persistentUserData: passed on to the request registration
+        @return: 'Identifier' field of the request message
+        @raise ValueError: if the upload object does not provide a message
+        """
+        Message = self.Message
+        request = upload.getMessage(Message)
+        if request is None:
+            raise ValueError('Nothing to upload')
+
+        # map the message name to its sub type. Whatever is neither a plain put nor
+        # a disk dir put is taken to be a complex dir put
+        requestName = request.name
+        if requestName == Message.MessageClientPut:
+            subType = Message.SubTypePut
+        elif requestName == Message.MessageClientPutDiskDir:
+            subType = Message.SubTypePutDiskDir
+        else:
+            subType = Message.SubTypePutComplexDir
+
+        #NOTE: no filenameCollision is passed along here (yet)
+        startTime = time.time()
+        self._registerRequest(request, userData, subType, startTime, persistentUserData)
+
+        # keys of these types come with a public key, keep it along with the request
+        #NOTE: the caller may use the 'FcInsertUri' member to store the private key
+        keyTypesWithPublicKey = (consts.KeyType.SSK, consts.KeyType.USK)
+        if upload.keyType in keyTypesWithPublicKey:
+            request['FcRequestUri'] = upload.publicKey
+
+        self.sendMessageEx(request)
+        return request['Identifier']
+
+
def clientPut(self,
uri,
messageSubType,
@@ -1303,9 +1498,11 @@
#TODO: method names
#CHK
+
+
def putData(self,
data,
-
+
contentType=None,
dontCompress=None,
maxRetries=None,
@@ -1316,10 +1513,11 @@
userData=None,
persistentUserData='',
):
+ """"""
return self.clientPut(
- self.Uri.KeyCHK,
- self.Message.SubTypePutData,
+ consts.KeyType.CHK,
+ self.Message.SubTypePut,
userData,
persistentUserData,
data,
@@ -1334,34 +1532,48 @@
MaxRetries=maxRetries,
DontCompress=dontCompress,
Persistence=persistence,
- TergetFilename=targetFilename,
+ TargetFilename=targetFilename,
UploadFrom=self.UploadFrom.Direct,
Verbosity=self.Verbosity.ReportProgress | self.Verbosity.ReportCompression,
)
- def putFile(self):
- pass
+    def putFile(self,
+            filename,
+
+            contentType=None,
+            dontCompress=None,
+            maxRetries=None,
+            persistence=consts.Persistence.Connection,
+            priorityClass=consts.Priority.Medium,
+            targetFilename=None,
+
+            userData=None,
+            persistentUserData='',
+
+            ):
+        """Uploads a file from disk as CHK
+        @param filename: name of the file to upload (passed as 'Filename' field)
+        @param contentType: passed as 'ContentType' field
+        @param dontCompress: passed as 'DontCompress' field
+        @param maxRetries: passed as 'MaxRetries' field
+        @param persistence: passed as 'Persistence' field
+        @param priorityClass: NOTE(review): accepted but not passed on to L{clientPut} - confirm if intended
+        @param targetFilename: passed as 'TargetFilename' field
+        @param userData: passed on to L{clientPut}
+        @param persistentUserData: passed on to L{clientPut}
+        @return: whatever L{clientPut} returns
+        """
+        return self.clientPut(
+                consts.KeyType.CHK,
+                self.Message.SubTypePut,
+                userData,
+                persistentUserData,
+                None,       # no data to pass along, the file is uploaded from disk
+
+                # fcp params
+                Filename=filename,
+                ContentType=contentType,
+                #EarlyEncode='false',
+                #GetCHKOnly='false',
+                Global=False,
+                Identifier=None,
+                MaxRetries=maxRetries,
+                DontCompress=dontCompress,
+                Persistence=persistence,
+                TargetFilename=targetFilename,
+                UploadFrom=self.UploadFrom.Disk,
+                Verbosity=self.Verbosity.ReportProgress | self.Verbosity.ReportCompression,
+                )
- #KSK
- def putNamedData(self):
- pass
-
- def putReference(self):
- pass
-
-
- #SSK
- def putUpdatableData(self):
- pass
-
-
- #USK
- def putVersionedData(self):
- pass
-
-
-
########################################################
##
## request related methods
@@ -1530,18 +1742,79 @@
##########################################################
##
+ ## plugins
+ ##
+ ##########################################################
+ #TODO: currently it is just a guess that a plugin may respond with an IdentifierCollision
+    def getPluginInfo(self, pluginName, detailed=False):
+        """Asks the node for information about a plugin
+        @param pluginName: (str) name of the plugin to request info for
+        @param detailed: (bool) passed as 'Detailed' field of the request
+        @return: (str) request identifier
+        """
+        # draw identifiers until one is found that is not taken by a running request
+        identifier = uuid.uuid_time()
+        while identifier in self._requests:
+            identifier = uuid.uuid_time()
+
+        request = self.Message(
+                self.Message.MessageGetPluginInfo,
+                Identifier=identifier,
+                PluginName=pluginName,
+                Detailed=detailed,
+                )
+        # remember the request before sending it
+        self._requests[identifier] = request
+        self.sendMessageEx(request)
+        return identifier
+
+
+    def sendPluginMessage(self, pluginName, params, data=None):
+        """Sends a message to a plugin
+        @param pluginName: name of the plugin to send the message to
+        @param params: (dict) additional params to pass to the plugin (each parameter has to be prefixed with 'Param.')
+        @param data: (str) data to pass along with the message or None
+        @return: (str) request identifier
+        """
+        while True:
+            identifier = uuid.uuid_time()
+            if identifier not in self._requests:
+                break
+        # the request has to go out as FCPPluginMessage (not GetPluginInfo): that is the
+        # message name the error handling looks for when reporting PluginMessageFailed
+        msg = self.Message(
+                self.Message.MessageFCPPluginMessage,
+                Identifier=identifier,
+                PluginName=pluginName,
+                **params
+                )
+        if data is not None:
+            msg['DataLength'] = len(data)
+            msg.data = data
+
+        self._requests[identifier] = msg
+        self.sendMessageEx(msg)
+        return identifier
+
+
+
+
+ ##########################################################
+ ##
## others
##
##########################################################
- def generateSSK(self):
+ def generateKeypair(self, keypairType=consts.KeyType.SSK):
"""
- @event: SSKKeypair(event, params), triggered when the request is complete
+ @param keypairType: type of keypair to generate (either L{KeyType.SSK} or L{KeyType.USK})
@return: identifier of the request
+ @event: KeypairGenerated(event, params) is triggered when the request is complete
"""
+
+ if keypairType not in (consts.KeyType.SSK, consts.KeyType.USK):
+ raise ValueError('keypairType must be %s or %s' % (consts.KeyType.SSK, consts.KeyType.USK))
+
while True:
- identifier = uuid.uuid_time()
+ identifier = keypairType + uuid.uuid_time()
if identifier not in self._sskRequests:
break
+ self._sskRequests.append(identifier)
self.sendMessage(
self.Message.MessageGenerateSSK,
Identifier=identifier,
@@ -1562,7 +1835,7 @@
if nodeHello is not None:
- #for i in xrange(5):
+ #for i in xrange(50):
# c.next()
@@ -1623,9 +1896,26 @@
def testPutData():
- identifier = c.putData(
+ myIdentifier = c.putData(
'test123',
+ #persistence=c.Persistence.Reboot,
)
+
+ for i in xrange(100):
+ c.next()
+ c.removeRequest(myIdentifier)
+ for i in xrange(5):
+ c.next()
+
+ #testPutData()
+
+
+ def testPutFile():
+ fpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test.jpg')
+
+ identifier = c.putFile(
+ fpath,
+ )
for i in xrange(1000):
c.next()
@@ -1633,11 +1923,9 @@
for i in xrange(5):
c.next()
- #testPutData()
-
+ #testPutFile()
-
def testConfigData():
from fcp2_0_config import Config
@@ -1688,32 +1976,39 @@
#testNodeData()
- def testGenerateSSK():
+ def testGenerateKeypair():
- #def cb(event, msg):
- # pass
-
-
- #c.events.NodeData += cb
- c.generateSSK()
+ def cb(event, msg):
+ print msg.pprint()
+
+ c.events.KeypairGenerated += cb
+ c.generateKeypair('SSK@')
+ c.generateKeypair('USK@')
for i in xrange(10):
c.next()
- #testGenerateSSK()
+ #testGenerateKeypair()
- def testListPeers():
- c.listPeers()
- for i in xrange(10):
- c.next()
-
- #testListPeers()
+ def testListPeers():
+ c.listPeers()
+ for i in xrange(10):
+ c.next()
+
- def testGetNode():
- c.getNode()
- for i in xrange(10):
- c.next()
+ #testListPeers()
- #testGetNode()
+ def testGetNode():
+ c.getNode()
+ for i in xrange(10):
+ c.next()
+
+ #testGetNode()
-
\ No newline at end of file
+ def testGetPluginInfo():
+ c.getPluginInfo('plugins.XMLLibrarian')
+ for i in xrange(10):
+ c.next()
+
+ #testGetPluginInfo()
+
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|