Thread: SF.net SVN: fclient: [9] trunk/fclient/fclient_lib/fcp/fcp2_0.py
Status: Pre-Alpha
Brought to you by:
jurner
|
From: <jU...@us...> - 2007-10-20 13:37:47
|
Revision: 9
http://fclient.svn.sourceforge.net/fclient/?rev=9&view=rev
Author: jUrner
Date: 2007-10-20 06:37:51 -0700 (Sat, 20 Oct 2007)
Log Message:
-----------
bit of refactoring
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 10:02:28 UTC (rev 8)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9)
@@ -22,79 +22,14 @@
except: pass
SocketTimeout = 0.1
-class JobIdentifiers:
+class FixedJobIdentifiers:
"""Fixed job identifiers
@note: he client can only handle one job of these at a time
"""
ClientHello = 'ClientHello'
ListPeers = 'ListPeers'
-class Messages:
- """All messages supported by the client"""
-
- # client messages
- ClientHello = 'ClientHello'
- ListPeer = 'ListPeer' # (since 1045)
- ListPeers = 'ListPeers'
- ListPeerNotes = 'ListPeerNotes'
- AddPeer = 'AddPeer'
- ModifyPeer = 'ModifyPeer'
- ModifyPeerNote = 'ModifyPeerNote'
- RemovePeer = 'RemovePeer'
- GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
- ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
- GenerateSSK = 'GenerateSSK'
- ClientPut = 'ClientPut'
- ClientPutDiskDir = 'ClientPutDiskDir'
- ClientPutComplexDir = 'ClientPutComplexDir'
- ClientGet = 'ClientGet'
- SubscribeUSK = 'SubscribeUSK'
- WatchGlobal = 'WatchGlobal'
- GetRequestStatus = 'GetRequestStatus'
- ListPersistentRequests = 'ListPersistentRequests'
- RemovePersistentRequest = 'RemovePersistentRequest'
- ModifyPersistentRequest = 'ModifyPersistentRequest'
- Shutdown = 'Shutdown'
-
- # node messages
- NodeHello = 'NodeHello'
- CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
- Peer = 'Peer'
- PeerNote = 'PeerNote'
- EndListPeers = 'EndListPeers'
- EndListPeerNotes = 'EndListPeerNotes'
- PeerRemoved = 'PeerRemoved'
- NodeData = 'NodeData'
- ConfigData = 'ConfigData' # (since 1027)
- TestDDAReply = 'TestDDAReply' # (since 1027)
- TestDDAComplete = 'TestDDAComplete' # (since 1027)
- SSKKeypair = 'SSKKeypair'
- PersistentGet = 'PersistentGet'
- PersistentPut = 'PersistentPut'
- PersistentPutDir = 'PersistentPutDir'
- URIGenerated = 'URIGenerated'
- PutSuccessful = 'PutSuccessful'
- PutFetchable = 'PutFetchable'
- DataFound = 'DataFound'
- AllData = 'AllData'
- StartedCompression = 'StartedCompression'
- FinishedCompression = 'FinishedCompression'
- SimpleProgress = 'SimpleProgress'
- EndListPersistentRequests = 'EndListPersistentRequests'
- PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
- PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
- PutFailed = 'PutFailed'
- GetFailed = 'GetFailed'
- ProtocolError = 'ProtocolError'
- IdentifierCollision = 'IdentifierCollision'
- UnknownNodeIdentifier = 'UnknownNodeIdentifier'
- UnknownPeerNoteType = 'UnknownPeerNoteType'
- SubscribedUSKUpdate = 'SubscribedUSKUpdate'
-
class Priorities:
"""All priorities supported by the client"""
@@ -109,12 +44,23 @@
PriorityMin = Minimum
PriorityDefault = Bulk
-
-# errors
-
-class FetchErrors:
+#************************************************************************************
+# exceptions
+#************************************************************************************
+class FetchError(Exception):
"""All fetch errors supported by the client"""
+ def __init__(self, msg):
+ """
+ @param msg: (Message) GetFailed message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
MaxArchiveRecursionExceeded = '1'
UnknownSplitfileMetadata = '2'
UnknownMetadata = '3'
@@ -145,9 +91,20 @@
NotAllDataFound = '28'
-class InsertErrors:
+class InsertError(Exception):
"""All insert errors supported by the client"""
+ def __init__(self, msg):
+ """
+ @param msg: (Message) PutFailed message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
InvalidUri = '1'
BucketError = '2'
InternalError = '3'
@@ -160,9 +117,20 @@
Canceled = '10'
-class ProtocolErrors:
+class ProtocolError(Exception):
"""All protocol errors supported by the client"""
+ def __init__(self, msg):
+ """
+ @param msg: (Message) ProtocolError message
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
ClientHelloMustBeFirst = '1'
NoLateClientHellos = '2'
MessageParseError = '3'
@@ -195,15 +163,35 @@
OpennetDisabled = '30'
DarknetOnly = '31'
+class SocketError(Exception): pass
#**********************************************************************
# functions
#**********************************************************************
+def fcpBool(pythonBool):
+ """Converts a python bool to a fcp bool
+ @param pythonBool: (bool)
+ @return: (str) 'true' or 'false'
+ """
+ if pythonBool:
+ return 'true'
+ return 'false'
+
+
def newIdentifier():
"""Returns a new unique identifier
@return: (str) uuid
"""
return str(uuid.uuid4())
+
+def pythonBool(fcpBool):
+ """Converts a fcp bool to a python bool
+ @param fcpBool: 'true' or 'false'
+ @return: (bool) True or False
+ """
+ return fcpBool == 'true'
+
+
def saveReadFile(fpath):
"""Reads contents of a file in the savest manner possible
@param fpath: file to write
@@ -267,42 +255,9 @@
stdout, stderr = p.communicate()
return stdout
-
-def fcpBool(pythonBool):
- """Converts a python bool to a fcp bool
- @param pythonBool: (bool)
- @return: (str) 'true' or 'false'
- """
- if pythonBool:
- return 'true'
- return 'false'
-
-def pythonBool(fcpBool):
- """Converts a fcp bool to a python bool
- @param pythonBool: 'true' or 'false'
- @return: (bool) True or False
- """
- return fcpBool == 'true'
-
#**********************************************************************
# classes
#**********************************************************************
-class FcpSocketError(Exception): pass
-class FcpProtocolError(Exception):
- def __init__(self, msg):
- """
- @param msg: (Message) ProtocolError message
- """
- self.value = '%s (%s, %s)' % (
- msg.get('CodeDescription', 'Unknown error') ,
- msg['Code'],
- msg.get('ExtraDescription', '...'),
- )
-
- def __str__(self): return self.value
-
-
-
class FcpUri(object):
"""Wrapper class for freenet uris"""
@@ -397,8 +352,68 @@
class Message(object):
"""Class wrapping a freenet message"""
- Name = 'UMessage'
+ # client messages
+ ClientHello = 'ClientHello'
+ ListPeer = 'ListPeer' # (since 1045)
+ ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ AddPeer = 'AddPeer'
+ ModifyPeer = 'ModifyPeer'
+ ModifyPeerNote = 'ModifyPeerNote'
+ RemovePeer = 'RemovePeer'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig' # (since 1027)
+ ModifyConfig = 'ModifyConfig' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ GenerateSSK = 'GenerateSSK'
+ ClientPut = 'ClientPut'
+ ClientPutDiskDir = 'ClientPutDiskDir'
+ ClientPutComplexDir = 'ClientPutComplexDir'
+ ClientGet = 'ClientGet'
+ SubscribeUSK = 'SubscribeUSK'
+ WatchGlobal = 'WatchGlobal'
+ GetRequestStatus = 'GetRequestStatus'
+ ListPersistentRequests = 'ListPersistentRequests'
+ RemovePersistentRequest = 'RemovePersistentRequest'
+ ModifyPersistentRequest = 'ModifyPersistentRequest'
+ Shutdown = 'Shutdown'
+ # node messages
+ NodeHello = 'NodeHello'
+ CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
+ Peer = 'Peer'
+ PeerNote = 'PeerNote'
+ EndListPeers = 'EndListPeers'
+ EndListPeerNotes = 'EndListPeerNotes'
+ PeerRemoved = 'PeerRemoved'
+ NodeData = 'NodeData'
+ ConfigData = 'ConfigData' # (since 1027)
+ TestDDAReply = 'TestDDAReply' # (since 1027)
+ TestDDAComplete = 'TestDDAComplete' # (since 1027)
+ SSKKeypair = 'SSKKeypair'
+ PersistentGet = 'PersistentGet'
+ PersistentPut = 'PersistentPut'
+ PersistentPutDir = 'PersistentPutDir'
+ URIGenerated = 'URIGenerated'
+ PutSuccessful = 'PutSuccessful'
+ PutFetchable = 'PutFetchable'
+ DataFound = 'DataFound'
+ AllData = 'AllData'
+ StartedCompression = 'StartedCompression'
+ FinishedCompression = 'FinishedCompression'
+ SimpleProgress = 'SimpleProgress'
+ EndListPersistentRequests = 'EndListPersistentRequests'
+ PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
+ PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
+ PutFailed = 'PutFailed'
+ GetFailed = 'GetFailed'
+ ProtocolError = 'ProtocolError'
+ IdentifierCollision = 'IdentifierCollision'
+ UnknownNodeIdentifier = 'UnknownNodeIdentifier'
+ UnknownPeerNoteType = 'UnknownPeerNoteType'
+ SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+
def __init__(self, name, data=None, **params):
"""
@@ -410,29 +425,25 @@
self.data = data
self.name = name
self.params = params
-
-
+
def toString(self):
"""Returns a string with the formated message, ready to be send"""
-
# TODO: "Data" not yet implemented
out = [self.name, ]
for param, value in self.params.items():
out.append('%s=%s' % (param, value))
out.append('EndMessage\n')
return '\n'.join(out)
-
-
+
def pprint(self):
"""Returns the message as nicely formated human readable string"""
-
- out = [self.name, ]
+ out = ['', '>>' + self.name, ]
for param, value in self.params.items():
- out.append(' %s=%s' % (param, value))
- out.append('EndMessage')
+ out.append('>> %s=%s' % (param, value))
+ out.append('>>EndMessage')
out.append('')
return '\n'.join(out)
-
+
def __getitem__(self, name):
"""Returns the message parameter 'name' """
return self.params[name]
@@ -444,14 +455,13 @@
def __setitem__(self, name, value):
"""Sets the message parameter 'name' to 'value' """
self.params[name] = value
-
+
+
class MessageSocketTimeout(Message):
- Name = 'USocketTimeout'
-
def __init__(self):
- Message.__init__(self, self.Name)
+ Message.__init__(self, 'USocketTimeOut')
@@ -474,47 +484,29 @@
@ivar fcpIdentifier: identifier of the job
@ivar fcpMessage: initial message send to the node
@ivar fcpResult: if no error was encountered, holding the result of the job when complete
- @ivar fcpTime: when the job is complete, holding the time the job took to complete
+ @ivar fcpTimeStart: time the job was started
+ @ivar fcpTimeStop: time the job was stopped
"""
-
self.fcpClient = fcpClient # FcpClient() instance the job belongs to
- self.fcpError = None # last error (either this is set or dcpResult)
- self.fcpIdentifier = identifier #
+ self.fcpIdentifier = identifier # job identifier
self.fcpMessage = message # message send to node
- self.fcpResult = None # job result
+ self.fcpResult = None # job result (bool error, Message msg)
self.fcpTimeStart = 0 # time the job was started
self.fcpTimeStop = 0 # time the job was stopped
- self.fcpStopped = False
-
- def displayName(self):
- """Returns the display name of the job
- @return: (str) display name
- """
- return 'JobBase'
-
- def start(self):
+
+ def handleStart(self):
"""Starts the job"""
- self.fcpStopped = False
+ self.fcpResult = None
self.fcpTimeStart = time.time()
self.fcpClient.sendMessageEx(self.fcpMessage)
-
- def error(self, msg):
- """Called on job completion if an error was encounterd while runnng the job
- @param msg: (Message) to pass to the job
- """
- self.fcpStopped = True
- self.fcpTimeStop = time.time()
- self.fcpError = msg
- self.fcpResult = None
-
- def stop(self, msg):
+
+ def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
+ @param flagError: True if an error was encountered, False otherwise
@param msg: (Message) to pass to the job
"""
- self.fcpStopped = True
self.fcpTimeStop = time.time()
- self.fcpError = None
- self.fcpResult = msg
+ self.fcpResult = (flagError, msg)
class JobClientHello(JobBase):
@@ -532,11 +524,11 @@
@param expectedVersion: (str) node version expected
"""
message = Message(
- Messages.ClientHello,
+ Message.ClientHello,
Name=name if name is not None else newIdentifier(),
ExpectedVersion=expectedVersion,
)
- JobBase.__init__(self, fcpClient, JobIdentifiers.ClientHello, message)
+ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message)
def displayName(self):
return 'ClientHello'
@@ -550,16 +542,13 @@
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
- Messages.ListPeers,
+ Message.ListPeers,
WithMetadata=fcpBool(withMetaData),
WithVolatile=fcpBool(withVolantile),
)
- JobBase.__init__(self, fcpClient, JobIdentifiers.ListPeers, message)
+ JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
- def displayName(self):
- return 'ListPeers'
-
def handlePeer(self,msg):
"""Handles the next peer send by the node in form of a 'Peer' message
while the job is running. Overwrite to process.
@@ -592,7 +581,7 @@
"""
identifier = newIdentifier()
message = Message(
- Messages.ClientGet,
+ Message.ClientGet,
Identifier=identifier,
URI=uri,
MaxSize='0',
@@ -602,37 +591,25 @@
)
JobBase.__init__(self, fcpClient, identifier, message)
-
- def displayName(self):
- return 'GetFileInfo'
-
+
def handleProgress(self, msg):
"""Handles the next progress made. Overwrite to process.
"""
-
-
- def error(self, msg):
- JobBase.error(self, msg)
- if msg.name == Messages.GetFailed:
- if msg['Code'] == FetchErrors.TooBig:
- self.fcpError = None
- self.fcpResult = (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- #else:
- # raise ValueError('Unhandled message: %s' % msg.name)
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- if msg.name == Messages.DataFound:
+ def handleStop(self, flagError, msg):
+ JobBase.handleStop(self,flagError, msg)
+ if msg.name == Message.DataFound:
self.fcpResult = (
msg.get('Metadata.ContentType', ''),
msg.get('DataLength', '')
)
- else:
- raise ValueError('Unhandled message: %s' % msg.name)
+ elif msg.name == Message.GetFailed:
+ if msg['Code'] == FetchError.TooBig:
+ self.fcpResult = (False, msg)
+ self.fcpResult = (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
@@ -648,7 +625,7 @@
raise ValueError('No such directory: %r' % directory)
message = Message(
- Messages.TestDDARequest,
+ Message.TestDDARequest,
Directory=directory,
WantReadDirectory=fcpBool(read),
WantWriteDirectory=fcpBool(write),
@@ -656,9 +633,7 @@
JobBase.__init__(self, fcpClient, directory, message)
self.fcpTmpFile = None
- def displayName(self):
- return 'TestDDA'
-
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -677,22 +652,16 @@
readContent = ''
self.fcpClient.sendMessage(
- Messages.TestDDAResponse,
+ Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
)
- def error(self, msg):
- JobBase.error(self, msg)
+ def handleStop(self, flagError, msg):
+ JobBase.handleStop(self, flagError, msg)
saveRemoveFile(self.fcpTmpFile)
self.fcpTmpFile = None
-
-
- def stop(self, msg):
- JobBase.stop(self, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
#**************************************************************************
# fcp client
@@ -706,8 +675,8 @@
ClientClose = 'Closing client'
- MessageSend = 'Message send'
- MessageReceived = 'Message received'
+ MessageSend = 'SendMessage'
+ MessageReceived = 'ReceivedMessage'
JobStart = 'Starting job: '
JobStop = 'Stopping job: '
@@ -733,7 +702,7 @@
"""
@param name: name of the client instance or '' (for debugging)
@param errorHandler: will be called if the socket conncetion to the node is dead
- with two params: FcpSocketError + details. When the handler is called the client
+ with two params: SocketError + details. When the handler is called the client
is already closed.
@param verbosity: verbosity level for debugging
@param logMessages: LogMessages class containing messages
@@ -747,7 +716,7 @@
'complete': [], # ???
}
self._errorHandler = errorHandler
- self._log = logging.getLogger(NameClient + ':' + name)
+ self._log = logging.getLogger(name)
self._logMessages = logMessages
self._lock = thread.allocate_lock()
self._socket = None
@@ -792,10 +761,15 @@
pass
else:
self._log.info(self._logMessages.Connected)
+
+ #NOTE: thought I could leave ClientHelloing up to the caller
+ # but instead of responding with ClientHelloMustBeFirst
+ # as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- assert job.fcpError is None, 'Error should have been caught by handleMessage()'
- return job.fcpResult
+ error, msg = job.fcpResult
+ assert not error, 'Error should have been caught by handleMessage()'
+ return msg
self._log.info(self._logMessages.ConnectionRetry)
@@ -811,45 +785,45 @@
"""Handles the next message from the freenet node
@param msg: Message() to handle
"""
- self._log.debug(self._logMessages.MessageReceived + '\n' + msg.pprint())
+ self._log.debug(self._logMessages.MessageReceived + msg.pprint())
- if msg.name == Messages.NodeHello:
+ if msg.name == Message.NodeHello:
#connectionIdentifier = msg['ConnectionIdentifier']
- self.jobStop(JobIdentifiers.ClientHello, msg)
+ self.jobStop(FixedJobIdentifiers.ClientHello, msg)
- elif msg.name == Messages.ProtocolError:
+ elif msg.name == Message.ProtocolError:
code = msg['Code']
- #if code == ProtocolErrors.NoLateClientHellos:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
- #elif code == ProtocolErrors.ClientHelloMustBeFirst:
- # self.jobStop(JobIdentifiers.ClientHello, msg, error=True)
+ #if code == ProtocolError.NoLateClientHellos:
+ # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
+ #elif code == ProtocolError.ClientHelloMustBeFirst:
+ # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
#else:
identifier = msg.get('Identifier', None)
if identifier is None:
#TODO: inform caller
- raise FcpProtocolError(msg)
+ raise ProtocolError(msg)
else:
- self.jobStop(identifier, msg, error=True)
+ self.jobStop(identifier, msg, flagError=True)
- elif msg.name == Messages.Peer:
- self.jobNotify(JobIdentifiers.ListPeers, 'handlePeer', msg)
+ elif msg.name == Message.Peer:
+ self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
- elif msg.name == Messages.EndListPeers:
- self.jobStop(IdentifierListPeers, msg)
+ elif msg.name == Message.EndListPeers:
+ self.jobStop(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Messages.GetFailed:
- self.jobStop(msg['Identifier'], msg, error=True)
+ elif msg.name == Message.GetFailed:
+ self.jobStop(msg['Identifier'], msg, flagError=True)
- elif msg.name == Messages.SimpleProgress:
+ elif msg.name == Message.SimpleProgress:
self.jobNotify(msg['Identifier'], 'handleProgress', msg)
- elif msg.name == Messages.TestDDAReply:
+ elif msg.name == Message.TestDDAReply:
self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
- elif msg.name == Messages.TestDDAComplete:
+ elif msg.name == Message.TestDDAComplete:
self.jobStop(msg['Directory'], msg)
- elif msg.name == Messages.IdentifierCollision:
+ elif msg.name == Message.IdentifierCollision:
pass
@@ -867,10 +841,10 @@
finally:
self._lock.release()
- self._log.info(self._logMessages.JobStart + job.displayName())
- job.start()
+ self._log.info(self._logMessages.JobStart + job.fcpMessage.name)
+ job.handleStart()
if synchron:
- while not job.fcpStopped:
+ while job.fcpResult is None:
self.next()
@@ -892,11 +866,11 @@
#TODO: quite unclear when to remove a job
- def jobStop(self, identifier, msg, error=False):
+ def jobStop(self, identifier, msg, flagError=False):
"""Stops a job
@param identifier: identifier of the job to stop
@param msg: Message() to pass to the job as result
- @param error: set to True to indicate unsuccessful completion of the job, True otherwisse
+ @param flagError: set to True to indicate unsuccessful completion of the job, False otherwise
"""
self._lock.acquire(True)
try:
@@ -912,11 +886,8 @@
if job is None:
raise ValueError('No such job: %r' % identifier)
- self._log.info(self._logMessages.JobStop + job.displayName())
- if error:
- job.error(msg)
- else:
- job.stop(msg)
+ self._log.info(self._logMessages.JobStop + job.fcpMessage.name)
+ job.handleStop(flagError, msg)
#TODO: some info when all jobs are completed
@@ -931,7 +902,7 @@
def readMessage(self):
"""Reads the next message directly from the socket and dispatches it
@return: (Message) the next message read from the socket
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ @raise SocketError: if the socket connection to the node dies unexpectedly
If an error handler is passed to the client it is called emidiately before the error
is raised.
"""
@@ -949,8 +920,8 @@
self._log.info(self._logMessages.SocketDead)
self.close()
if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
+ self._errorHandler(SocketError, d)
+ raise SocketError(d) #!!
if p == '\r': # ignore
continue
@@ -975,8 +946,8 @@
self._log.info(self._logMessages.SocketDead)
self.close()
if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d) #!!
+ self._errorHandler(SocketError, d)
+ raise SocketError(d) #!!
else:
head, sep, tail = line.partition('=')
@@ -1019,7 +990,7 @@
@param data: data to atatch to the message
@param params: {para-name: param-calue, ...} of parameters to pass along
with the message (see freenet protocol)
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly
+ @raise SocketError: if the socket connection to the node dies unexpectedly
If an error handler is passed to the client it is called emidiately before the error
is raised.
"""
@@ -1030,20 +1001,19 @@
"""Sends a message to freenet
@param msg: (Message) message to send
@return: Message
- @raise FcpSocketError: if the socket connection to the node dies unexpectedly.
+ @raise SocketError: if the socket connection to the node dies unexpectedly.
If an error handler is passed to the client it is called emidiately before the error
is raised.
"""
- rawMsg = msg.toString()
- self._log.debug(self._logMessages.MessageSend + '\n' + msg.pprint())
+ self._log.debug(self._logMessages.MessageSend + msg.pprint())
try:
- self._socket.sendall(rawMsg)
- except Exception, d:
+ self._socket.sendall(msg.toString())
+ except socket.error, d:
self._log.info(self._logMessages.SocketDead)
self.close()
if self._errorHandler is not None:
- self._errorHandler(FcpSocketError, d)
- raise FcpSocketError(d)
+ self._errorHandler(SocketError, d)
+ raise SocketError(d)
return msg
@@ -1061,7 +1031,8 @@
#*****************************************************************************
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
- if c.connect():
+ nodeHello = c.connect()
+ if nodeHello is not None:
def foo():
job1 = JobClientHello(c)
c.jobAdd(job1)
@@ -1070,8 +1041,8 @@
print '---------------------------'
print job1.fcpError
print job1.fcpResult
- print job1.fcpTime
print '---------------------------'
+ # should raise
#foo()
@@ -1081,18 +1052,25 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpError
print job2.fcpResult
- print job2.fcpTime
print '---------------------------'
-
+ #foo()
+
def foo():
+ job2 = JobListPeers(c)
+ c.jobAdd(job2)
+ c.run()
+ print '---------------------------'
+ print job2.fcpResult
+ print '---------------------------'
+ #foo()
+
+
+ def foo():
job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpError
print job2.fcpResult
- print job2.fcpTime
print '---------------------------'
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-22 09:02:02
|
Revision: 10
http://fclient.svn.sourceforge.net/fclient/?rev=10&view=rev
Author: jUrner
Date: 2007-10-22 02:02:04 -0700 (Mon, 22 Oct 2007)
Log Message:
-----------
continued working on protocol implementation
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-20 13:37:51 UTC (rev 9)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10)
@@ -16,10 +16,10 @@
#**************************************************************
NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
-DefaultFcpPort = 9481
try:
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
-except: pass
+except ValueError:
+ DefaultFcpPort = 9481
SocketTimeout = 0.1
class FixedJobIdentifiers:
@@ -33,13 +33,13 @@
class Priorities:
"""All priorities supported by the client"""
- Maximum = 0
- Interactive = 1
- SemiInteractive = 2
- Updatable = 3
- Bulk = 4
- Prefetch = 5
- Minimum = 6
+ Maximum = '0'
+ Interactive = '1'
+ SemiInteractive = '2'
+ Updatable = '3'
+ Bulk = '4'
+ Prefetch = '5'
+ Minimum = '6'
PriorityMin = Minimum
PriorityDefault = Bulk
@@ -471,42 +471,40 @@
class JobBase(object):
"""Base class for jobs"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, identifier, message):
"""
@param fcpClient: FcpClient() instance
@param identifier: (str) identifier of the job
@param message: (Message) to send to the node whne the job ist started
- @ivar fcpClient: FcpClient() instance
- @ivar fcpError: holding the error message if an error was encountered while running
- the job, None otherwise
- @ivar fcpIdentifier: identifier of the job
- @ivar fcpMessage: initial message send to the node
- @ivar fcpResult: if no error was encountered, holding the result of the job when complete
- @ivar fcpTimeStart: time the job was started
- @ivar fcpTimeStop: time the job was stopped
+ @ivar jobClient: FcpClient() instance of the job
+ @ivar jobIdentifier: identifier of the job
+ @ivar jobMessage: message to be send to the node
+ @ivar jobResult: if no error was encountered, holding the result of the job when complete
+ @ivar jobTimeStart: time the job was started
+ @ivar jobTimeStop: time the job was stopped
"""
- self.fcpClient = fcpClient # FcpClient() instance the job belongs to
- self.fcpIdentifier = identifier # job identifier
- self.fcpMessage = message # message send to node
- self.fcpResult = None # job result (bool error, Message msg)
- self.fcpTimeStart = 0 # time the job was started
- self.fcpTimeStop = 0 # time the job was stopped
+ self.jobClient = fcpClient
+ self.jobIdentifier = identifier
+ self.jobMessage = message
+ self.jobResult = None
+ self.jobTimeStart = 0
+ self.jobTimeStop = 0
+
def handleStart(self):
"""Starts the job"""
- self.fcpResult = None
- self.fcpTimeStart = time.time()
- self.fcpClient.sendMessageEx(self.fcpMessage)
+ self.jobResult = None
+ self.jobTimeStart = time.time()
+ self.jobClient.sendMessageEx(self.jobMessage)
def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
@param flagError: True if an error was encountered, False otherwise
@param msg: (Message) to pass to the job
"""
- self.fcpTimeStop = time.time()
- self.fcpResult = (flagError, msg)
+ self.jobTimeStop = time.time()
+ self.jobResult = (flagError, msg)
class JobClientHello(JobBase):
@@ -516,8 +514,7 @@
goes well, you will get a NodeHello in response.
"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
"""
@param name: (str) connection name or None, to use an arbitrary name
@@ -538,8 +535,7 @@
"""Lists all known peers of the node
"""
- _fcp_auto_remove_ = True
-
+
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Message.ListPeers,
@@ -561,13 +557,10 @@
On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
Note, that both members may be '' (empty string)
-
"""
- _fcp_auto_remove_ = False
-
-
- # idea is to provoke a GetFailed message and take mimetype and size from it
+
+ # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed'
def __init__(self, fcpClient, uri, **params):
"""
@param fcpClient: FcpClient() instance
@@ -584,7 +577,9 @@
Message.ClientGet,
Identifier=identifier,
URI=uri,
- MaxSize='0',
+ # suggested by Mathew Toseland to use about 32k for mimeType requests
+ # basic sizes of keys are: 1k for SSks and 32k for CHKs
+ MaxSize='32000',
ReturnType='none',
Verbosity='1',
**params
@@ -592,34 +587,77 @@
JobBase.__init__(self, fcpClient, identifier, message)
+ def getPrority(self):
+ return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault)
+
+
+ def setPriority(self, priority):
+ if not priority in Priorities:
+ raise ValueError('Invalid priority: %r' % priority)
+ self.jobClient.sendMessage(
+ Message.ModifyPersistentRequest,
+ Identifier=self.jobIdentifier,
+ Global=fcpBool(False),
+ PriorityClass=priority,
+ )
+ # not shure if the response arrives in any case, so set it here
+ self.jobMessage['PriorityClass'] = priority
+
+
+ def stopRequest(self):
+ self.jobClient.sendMessage(
+ Message.RemovePersistentRequest,
+ Global=fcpBool(False),
+ Identifier=self.jobIdentifier,
+ )
+
+
+ def handlePriorityChanged(self):
+ self.jobMessage['PriorityClass'] = priority
+
def handleProgress(self, msg):
"""Handles the next progress made. Overwrite to process.
"""
+
+ def handleRequestRemoved(self, msg):
+ if self.jobClient.jobIsRunning(self.jobIdentifier):
+ self.jobClient.jobStop(self.jobIdentifier)
+
def handleStop(self, flagError, msg):
+ """called when a job is finished
+ @note: jobResult may contain an 'IdentifierCollision' message. Make sure
+ to handle this appropriately
+ """
JobBase.handleStop(self,flagError, msg)
if msg.name == Message.DataFound:
- self.fcpResult = (
+ self.jobResult = (
msg.get('Metadata.ContentType', ''),
msg.get('DataLength', '')
)
elif msg.name == Message.GetFailed:
if msg['Code'] == FetchError.TooBig:
- self.fcpResult = (False, msg)
- self.fcpResult = (
+ self.jobResult = (False, msg)
+ self.jobResult = (
msg.get('ExpectedMetadata.ContentType', ''),
msg.get('ExpectedDataLength', '')
)
-
+ # not shure when removing the request is actually required
+ # so send it anyway
+ self.jobClient.sendMessage(
+ Message.RemovePersistentRequest,
+ Global=fcpBool(False),
+ Identifier=self.jobIdentifier,
+ )
+
-
+
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
"""Tests a directory for read / write accesss
"""
- _fcp_auto_remove_ = False
-
+
def __init__(self, fcpClient, directory, read=True, write=True):
if not os.path.isdir(directory):
raise ValueError('No such directory: %r' % directory)
@@ -631,7 +669,7 @@
WantWriteDirectory=fcpBool(write),
)
JobBase.__init__(self, fcpClient, directory, message)
- self.fcpTmpFile = None
+ self.jobTmpFile = None
def handleTestDDAReply(self, msg):
@@ -644,14 +682,14 @@
if os.path.isfile(fpathWrite):
os.remove(fpathWrite)
else:
- self.fcpTmpFile = fpathWrite
+ self.jobTmpFile = fpathWrite
if fpathRead is not None:
readContent = saveReadFile(fpathRead)
if readContent is None:
readContent = ''
- self.fcpClient.sendMessage(
+ self.jobClient.sendMessage(
Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
@@ -660,8 +698,8 @@
def handleStop(self, flagError, msg):
JobBase.handleStop(self, flagError, msg)
- saveRemoveFile(self.fcpTmpFile)
- self.fcpTmpFile = None
+ saveRemoveFile(self.jobTmpFile)
+ self.jobTmpFile = None
#**************************************************************************
# fcp client
@@ -682,6 +720,7 @@
JobStop = 'Stopping job: '
JobsCompleted = 'All jobs completed'
+
KeyboardInterrupt = 'Keyboard interrupt'
SocketDead = 'Socket is dead'
@@ -710,10 +749,10 @@
self._isConnected = False
self._jobs = {
- 'all': {},
- 'pending': [], # ???
- 'running': [],
- 'complete': [], # ???
+ #TODO: check if JobList is still required
+ 'JobMapping': {},
+ 'JobList': [],
+ 'RegisteredDirectories': [],
}
self._errorHandler = errorHandler
self._log = logging.getLogger(name)
@@ -734,7 +773,7 @@
self._socket = None
- #TODO: an iterator would be nice for Guis, to enshure they stay responsitive in the call
+ #TODO: an iterator would be nice to ensure Guis stay responsive in the call
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
"""Establishes the connection to a freenet node
@param host: (str) host of th node
@@ -767,7 +806,7 @@
# as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- error, msg = job.fcpResult
+ error, msg = job.jobResult
assert not error, 'Error should have been caught by handleMessage()'
return msg
@@ -782,9 +821,13 @@
def handleMessage(self, msg):
- """Handles the next message from the freenet node
- @param msg: Message() to handle
+ """Handles a message from the freenet node
+ @param msg: (Message) to handle
+ @return: True if the message was handled, False otherwise
"""
+
+
+ #if msg.name != 'USocketTimeOut':
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
if msg.name == Message.NodeHello:
@@ -803,30 +846,57 @@
#TODO: inform caller
raise ProtocolError(msg)
else:
- self.jobStop(identifier, msg, flagError=True)
+ return self.jobStop(identifier, msg, flagError=True)
- elif msg.name == Message.Peer:
- self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
+ if msg.name == Message.DataFound:
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
elif msg.name == Message.EndListPeers:
- self.jobStop(FixedJobIdentifiers.ListPeers, msg)
+ return self.jobStop(FixedJobIdentifiers.ListPeers, msg)
elif msg.name == Message.GetFailed:
- self.jobStop(msg['Identifier'], msg, flagError=True)
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
+ elif msg.name == Message.PersistentRequestRemoved:
+ return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg)
+
elif msg.name == Message.SimpleProgress:
- self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ return self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ elif msg.name == Message.TestDDAComplete:
+ return self.jobStop(msg['Directory'], msg)
+
elif msg.name == Message.TestDDAReply:
- self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+ return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
- elif msg.name == Message.TestDDAComplete:
- self.jobStop(msg['Directory'], msg)
+ elif msg.name == Message.IdentifierCollision:
+ # identifier is unique to us, but for some reason not unique to the node
+ #TODO: maybe for future implementations find a way to automise
+ # reinsertion and a notification via a handleJobChanged() for the
+ # caller to overwrite
+ return self.jobStop(msg['Identifier'], msg, flagError=True)
- elif msg.name == Message.IdentifierCollision:
- pass
+ elif msg.name == Message.Peer:
+ return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
+
+ return False
+ #########################################################
+ ## jobs
+ #########################################################
+ def hasJobsRunning(self):
+ """Checks if the client has running jobs
+ @return: (bool) True if so, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ result = bool(self._jobs['JobList'])
+ finally:
+ self._lock.release()
+ return result
+
+
def jobAdd(self, job, synchron=False):
"""Adds a job to the client
@param job: (Job*) job to add
@@ -834,35 +904,62 @@
"""
self._lock.acquire(True)
try:
- if job.fcpIdentifier in self._jobs['all']:
- raise ValueError('Duplicate job: %r' % job.identifier)
- self._jobs['all'][job.fcpIdentifier] = job
- self._jobs['running'].append(job)
+ if job.jobIdentifier in self._jobs['JobMapping']:
+ raise ValueError('Duplicate job: %r' % job.jobIdentifier)
+ self._jobs['JobMapping'][job.jobIdentifier] = job
+ self._jobs['JobList'].append(job)
finally:
self._lock.release()
- self._log.info(self._logMessages.JobStart + job.fcpMessage.name)
+ self._log.info(self._logMessages.JobStart + job.jobMessage.name)
job.handleStart()
if synchron:
- while job.fcpResult is None:
+ while job.jobResult is None:
self.next()
+ def jobGet(self, identifier):
+ """Returns a job given its identifier
+ @param identifier: identifier of the job
+ @return: (Job*) instance or None, if no corresponding job was found
+ """
+ self._lock.acquire(True)
+ try:
+ result = self._jobs['JobMapping'].get(identifier, None)
+ finally:
+ self._lock.release()
+ return result
+
+ def jobIsRunning(self, identifier):
+ """Checks if a job is running
+ @param identifier: identifier of the job
+ @return: True if so, False otherwise
+ """
+ self._lock.acquire(True)
+ try:
+ result = identifier in self._jobs['JobMapping']
+ finally:
+ self._lock.release()
+ return result
+
+
def jobNotify(self, identifier, handler, msg):
"""Notifies a job about an event while it is running
@param identifier: identifier of the job to notify
@param handler: (str) method of the job to call to handle the notification
@param msg: Message() to pass to the job
+ @return: True if a job was found to be notified, False otherwise
"""
self._lock.acquire(True)
try:
- job = self._jobs['all'].get(identifier, None)
+ job = self._jobs['JobMapping'].get(identifier, None)
finally:
self._lock.release()
if job is None:
- raise ValueError('No such job: %r' % identifier)
+ return False
getattr(job, handler)(msg)
+ return True
#TODO: quite unclear when to remove a job
@@ -871,25 +968,23 @@
@param identifier: identifier of the job to stop
@param msg: Message() to pass to the job as result
@param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse
+ @return: True if a job was found to be stopped, False otherwise
"""
self._lock.acquire(True)
try:
- job = self._jobs['all'].get(identifier, None)
+ job = self._jobs['JobMapping'].get(identifier, None)
if job is not None:
- self._jobs['running'].remove(job)
- if job._fcp_auto_remove_:
- del self._jobs['all'][identifier]
- else:
- self._jobs['complete'].append(job)
+ self._jobs['JobList'].remove(job)
+ del self._jobs['JobMapping'][identifier]
finally:
self._lock.release()
-
if job is None:
- raise ValueError('No such job: %r' % identifier)
- self._log.info(self._logMessages.JobStop + job.fcpMessage.name)
+ return False
+ self._log.info(self._logMessages.JobStop + job.jobMessage.name)
job.handleStop(flagError, msg)
+ return True
+
-
#TODO: some info when all jobs are completed
def next(self):
"""Pumps the next message waiting
@@ -963,21 +1058,8 @@
"""Runs the client untill all jobs passed to it are completed
@note: use KeyboardInterrupt to stop prematurely
"""
-
- # TODO:
- # x. push pending jobs
try:
- while True:
- if not self._lock.acquire(False):
- continue
-
- try:
- if not self._jobs['pending'] and not self._jobs['running']:
- self._log.info(self._logMessages.JobsCompleted)
- break
- finally:
- self._lock.release()
-
+ while self.hasJobsRunning():
self.next()
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
@@ -1026,6 +1108,16 @@
""""""
self._log.setLevel(verbosity)
+
+ ########################################################
+ ##
+ ########################################################
+ def getFileInfo(self, job):
+ pass
+
+ def getFile(self, job):
+ pass
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1039,8 +1131,7 @@
c.run()
print '---------------------------'
- print job1.fcpError
- print job1.fcpResult
+ print job1.jobResult
print '---------------------------'
# should raise
#foo()
@@ -1052,7 +1143,7 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
@@ -1061,7 +1152,7 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
@@ -1071,6 +1162,6 @@
c.jobAdd(job2)
c.run()
print '---------------------------'
- print job2.fcpResult
+ print job2.jobResult
print '---------------------------'
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-23 09:19:10
|
Revision: 11
http://fclient.svn.sourceforge.net/fclient/?rev=11&view=rev
Author: jUrner
Date: 2007-10-23 02:19:12 -0700 (Tue, 23 Oct 2007)
Log Message:
-----------
combed over message dispatch
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-22 09:02:04 UTC (rev 10)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11)
@@ -14,7 +14,6 @@
#**************************************************************
# consts
#**************************************************************
-NameClient = 'Fcp20Client'
DefaultFcpHost = os.environ.get('FCP_HOST', '127.0.0.1').strip()
try:
DefaultFcpPort = int(os.environ.get('FCP_PORT', '').strip())
@@ -28,6 +27,13 @@
"""
ClientHello = 'ClientHello'
ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig'
+ ModifyConfig = 'ModifyConfig'
+ WatchGlobal = 'WatchGlobal'
+ Shutdown = 'Shutdown'
+
class Priorities:
@@ -492,6 +498,10 @@
self.jobTimeStop = 0
+ def handleMessage(self, msg):
+ return False
+
+
def handleStart(self):
"""Starts the job"""
self.jobResult = None
@@ -514,7 +524,6 @@
goes well, you will get a NodeHello in response.
"""
-
def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
"""
@param name: (str) connection name or None, to use an arbitrary name
@@ -529,13 +538,36 @@
def displayName(self):
return 'ClientHello'
+
+
+ def handleMessage(self, msg):
+ if msg.name == Message.NodeHello:
+ return self.handleNodeHello(msg)
+ elif msg.name == Message.ProtocolError:
+ return self.handleProtocolError(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handleNodeHello(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+ def handleProtocolError(self, msg):
+ raise ProtocolError(msg)
+
+
+
+
+
class JobListPeers(JobBase):
"""Lists all known peers of the node
"""
-
-
+
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
message = Message(
Message.ListPeers,
@@ -545,13 +577,26 @@
JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
- def handlePeer(self,msg):
- """Handles the next peer send by the node in form of a 'Peer' message
- while the job is running. Overwrite to process.
- """
+ def handleMessage(self,msg):
+ if msg.name == Message.EndListPeers:
+ return self.handleEndListPeers(msg)
+ elif msg.name == Message.Peer:
+ return self.handlePeer(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handlePeer(self, msg):
+ return True
+
+ def handleEndListPeers(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
-
+#TODO: identifier collisions are not yet handled
class JobGetFileInfo(JobBase):
"""Tries to retieve information about a file. If everything goes well
@@ -571,6 +616,11 @@
MaxRetries=-1 ...N
PriorityClass=Priority*
+ @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be
+ retrieved and data will be a GetFailed message containing details. If error is False
+ data will be a tuple(str metadataContentType, str size). Note that both may be empty
+ string and size may not be accurate.
+
"""
identifier = newIdentifier()
message = Message(
@@ -612,46 +662,73 @@
)
- def handlePriorityChanged(self):
- self.jobMessage['PriorityClass'] = priority
+ def handleMessage(self, msg):
+ if msg.name == Message.DataFound:
+ return self.handleDataFound(msg)
+ elif msg.name == Message.GetFailed:
+ return self.handleGetFailed(msg)
+ elif msg.name == Message.IdentifierCollision:
+ return self.handleIdentifierCollision(msg)
+ elif msg.name == Message.PersistentRequestModified:
+ return self.handlePersistentRequestModified(msg)
+ elif msg.name == Message.PersistentRequestRemoved:
+ return self.handlePersistentRequestRemoved(msg)
+ elif msg.name == Message.SimpleProgress:
+ return self.handleSimpleProgress(msg)
+
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
- def handleProgress(self, msg):
- """Handles the next progress made. Overwrite to process.
- """
+ def handleDataFound(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = (
+ False,
+ (
+ msg.get('Metadata.ContentType', ''),
+ msg.get('DataLength', '')
+ )
+ )
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
- def handleRequestRemoved(self, msg):
+ def handleGetFailed(self, msg):
+ self.jobTimeStop = time.time()
+ if msg['Code'] == FetchError.TooBig:
+ self.jobResult = (False, msg)
+ self.jobResult = (
+ False,
+ (
+ msg.get('ExpectedMetadata.ContentType', ''),
+ msg.get('ExpectedDataLength', '')
+ )
+ )
+ else:
+ self.jobResult = (True, msg)
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+ def handleIdentifierCollision(self, msg):
+ raise
+
+
+ def handleSimpleProgress(self, msg):
+ return True
+
+
+ def handlePersistentRequestModified(self, msg):
+ priorityClass = msg.get('PriorityClass', None)
+ if priorityClass is not None:
+ self.jobMessage['PriorityClass'] = priorityClass
+ return True
+
+ def handlePersistentRequestRemoved(self, msg):
if self.jobClient.jobIsRunning(self.jobIdentifier):
self.jobClient.jobStop(self.jobIdentifier)
+ return True
+
-
- def handleStop(self, flagError, msg):
- """called when a job is finished
- @note: jobResult may contain an 'IdentifierCollision' message. Make shure
- to handle this appropriately
- """
- JobBase.handleStop(self,flagError, msg)
- if msg.name == Message.DataFound:
- self.jobResult = (
- msg.get('Metadata.ContentType', ''),
- msg.get('DataLength', '')
- )
- elif msg.name == Message.GetFailed:
- if msg['Code'] == FetchError.TooBig:
- self.jobResult = (False, msg)
- self.jobResult = (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- # not shure when removing the request is actually required
- # so send it anyway
- self.jobClient.sendMessage(
- Message.RemovePersistentRequest,
- Global=fcpBool(False),
- Identifier=self.jobIdentifier,
- )
-
-
-
#TODO: handle case where directories are registered multiple times
class JobTestDDA(JobBase):
"""Tests a directory for read / write accesss
@@ -672,6 +749,16 @@
self.jobTmpFile = None
+
+ def handleMessage(self, msg):
+ if msg.name == Message.TestDDAReply:
+ return self.handleTestDDAReply(msg)
+ elif msg.name == Message.TestDDAComplete:
+ return self.handleTestDDAComplete(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -693,13 +780,18 @@
Message.TestDDAResponse,
Directory=msg['Directory'],
ReadContent=readContent,
- )
+ )
+ return True
+
-
- def handleStop(self, flagError, msg):
- JobBase.handleStop(self, flagError, msg)
+ def handleTestDDAComplete(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = msg
saveRemoveFile(self.jobTmpFile)
self.jobTmpFile = None
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
#**************************************************************************
# fcp client
@@ -728,6 +820,10 @@
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
#TODO: no idea if to add support for pending jobs and queue management here
+
+#TODO: do not mix directories as identifiers with identifiers (might lead to collisions)
+#TODO: how to handle (ProtocolError code 18: Shutting down)?
+
class FcpClient(object):
"""Fcp client implementation"""
@@ -757,7 +853,7 @@
self._errorHandler = errorHandler
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock()
+ self._lock = thread.allocate_lock() # lovk when resources are accessed
self._socket = None
self.setVerbosity(verbosity)
@@ -806,9 +902,8 @@
# as expected the socket simply breaks. So take it over.
job = JobClientHello(self)
self.jobAdd(job, synchron=True)
- error, msg = job.jobResult
- assert not error, 'Error should have been caught by handleMessage()'
- return msg
+ assert job.jobResult is not None, 'ClientHello is not working as expected'
+ return job.jobResult
self._log.info(self._logMessages.ConnectionRetry)
@@ -827,59 +922,66 @@
"""
- #if msg.name != 'USocketTimeOut':
+ if msg.name == 'USocketTimeOut':
+ return True
+
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
- if msg.name == Message.NodeHello:
- #connectionIdentifier = msg['ConnectionIdentifier']
- self.jobStop(FixedJobIdentifiers.ClientHello, msg)
-
- elif msg.name == Message.ProtocolError:
+
+ if msg.name == Message.ProtocolError:
code = msg['Code']
- #if code == ProtocolError.NoLateClientHellos:
- # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
- #elif code == ProtocolError.ClientHelloMustBeFirst:
- # self.jobStop(FixedJobIdentifiers.ClientHello, msg, error=True)
- #else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- #TODO: inform caller
- raise ProtocolError(msg)
+ if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
+
+ elif code == ProtocolError.Shutdown:
+ if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
+
+ # ########################################
+ #TODO: ???
+
+ return True
+
else:
- return self.jobStop(identifier, msg, flagError=True)
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: inform caller
+ raise ProtocolError(msg)
+ else:
+ return self.jobDispatchMessage(identifier, msg)
+
+ else:
+
+ # check if there is something like an identifier in the message
+ if msg.name == Message.TestDDAReply:
+ identifier = msg['Directory']
+ elif msg.name == Message.TestDDAComplete:
+ identifier = msg['Directory']
+ else:
+ identifier = msg.get('Identifier', None)
- if msg.name == Message.DataFound:
- return self.jobStop(msg['Identifier'], msg, flagError=True)
-
- elif msg.name == Message.EndListPeers:
- return self.jobStop(FixedJobIdentifiers.ListPeers, msg)
-
- elif msg.name == Message.GetFailed:
- return self.jobStop(msg['Identifier'], msg, flagError=True)
+ # dispatch to jobs with fixed identifiers
+ if identifier is None:
+
+ if msg.name == Message.NodeHello:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif msg.name == Message.PersistentRequestRemoved:
- return self.jobNotify(FixedJobIdentifiers.ListPeers, 'RequestRemoved', msg)
-
- elif msg.name == Message.SimpleProgress:
- return self.jobNotify(msg['Identifier'], 'handleProgress', msg)
+ elif msg.name == Message.EndListPeers:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.TestDDAComplete:
- return self.jobStop(msg['Directory'], msg)
+ elif msg.name == Message.Peer:
+ return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.TestDDAReply:
- return self.jobNotify(msg['Directory'], 'handleTestDDAReply', msg)
+ # more here.....
+
+ else:
+ raise ValueError('Unhandled message: ' + msg.name)
+
+ else:
+ return self.jobDispatchMessage(identifier, msg)
+
+ raise RuntimeError('Should not have endet here: %s' % msg.name)
- elif msg.name == Message.IdentifierCollision:
- # identifier is unique to us, but for some reason not unique to the node
- #TODO: maybe for future implementations find a way to automise
- # reinsertion and a notification via a handleJobChanged() for the
- # caller to overwrite
- return self.jobStop(msg['Identifier'], msg, flagError=True)
-
- elif msg.name == Message.Peer:
- return self.jobNotify(FixedJobIdentifiers.ListPeers, 'handlePeer', msg)
-
- return False
+
#########################################################
@@ -905,6 +1007,9 @@
self._lock.acquire(True)
try:
if job.jobIdentifier in self._jobs['JobMapping']:
+ raise ValueError('Job with that identifier already present')
+
+ if job.jobIdentifier in self._jobs['JobMapping']:
raise ValueError('Duplicate job: %r' % job.jobIdentifier)
self._jobs['JobMapping'][job.jobIdentifier] = job
self._jobs['JobList'].append(job)
@@ -916,7 +1021,19 @@
if synchron:
while job.jobResult is None:
self.next()
+
+ def jobDispatchMessage(self, identifier, msg):
+ """Dispatches a message to a job
+ @param identifier: identifier of the job
+ @param msg: (Message) message to dispatch
+ @return: True if the message was handled, False otherwise
+ """
+ job = self.jobGet(identifier)
+ if job is not None:
+ return job.handleMessage(msg)
+ return False
+
def jobGet(self, identifier):
"""Returns a job given its identifier
@@ -943,36 +1060,15 @@
self._lock.release()
return result
-
- def jobNotify(self, identifier, handler, msg):
- """Notifies a job about an event while it is running
- @param identifier: identifier of the job to notify
- @param handler: (str) method of the job to call to handle the notification
- @param msg: Message() to pass to the job
- @return: True if a job was found to be notified, False otherwise
+
+ def jobRemove(self, identifier):
+ """Removes a job unconditionally
+ @param identifier: identifier of the job to remove
+ @return: True if the job was found, False otherwise
"""
self._lock.acquire(True)
try:
job = self._jobs['JobMapping'].get(identifier, None)
- finally:
- self._lock.release()
- if job is None:
- return False
- getattr(job, handler)(msg)
- return True
-
-
- #TODO: quite unclear when to remove a job
- def jobStop(self, identifier, msg, flagError=False):
- """Stops a job
- @param identifier: identifier of the job to stop
- @param msg: Message() to pass to the job as result
- @param flagError: set to True to indicate unsuccessful completion of the job, True otherwisse
- @return: True if a job was found to be stopped, False otherwise
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['JobMapping'].get(identifier, None)
if job is not None:
self._jobs['JobList'].remove(job)
del self._jobs['JobMapping'][identifier]
@@ -981,10 +1077,9 @@
if job is None:
return False
self._log.info(self._logMessages.JobStop + job.jobMessage.name)
- job.handleStop(flagError, msg)
return True
-
+
#TODO: some info when all jobs are completed
def next(self):
"""Pumps the next message waiting
@@ -1059,8 +1154,13 @@
@note: use KeyboardInterrupt to stop prematurely
"""
try:
+ #n = 0
+
while self.hasJobsRunning():
+ #n += 1
+ #if n > 40: break
self.next()
+
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
self.close()
@@ -1117,7 +1217,9 @@
def getFile(self, job):
pass
+
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1158,10 +1260,22 @@
def foo():
- job2 = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
- c.jobAdd(job2)
+ job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip')
+
+
+ #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ #job.jobIdentifier = job.jobMessage['Identifier'] = 1
+ #job.jobMessage['Identifier'] = 1
+ #job.jobIdentifier = 1
+ c.jobAdd(job)
+
+ #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%201%281%29.jpg')
+ #job.jobMessage['Identifier'] = 1
+ #job.jobIdentifier = 1
+ #c.jobAdd(job)
+
c.run()
print '---------------------------'
- print job2.jobResult
+ print job.jobResult
print '---------------------------'
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-25 14:04:41
|
Revision: 12
http://fclient.svn.sourceforge.net/fclient/?rev=12&view=rev
Author: jUrner
Date: 2007-10-25 05:59:16 -0700 (Thu, 25 Oct 2007)
Log Message:
-----------
bit more work on protocol
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-23 09:19:12 UTC (rev 11)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-25 12:59:16 UTC (rev 12)
@@ -1,5 +1,16 @@
+'''Freenet client protocol 2.0 implementation
+'''
+#NOTE:
+#
+# downloading data to disk is not supported at the moment. TestDDA code is quite unwritable
+# and as far as I can see there are plans to get rid of it. So wait...
+#
+#
+
+
import atexit
+import base64
import logging
import os
import re
@@ -187,7 +198,7 @@
"""Returns a new unique identifier
@return: (str) uuid
"""
- return str(uuid.uuid4())
+ return 'fclient::' + str(uuid.uuid4())
def pythonBool(fcpBool):
@@ -251,7 +262,7 @@
@param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
@return: (string) whatever freenet returns
"""
- #TODO: on windows it may be necessary to hide the comand window
+ #TODO: on windows it may be necessary to hide the command window
p = subprocess.Popen(
args=cmdline,
shell=True,
@@ -368,10 +379,10 @@
ModifyPeerNote = 'ModifyPeerNote'
RemovePeer = 'RemovePeer'
GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
+ GetConfig = 'GetConfig' # (since 1027)
ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
GenerateSSK = 'GenerateSSK'
ClientPut = 'ClientPut'
ClientPutDiskDir = 'ClientPutDiskDir'
@@ -433,7 +444,7 @@
self.params = params
def toString(self):
- """Returns a string with the formated message, ready to be send"""
+ """Returns the message as a formatted string, ready to be sent"""
# TODO: "Data" not yet implemented
out = [self.name, ]
for param, value in self.params.items():
@@ -447,7 +458,6 @@
for param, value in self.params.items():
out.append('>> %s=%s' % (param, value))
out.append('>>EndMessage')
- out.append('')
return '\n'.join(out)
def __getitem__(self, name):
@@ -508,6 +518,8 @@
self.jobTimeStart = time.time()
self.jobClient.sendMessageEx(self.jobMessage)
+
+ # XXX
def handleStop(self, flagError, msg):
"""Called on job completion to stop the job
@param flagError: True if an error was encountered, False otherwise
@@ -569,13 +581,18 @@
"""
def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ """
+ @param withMetaData: include meta data for each peer?
+ @param withVolantile: include volatile data for each peer?
+ @ivar jobResult: on job completion, will be a list containing all peers as one 'Peer' message for each peer
+ """
message = Message(
Message.ListPeers,
WithMetadata=fcpBool(withMetaData),
WithVolatile=fcpBool(withVolantile),
)
JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
-
+
def handleMessage(self,msg):
if msg.name == Message.EndListPeers:
@@ -585,17 +602,72 @@
else:
raise ValueError('Unexpected message: %s' % msg.name)
+
def handlePeer(self, msg):
+ if self.jobResult is None:
+ self.jobResult = [msg, ]
+ else:
+ self.jobResult.append(msg)
return True
def handleEndListPeers(self, msg):
self.jobTimeStop = time.time()
- self.jobResult = msg
+ if self.jobResult is None:
+ self.jobResult = []
self.jobClient.jobRemove(self.jobIdentifier)
return True
+
+class JobListPeerNotes(JobBase):
+ """Lists all notes associated to a peer of the node
+ """
+
+ def __init__(self, fcpClient, identifier):
+ """
+ @param identifier: identifier of the peer to list notes for (peer identity)
+ @ivar jobResult: on job completion will be a list containing all notes associated to the peer
+
+ @note: notes are only available for darknet peers (opennet == false)
+ """
+
+ message = Message(
+ Message.ListPeerNotes,
+ NodeIdentifier=identifier
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleMessage(self,msg):
+ if msg.name == Message.EndListPeerNotes:
+ return self.handleEndListPeerNotes(msg)
+ elif msg.name == Message.PeerNote:
+ return self.handlePeerNote(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+ def handlePeerNote(self, msg):
+ note = msg.get('NoteText', '')
+ if note:
+ note = base64.decodestring(note)
+ if self.jobResult is None:
+ self.jobResult = [note, ]
+ else:
+ self.jobResult.append(note)
+ return True
+
+
+ def handleEndListPeerNotes(self, msg):
+ self.jobTimeStop = time.time()
+ if self.jobResult is None:
+ self.jobResult = []
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
+
+
#TODO: identifier collisions are not yet handled
class JobGetFileInfo(JobBase):
"""Tries to retieve information about a file. If everything goes well
@@ -735,7 +807,13 @@
"""
- def __init__(self, fcpClient, directory, read=True, write=True):
+ def __init__(self, fcpClient, directory, read=False, write=False):
+ """
+
+ @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed)
+ """
+
+
if not os.path.isdir(directory):
raise ValueError('No such directory: %r' % directory)
@@ -747,8 +825,7 @@
)
JobBase.__init__(self, fcpClient, directory, message)
self.jobTmpFile = None
-
-
+
def handleMessage(self, msg):
if msg.name == Message.TestDDAReply:
@@ -786,13 +863,50 @@
def handleTestDDAComplete(self, msg):
self.jobTimeStop = time.time()
- self.jobResult = msg
+ self.jobResult = (
+ pythonBool(msg.get('ReadDirectoryAllowed', 'false')),
+ pythonBool(msg.get('WriteDirectoryAllowed', 'false')),
+ )
saveRemoveFile(self.jobTmpFile)
self.jobTmpFile = None
self.jobClient.jobRemove(self.jobIdentifier)
return True
+
+class JobGenerateSSK(JobBase):
+ """Job to generate a SSK key pair
+ """
+
+
+ def __init__(self, fcpClient):
+ """
+ @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated
+ SSK key
+ """
+
+ identifier = newIdentifier()
+ message = Message(
+ Message.GenerateSSK,
+ Identifier=identifier,
+ )
+ JobBase.__init__(self, fcpClient, identifier, message)
+
+
+ def handleMessage(self, msg):
+ if msg.name == Message.SSKKeypair:
+ return self.handleSSKKeypair(msg)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+
+ def handleSSKKeypair(self, msg):
+ self.jobTimeStop = time.time()
+ self.jobResult = (msg['InsertURI'], msg['RequestURI'])
+ self.jobClient.jobRemove(self.jobIdentifier)
+ return True
+
+
#**************************************************************************
# fcp client
#**************************************************************************
@@ -840,20 +954,19 @@
with two params: SocketError + details. When the handler is called the client
is already closed.
@param verbosity: verbosity level for debugging
- @param logMessages: LogMessages class containing messages
+ @param logMessages: LogMessages class containing message strings
"""
self._isConnected = False
self._jobs = {
- #TODO: check if JobList is still required
- 'JobMapping': {},
- 'JobList': [],
+ 'Jobs': {},
+ 'PendingJobs': [],
'RegisteredDirectories': [],
}
- self._errorHandler = errorHandler
+ self._errorHandler = errorHandler #TODO: check!
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock() # lovk when resources are accessed
+ self._lock = thread.allocate_lock() # lock when resources are accessed
self._socket = None
self.setVerbosity(verbosity)
@@ -933,7 +1046,7 @@
if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif code == ProtocolError.Shutdown:
+ elif code == ProtocolError.ShuttingDown:
if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
# ########################################
@@ -956,6 +1069,11 @@
identifier = msg['Directory']
elif msg.name == Message.TestDDAComplete:
identifier = msg['Directory']
+ elif msg.name == Message.PeerNote:
+ identifier = msg['NodeIdentifier']
+ elif msg.name == Message.EndListPeerNotes:
+ identifier = msg['NodeIdentifier']
+
else:
identifier = msg.get('Identifier', None)
@@ -971,6 +1089,13 @@
elif msg.name == Message.Peer:
return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
+ #elif msg.name == Message.PeerNote:
+ # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
+
+ #elif msg.name == Message.EndListPeerNotes:
+ # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
+
+
# more here.....
else:
@@ -993,9 +1118,12 @@
"""
self._lock.acquire(True)
try:
- result = bool(self._jobs['JobList'])
+ result = self._jobs['Jobs'] or self._jobs['PendingJobs']
finally:
self._lock.release()
+
+
+
return result
@@ -1006,20 +1134,16 @@
"""
self._lock.acquire(True)
try:
- if job.jobIdentifier in self._jobs['JobMapping']:
- raise ValueError('Job with that identifier already present')
-
- if job.jobIdentifier in self._jobs['JobMapping']:
+ if job.jobIdentifier in self._jobs['Jobs']:
raise ValueError('Duplicate job: %r' % job.jobIdentifier)
- self._jobs['JobMapping'][job.jobIdentifier] = job
- self._jobs['JobList'].append(job)
+ self._jobs['Jobs'][job.jobIdentifier] = job
finally:
self._lock.release()
self._log.info(self._logMessages.JobStart + job.jobMessage.name)
job.handleStart()
if synchron:
- while job.jobResult is None:
+ while self.jobGet(job.jobIdentifier):
self.next()
@@ -1042,7 +1166,7 @@
"""
self._lock.acquire(True)
try:
- result = self._jobs['JobMapping'].get(identifier, None)
+ result = self._jobs['Jobs'].get(identifier, None)
finally:
self._lock.release()
return result
@@ -1055,7 +1179,7 @@
"""
self._lock.acquire(True)
try:
- result = identifier in self._jobs['JobMapping']
+ result = identifier in self._jobs['Jobs']
finally:
self._lock.release()
return result
@@ -1068,10 +1192,9 @@
"""
self._lock.acquire(True)
try:
- job = self._jobs['JobMapping'].get(identifier, None)
+ job = self._jobs['Jobs'].get(identifier, None)
if job is not None:
- self._jobs['JobList'].remove(job)
- del self._jobs['JobMapping'][identifier]
+ del self._jobs['Jobs'][identifier]
finally:
self._lock.release()
if job is None:
@@ -1209,17 +1332,86 @@
self._log.setLevel(verbosity)
+
########################################################
##
########################################################
def getFileInfo(self, job):
pass
- def getFile(self, job):
- pass
+
+
+ #########################################################
+ ## boilerplate code to tackle TestDDA
+ ##
+ ## ...but I don't trust it ;-) I was not yet alble to wrap my head around
+ ## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not.
+ ##
+ ## Another problem is that there is no way to know when a directory is no longer
+ ## needed. And I fon't want to write code in a Gui to tackle a problem that will hopefully
+ ## go away in the near future.
+ ##
+ ## see: https://bugs.freenetproject.org/view.php?id=1753
+ ##
+ #########################################################
+ def testWriteAccess(self, directory):
+ canRead, canWrite = False, False
+ result = self._jobs.get('RegisteredDirectories', None)
+ if result is not None:
+ canRead, canWrite = result
+ if not canWrite:
+ job = JobTestDDA(directory, read=canRead, write=True)
+ self.addJob(job, synchron=True)
+ canWrite = job.jobResult[1]
+ self._jobs['RegisteredDirectories'] = (canRead, canWrite)
+ return canWrite
+
+ def testReadAccess(self, directory):
+ canRead, canWrite = False, False
+ result = self._jobs.get('RegisteredDirectories', None)
+ if result is not None:
+ canRead, canWrite = result
+ if not canRead:
+ job = JobTestDDA(directory, read=True, write=canWrite)
+ self.addJob(job, synchron=True)
+ canRead = job.jobResult[0]
+ self._jobs['RegisteredDirectories'] = (canRead, canWrite)
+ return canRead
+
+
+ def downloadFile(self, directory, job):
+ if not os.path.isdir(directory):
+ raise IOError('No such directory')
+
+ self._jobs['PendingJobs'].append(job)
+ try:
+ result = self.testWriteAccess(directory)
+ if result:
+ self.addJob(job)
+ finally:
+ self._jobs['PendingJobs'].remove(job)
+ return result
+
+ def uploadFile(self, directory, job):
+ if not os.path.isdir(directory):
+ raise IOError('No such directory')
+
+ self._jobs['PendingJobs'].append(job)
+ try:
+ result = self.testReadAccess(directory)
+ if result:
+ self.addJob(job)
+ finally:
+ self._jobs['PendingJobs'].remove(job)
+ return result
+
+
+
+
+
#*****************************************************************************
#
#*****************************************************************************
@@ -1239,7 +1431,16 @@
#foo()
+
def foo():
+ job = JobGenerateSSK(c)
+ c.jobAdd(job, synchron=True)
+ print job.jobResult
+ foo()
+
+
+
+ def foo():
d = os.path.dirname(os.path.abspath(__file__))
job2 = JobTestDDA(c, d)
c.jobAdd(job2)
@@ -1250,12 +1451,21 @@
#foo()
def foo():
- job2 = JobListPeers(c)
- c.jobAdd(job2)
+ job = JobListPeers(c)
+ c.jobAdd(job)
c.run()
print '---------------------------'
- print job2.jobResult
+ print job.jobResult
print '---------------------------'
+
+
+ for peer in job.jobResult:
+ if not pythonBool(peer['opennet']):
+ job = JobListPeerNotes(c, peer['identity'])
+ c.jobAdd(job, synchron=True)
+ print '>>', job.jobResult
+ #.get('NoteText')
+
#foo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-27 17:00:16
|
Revision: 15
http://fclient.svn.sourceforge.net/fclient/?rev=15&view=rev
Author: jUrner
Date: 2007-10-27 10:00:21 -0700 (Sat, 27 Oct 2007)
Log Message:
-----------
started implementing public methods and events
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 16:59:07 UTC (rev 14)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 17:00:21 UTC (rev 15)
@@ -21,6 +21,24 @@
import thread
import uuid
+
+#--> rel import hack
+def parentdir(n, fpath):
+ fpath = os.path.abspath(fpath)
+ for i in xrange(n):
+ fpath = os.path.dirname(fpath)
+ return fpath
+sys.path.insert(0, parentdir(3, __file__))
+
+
+from fclient_lib.pyex import events
+
+
+sys.path.pop(0)
+del parentdir
+#<-- rel import hack
+
+
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
#**************************************************************
# consts
@@ -32,6 +50,14 @@
DefaultFcpPort = 9481
SocketTimeout = 0.1
+
+
+class Verbosity:
+ Debug = logging.DEBUG
+ Info = logging.INFO
+ Warning = logging.WARNING
+
+
class FixedJobIdentifiers:
"""Fixed job identifiers
@note: he client can only handle one job of these at a time
@@ -44,6 +70,7 @@
ModifyConfig = 'ModifyConfig'
WatchGlobal = 'WatchGlobal'
Shutdown = 'Shutdown'
+
@@ -61,6 +88,23 @@
PriorityMin = Minimum
PriorityDefault = Bulk
+
+class PeerNodeStatus:
+ Connected = 1
+ RoutingBackedOff = 2
+ TooNew = 3
+ TooOld = 4
+ Disconnected = 5
+ NeverConnected = 6
+ Disabled = 7
+ Bursting = 8
+ Listening = 9
+ ListenOnly = 10
+ ClockProblem = 11
+ ConnError = 12
+ Disconnecting = 13
+
+
#************************************************************************************
# exceptions
#************************************************************************************
@@ -194,11 +238,13 @@
return 'false'
-def newIdentifier():
+def newIdentifier(prefix=None):
"""Returns a new unique identifier
@return: (str) uuid
"""
- return 'fclient::' + str(uuid.uuid4())
+ if prefix:
+ return prefix + str(uuid.uuid4())
+ return str(uuid.uuid4())
def pythonBool(fcpBool):
@@ -580,7 +626,7 @@
"""Lists all known peers of the node
"""
- def __init__(self, fcpClient, withMetaData=False, withVolantile=False):
+ def __init__(self, fcpClient, withMetaData=True, withVolantile=True):
"""
@param withMetaData: include meta data for each peer?
@param withVolantile: include volantile data for each peer?
@@ -594,6 +640,11 @@
JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
+ def handleStart(self):
+ JobBase.handleStart(self)
+ self.jobClient.EventListPeers()
+
+
def handleMessage(self,msg):
if msg.name == Message.EndListPeers:
return self.handleEndListPeers(msg)
@@ -604,14 +655,16 @@
def handlePeer(self, msg):
+ self.jobClient.EventListNextPeer(msg.params)
if self.jobResult is None:
- self.jobResult = [msg, ]
+ self.jobResult = [msg.params, ]
else:
- self.jobResult.append(msg)
+ self.jobResult.append(msg.params)
return True
def handleEndListPeers(self, msg):
+ self.jobClient.EventEndListPeers()
self.jobTimeStop = time.time()
if self.jobResult is None:
self.jobResult = []
@@ -639,6 +692,11 @@
JobBase.__init__(self, fcpClient, identifier, message)
+ def handleStart(self):
+ JobBase.handleStart(self)
+ self.jobClient.EventListPeerNotes()
+
+
def handleMessage(self,msg):
if msg.name == Message.EndListPeerNotes:
return self.handleEndListPeerNotes(msg)
@@ -649,6 +707,7 @@
def handlePeerNote(self, msg):
note = msg.get('NoteText', '')
+ self.jobClient.EventListNextPeerNote(note)
if note:
note = base64.decodestring(note)
if self.jobResult is None:
@@ -659,6 +718,7 @@
def handleEndListPeerNotes(self, msg):
+ self.jobClient.EventEndListPeerNotes()
self.jobTimeStop = time.time()
if self.jobResult is None:
self.jobResult = []
@@ -812,8 +872,6 @@
@ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed)
"""
-
-
if not os.path.isdir(directory):
raise ValueError('No such directory: %r' % directory)
@@ -823,6 +881,7 @@
WantReadDirectory=fcpBool(read),
WantWriteDirectory=fcpBool(write),
)
+
JobBase.__init__(self, fcpClient, directory, message)
self.jobTmpFile = None
@@ -832,10 +891,24 @@
return self.handleTestDDAReply(msg)
elif msg.name == Message.TestDDAComplete:
return self.handleTestDDAComplete(msg)
+ elif msg.name == Message.ProtocolError:
+ return self.handleProtocolError(msg)
else:
raise ValueError('Unexpected message: %s' % msg.name)
+ def handleProtocolError(self, msg):
+ # most likely code 7 here...
+ # "Both WantReadDirectory and WantWriteDirectory are set to false: what's the point of sending a message?"
+ # ..a stupid response that is ;-)
+ self.jobTimeStop = time.time()
+ self.jobClient.jobRemove(self.jobIdentifier)
+ if msg['Code'] == ProtocolError.InvalidMessage:
+ self.jobResult = (False, False)
+ else:
+ raise ValueError('Unexpected message: %s' % msg.name)
+
+
def handleTestDDAReply(self, msg):
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
@@ -938,14 +1011,25 @@
#TODO: do not mix directories as identifiers with identifiers (might lead to collisions)
#TODO: how to handle (ProtocolError code 18: Shutting down)?
-class FcpClient(object):
+class FcpClient(events.Events):
"""Fcp client implementation"""
+ _events_ = (
+ 'EventListPeers',
+ 'EventListNextPeer',
+ 'EventEndListPeers',
+
+ 'EventListPeerNotes',
+ 'EventListNextPeerNote',
+ 'EventEndListPeerNotes',
+
+ )
+
def __init__(self,
name='',
errorHandler=None,
- verbosity=logging.WARNING,
+ verbosity=Verbosity.Warning,
logMessages=LogMessages
):
"""
@@ -963,7 +1047,7 @@
'PendingJobs': [],
'RegisteredDirectories': [],
}
- self._errorHandler = errorHandler #TODO: check!
+ self._errorHandler = errorHandler #TODO: check if necessary!
self._log = logging.getLogger(name)
self._logMessages = logMessages
self._lock = thread.allocate_lock() # lock when resources are accessed
@@ -1065,6 +1149,7 @@
else:
# check if the is something like an identifier in the message
+ #TODO: we run into troubles when using directories and NodeIdentifiers as identifiers
if msg.name == Message.TestDDAReply:
identifier = msg['Directory']
elif msg.name == Message.TestDDAComplete:
@@ -1405,13 +1490,27 @@
finally:
self._jobs['PendingJobs'].remove(job)
return result
-
-
-
+
+ #################################################
+ ##
+ ## public methods
+ ##
+ #################################################
+ def peerList(self, synchron=False):
+ job = JobListPeers(self)
+ self.jobAdd(job, synchron=synchron)
+ return job.jobResult
+
+ def peerNotes(self, peer, synchron=False):
+ if pythonBool(peer['opennet']):
+ return []
+ job = JobListPeerNotes(self, peer['identity'])
+ self.jobAdd(job, synchron=synchron)
+ return job.jobResult
+
-
#*****************************************************************************
#
#*****************************************************************************
@@ -1436,7 +1535,7 @@
job = JobGenerateSSK(c)
c.jobAdd(job, synchron=True)
print job.jobResult
- foo()
+ #foo()
@@ -1451,29 +1550,18 @@
#foo()
def foo():
- job = JobListPeers(c)
- c.jobAdd(job)
- c.run()
- print '---------------------------'
- print job.jobResult
- print '---------------------------'
+ peers = c.peerList(synchron=True)
+ for peer in peers:
+ print c.peerNotes(peer, synchron=True)
+
+ foo()
- for peer in job.jobResult:
- if not pythonBool(peer['opennet']):
- job = JobListPeerNotes(c, peer['identity'])
- c.jobAdd(job, synchron=True)
- print '>>', job.jobResult
- #.get('NoteText')
-
- #foo()
-
-
def foo():
- job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip')
+ #job = JobGetFileInfo(c, 'USK@IK8rVHfjCz1i1IZwgyafsPNDGODCk~EtBgsKWHmPKoQ,FeDoFZ8YeUsU0vo1-ZI~GRhZjeyXyaGhn-xQkIoGpak,AQACAAE/FreeMulET/15/FreeMulET-0.6.29-lib-jar.zip')
- #job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ job = JobGetFileInfo(c, 'CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
#job.jobIdentifier = job.jobMessage['Identifier'] = 1
#job.jobMessage['Identifier'] = 1
#job.jobIdentifier = 1
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-27 20:59:41
|
Revision: 16
http://fclient.svn.sourceforge.net/fclient/?rev=16&view=rev
Author: jUrner
Date: 2007-10-27 13:59:45 -0700 (Sat, 27 Oct 2007)
Log Message:
-----------
bit of refactoring + play ClientHello as safe as possible
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 17:00:21 UTC (rev 15)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 20:59:45 UTC (rev 16)
@@ -1,4 +1,54 @@
'''Freenet client protocol 2.0 implementation
+
+
+@newfield event, events
+
+
+
+Sample code::
+
+ client = FcpClient()
+ nodeHello = client.connect()
+ if nodeHello is not None:
+ # do whatever
+
+
+
+Most method calls can be made either synchron or asynchron::
+
+ peers = client.peerList(synchron=True)
+ for peer in peers:
+ # do whatever
+
+
+To get informed about asynchron events you should connect the relevant events the client provides::
+
+ # connect to one single event
+ client.EventListNextPeer += MyCallback
+
+ # connect to multiple events at once
+ client += (
+ (client.EventListPeers, MyCallback1),
+ (client.EventEndListPeers, MyCallback2),
+ )
+
+ # each callback is called with the event as first parameter, followed by additional parameters,
+ # depending on the event triggered.
+ def MyListNextPeerCallback(event, peer):
+ print peer
+
+ client.peerList(synchron=False)
+
+
+ # when event notifications are no longer required, you should always make shure to disconnect from them
+ client.EventListNextPeer -= MyCallback
+ client -= (
+ (client.EventListPeers, MyCallback1),
+ (client.EventEndListPeers, MyCallback2),
+ )
+
+
+
'''
#NOTE:
@@ -478,6 +528,11 @@
SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+ # client messages (internal use only)
+ ClientSocketTimeout = 0
+ ClientSocketDied = 1
+
+
def __init__(self, name, data=None, **params):
"""
@param name: messge name
@@ -489,15 +544,71 @@
self.name = name
self.params = params
- def toString(self):
- """Returns the message as formated string ready to be send"""
- # TODO: "Data" not yet implemented
- out = [self.name, ]
- for param, value in self.params.items():
- out.append('%s=%s' % (param, value))
- out.append('EndMessage\n')
- return '\n'.join(out)
+ def get(self, name, default=None):
+ """Returns the message parameter 'name' or 'default' """
+ return self.params.get(name, default)
+
+ def __getitem__(self, name):
+ """Returns the message parameter 'name' """
+ return self.params[name]
+
+ def __setitem__(self, name, value):
+ """Sets the message parameter 'name' to 'value' """
+ self.params[name] = value
+
+ @classmethod
+ def fromSocket(clss, socketObj):
+ msg = clss(None)
+ buf = []
+ while True:
+
+ try:
+ p = socketObj.recv(1)
+ if not p: raise ValueError('Socket is dead')
+ except socket.timeout, d: # no new messages in queue
+ msg.name = clss.ClientSocketTimeOut
+ return msg
+ except Exception, d:
+ msg.name = clss.ClientSocketDied
+ msg['Exception'] = Exception
+ msg['Details'] = d
+ return msg
+
+ if p == '\r': # ignore
+ continue
+
+ if p != '\n':
+ buf.append(p)
+ continue
+
+ line = ''.join(buf)
+ if line in ('End', "EndMessage"):
+ break
+ buf = []
+
+ if msg.name is None:
+ msg.name = line
+ elif line == 'Data':
+ n = int(msg.params['DataLength'])
+ try:
+ msg.data = socketObj.recv(n)
+ if not msg.data: raise ValueError('Socket is dead')
+ except Exception, d:
+ msg.name = clss.ClientSocketDied
+ msg['Exception'] = Exception
+ msg['Details'] = d
+ return msg
+
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ if not sep:
+ # TODO: chek for invalid messages or not
+ pass
+
+ return msg
+
def pprint(self):
"""Returns the message as nicely formated human readable string"""
out = ['', '>>' + self.name, ]
@@ -505,28 +616,19 @@
out.append('>> %s=%s' % (param, value))
out.append('>>EndMessage')
return '\n'.join(out)
-
- def __getitem__(self, name):
- """Returns the message parameter 'name' """
- return self.params[name]
-
- def get(self, name, default=None):
- """Returns the message parameter 'name' or 'default' """
- return self.params.get(name, default)
-
- def __setitem__(self, name, value):
- """Sets the message parameter 'name' to 'value' """
- self.params[name] = value
-
+ def toString(self):
+ """Returns the message as formated string ready to be send"""
+ # TODO: "Data" not yet implemented
+ if isinstance(self.name, (int, long)):
+ raise ValueError('You can not send client internal messages to the node')
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append('%s=%s' % (param, value))
+ out.append('EndMessage\n')
+ return '\n'.join(out)
-class MessageSocketTimeout(Message):
-
- def __init__(self):
- Message.__init__(self, 'USocketTimeOut')
-
-
#**************************************************************************
# jobs
#**************************************************************************
@@ -1012,7 +1114,8 @@
#TODO: how to handle (ProtocolError code 18: Shutting down)?
class FcpClient(events.Events):
- """Fcp client implementation"""
+ """Fcp client implementation
+ """
_events_ = (
'EventListPeers',
@@ -1094,17 +1197,25 @@
else:
self._log.info(self._logMessages.Connected)
+ # send ClientHello and wait for NodeHello
#NOTE: thought I could leave ClientHelloing up to the caller
# but instad of responding with ClientHelloMustBeFirst
- # as expected the socket simply breaks. So take it over.
+ # as expected when not doing so, the node disconnects.
+ # So take it over here.
job = JobClientHello(self)
- self.jobAdd(job, synchron=True)
- assert job.jobResult is not None, 'ClientHello is not working as expected'
- return job.jobResult
-
+ self.jobAdd(job, synchron=False)
+ while time_elapsed <= repeat:
+ msg = self.next()
+ if msg.name == Message.ClientSocketTimeout:
+ time_elapsed += SocketTimeout
+ elif msg.name == Message.NodeHello:
+ return msg.params
+ else:
+ break
+ break
+
+ # continue polling
self._log.info(self._logMessages.ConnectionRetry)
-
- # continue polling
time_elapsed += timeout
time.sleep(timeout)
@@ -1117,9 +1228,8 @@
@param msg: (Message) to handle
@return: True if the message was handled, False otherwise
"""
-
-
- if msg.name == 'USocketTimeOut':
+
+ if msg.name == Message.ClientSocketTimeout:
return True
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
@@ -1190,8 +1300,7 @@
return self.jobDispatchMessage(identifier, msg)
raise RuntimeError('Should not have endet here: %s' % msg.name)
-
-
+
#########################################################
@@ -1212,6 +1321,7 @@
return result
+ #TODO: not quite clear about the consequences of a synchron job. Have to think this over
def jobAdd(self, job, synchron=False):
"""Adds a job to the client
@param job: (Job*) job to add
@@ -1288,87 +1398,28 @@
return True
- #TODO: some info when all jobs are completed
+ #TODO: some info when all jobs are completed?
def next(self):
"""Pumps the next message waiting
@note: use this method instead of run() to run the client step by step
"""
- msg = self.readMessage()
+ msg = Message.fromSocket(self._socket)
+ if msg.name == Message.ClientSocketDied:
+ raise SocketError(msg['Details'])
self.handleMessage(msg)
return msg
- def readMessage(self):
- """Reads the next message directly from the socket and dispatches it
- @return: (Message) the next message read from the socket
- @raise SocketError: if the socket connection to the node dies unexpectedly
- If an error handler is passed to the client it is called emidiately before the error
- is raised.
- """
- msg = Message(None)
- buf = []
- while True:
-
- try:
- p = self._socket.recv(1)
- if not p: raise ValueError('Socket is dead')
- except socket.timeout, d: # no new messages in queue
- msg = MessageSocketTimeout()
- break
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(SocketError, d)
- raise SocketError(d) #!!
-
- if p == '\r': # ignore
- continue
-
- if p != '\n':
- buf.append(p)
- continue
-
- line = ''.join(buf)
- if line in ('End', "EndMessage"):
- break
- buf = []
-
- if msg.name is None:
- msg.name = line
- elif line == 'Data':
- n = int(msg.params['DataLength'])
- try:
- msg.data = self._socket.recv(n)
- if not msg.data: raise ValueError('Socket is dead')
- except Exception, d:
- self._log.info(self._logMessages.SocketDead)
- self.close()
- if self._errorHandler is not None:
- self._errorHandler(SocketError, d)
- raise SocketError(d) #!!
-
- else:
- head, sep, tail = line.partition('=')
- msg.params[head] = tail
- if not sep:
- # TODO: chek for invalid messages or not
- pass
-
- return msg
-
-
+
def run(self):
"""Runs the client untill all jobs passed to it are completed
@note: use KeyboardInterrupt to stop prematurely
"""
try:
#n = 0
-
while self.hasJobsRunning():
#n += 1
#if n > 40: break
self.next()
-
except KeyboardInterrupt:
self._log(self._logMessages.KeyboardInterrupt)
self.close()
@@ -1429,7 +1480,7 @@
#########################################################
## boilerplate code to tackle TestDDA
##
- ## ...but I don't trust it ;-) I was not yet alble to wrap my head around
+ ## ...but I don't trust it ;-) I was not yet able to wrap my head around
## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not.
##
## Another problem is that there is no way to know when a directory is no longer
@@ -1498,13 +1549,30 @@
##
#################################################
def peerList(self, synchron=False):
+ """Lists all peers of the node
+ @param synchron: if True, waits untill the call is completed, if False returns emidiately
+ @return: (list) of peers in a synchron, always None in an asynchron call
+
+ @event: EventListPeers(event).
+ @event: EventListNextPeer(event, peer).
+ @event: EventEndListPeers(event).
+ """
job = JobListPeers(self)
self.jobAdd(job, synchron=synchron)
return job.jobResult
def peerNotes(self, peer, synchron=False):
- if pythonBool(peer['opennet']):
+ """Lists all text notes associated to a peer
+ @param peer: peer as returned in a call to L{peerList}
+ @param synchron: if True, waits untill the call is completed, if False returns emidiately
+ @return: (list) of notes in a synchron, always None in an asynchron call
+
+ @event: EventListPeerNotes(event).
+ @event: EventListNextPeerNote(event, note).
+ @event: EventEndListPeerNotes(event).
+ """
+ if pythonBool(peer['opennet']): # opennet peers do not have any notes associated
return []
job = JobListPeerNotes(self, peer['identity'])
self.jobAdd(job, synchron=synchron)
@@ -1517,7 +1585,10 @@
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=logging.DEBUG)
nodeHello = c.connect()
- if nodeHello is not None:
+ if nodeHello is not None or 1:
+
+
+
def foo():
job1 = JobClientHello(c)
c.jobAdd(job1)
@@ -1529,8 +1600,13 @@
# should raise
#foo()
-
+ def foo():
+ job = JobGenerateSSK(c)
+ c.jobAdd(job, synchron=True)
+ print job.jobResult
+ #foo()
+
def foo():
job = JobGenerateSSK(c)
c.jobAdd(job, synchron=True)
@@ -1554,7 +1630,7 @@
for peer in peers:
print c.peerNotes(peer, synchron=True)
- foo()
+ #foo()
def foo():
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-28 09:43:18
|
Revision: 17
http://fclient.svn.sourceforge.net/fclient/?rev=17&view=rev
Author: jUrner
Date: 2007-10-28 02:43:20 -0700 (Sun, 28 Oct 2007)
Log Message:
-----------
combed over message object ++ some bug fixes
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-27 20:59:45 UTC (rev 16)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 09:43:20 UTC (rev 17)
@@ -328,8 +328,12 @@
"""
if fpath is not None:
if os.path.isfile(fpath):
- os.remove(fpath)
- return True
+ try:
+ os.remove(fpath)
+ except Exception, d:
+ pass
+ else:
+ return True
return False
@@ -465,6 +469,8 @@
class Message(object):
"""Class wrapping a freenet message"""
+ __slots__ = ('name', 'data', 'params')
+
# client messages
ClientHello = 'ClientHello'
ListPeer = 'ListPeer' # (since 1045)
@@ -527,12 +533,11 @@
UnknownPeerNoteType = 'UnknownPeerNoteType'
SubscribedUSKUpdate = 'SubscribedUSKUpdate'
-
# client messages (internal use only)
ClientSocketTimeout = 0
ClientSocketDied = 1
+
-
def __init__(self, name, data=None, **params):
"""
@param name: messge name
@@ -545,70 +550,102 @@
self.params = params
+ @classmethod
+ def bytesFromSocket(clss, socketObj, n):
+ """Reads n bytes from socket
+ @param socketObj: socket to read bytes from
+ @param n: (int) number of bytes to read
+ @return: (tuple) error-message or None, bytes read or None if an error occured
+ or no bytes could be read
+ """
+ error = p = None
+ try:
+ p = socketObj.recv(n)
+ if not p:
+ p = None
+ raise socket.error('Socket shut down by node')
+ except socket.timeout, d: # no new messages in queue
+ error = clss(clss.ClientSocketTimeout)
+ except socket.error, d:
+ error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d)
+ return error, p
+
+
+ @classmethod
+ def fromSocket(clss, socketObj):
+ """Reads a message from a socket
+ @param socketObj: socket to read a message from
+ @return: L{Message} next message from the socket. If the socket dies
+ unexpectedly a L{ClientSocketDied} message is returned containing the parameters
+ 'Exception' and 'Details'. If the socket times out a L{MessageClientSocketTimout}
+ message is returned.
+ """
+
+ msg = clss(None)
+ buf = []
+
+ #TODO: to buffer or not to buffer?
+ while True:
+
+ # get next line from socket
+ error, p = clss.bytesFromSocket(socketObj, 1)
+ if error:
+ return error
+
+ if p != '\n':
+ buf.append(p)
+ continue
+ #TODO: check if '\r\n' is allowed in freenet client protocol
+ else:
+ if buf[-1] == '\r':
+ del buf[-1]
+
+ line = ''.join(buf)
+ buf = []
+ if line == 'EndMessage':
+ break
+
+ # first line == message name
+ if msg.name is None:
+ msg.name = line
+
+ # get data member
+ elif line == 'Data':
+ remaining = int(msg.params['DataLength'])
+ msg.data = ''
+ while remaining > 0:
+ error, p = clss.bytesFromSocket(socketObj, remaining)
+ if error:
+ return error
+ remaining -= len(p)
+ msg.data += p
+ break
+
+ # get next paramater
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ # TODO: errorchek params?
+ #if not sep: pass
+
+ return msg
+
+
def get(self, name, default=None):
"""Returns the message parameter 'name' or 'default' """
return self.params.get(name, default)
+
def __getitem__(self, name):
"""Returns the message parameter 'name' """
return self.params[name]
+
def __setitem__(self, name, value):
"""Sets the message parameter 'name' to 'value' """
self.params[name] = value
- @classmethod
- def fromSocket(clss, socketObj):
- msg = clss(None)
- buf = []
- while True:
-
- try:
- p = socketObj.recv(1)
- if not p: raise ValueError('Socket is dead')
- except socket.timeout, d: # no new messages in queue
- msg.name = clss.ClientSocketTimeOut
- return msg
- except Exception, d:
- msg.name = clss.ClientSocketDied
- msg['Exception'] = Exception
- msg['Details'] = d
- return msg
-
- if p == '\r': # ignore
- continue
-
- if p != '\n':
- buf.append(p)
- continue
-
- line = ''.join(buf)
- if line in ('End', "EndMessage"):
- break
- buf = []
-
- if msg.name is None:
- msg.name = line
- elif line == 'Data':
- n = int(msg.params['DataLength'])
- try:
- msg.data = socketObj.recv(n)
- if not msg.data: raise ValueError('Socket is dead')
- except Exception, d:
- msg.name = clss.ClientSocketDied
- msg['Exception'] = Exception
- msg['Details'] = d
- return msg
-
- else:
- head, sep, tail = line.partition('=')
- msg.params[head] = tail
- if not sep:
- # TODO: chek for invalid messages or not
- pass
-
- return msg
-
+
def pprint(self):
"""Returns the message as nicely formated human readable string"""
out = ['', '>>' + self.name, ]
@@ -617,6 +654,14 @@
out.append('>>EndMessage')
return '\n'.join(out)
+
+ def send(self, socketObj):
+ """Dumps the message to a socket
+ @param socketObj: socket to dump the message to
+ """
+ socketObj.sendall(self.toString())
+
+
def toString(self):
"""Returns the message as formated string ready to be send"""
# TODO: "Data" not yet implemented
@@ -625,7 +670,18 @@
out = [self.name, ]
for param, value in self.params.items():
out.append('%s=%s' % (param, value))
- out.append('EndMessage\n')
+ if self.data:
+ assert 'DataLength' in self.params, 'DataLength member required'
+ n = None
+ try:
+ n = int(self['DataLength'])
+ except ValueError: pass
+ assert n is not None, 'DataLength member must be an integer'
+ assert n == len(self.data), 'DataLength member must corrospond to lenght of data'
+ out.append('Data')
+ out.append(self.data)
+ else:
+ out.append('EndMessage\n')
return '\n'.join(out)
@@ -1448,7 +1504,7 @@
"""
self._log.debug(self._logMessages.MessageSend + msg.pprint())
try:
- self._socket.sendall(msg.toString())
+ msg.send(self._socket)
except socket.error, d:
self._log.info(self._logMessages.SocketDead)
self.close()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-28 21:56:17
|
Revision: 20
http://fclient.svn.sourceforge.net/fclient/?rev=20&view=rev
Author: jUrner
Date: 2007-10-28 14:56:19 -0700 (Sun, 28 Oct 2007)
Log Message:
-----------
some more comments
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 09:45:09 UTC (rev 19)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 21:56:19 UTC (rev 20)
@@ -51,14 +51,6 @@
'''
-#NOTE:
-#
-# downloading data to disk is not supported st the moment. TestDDA code is quite unwritable
-# and as far as I can see there are plans to get rid of it. So wait...
-#
-#
-
-
import atexit
import base64
import logging
@@ -688,6 +680,8 @@
#**************************************************************************
# jobs
#**************************************************************************
+#TODO: maybe remove syncron functionality and rely only on signals
+# ...if so, remove timeStart, timeStop aswell.. leave up to caller
class JobBase(object):
"""Base class for jobs"""
@@ -1165,10 +1159,9 @@
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
#TODO: no idea if to add support for pending jobs and queue management here
-
+#
#TODO: do not mix directories as identifiers with identifiers (might lead to collisions)
#TODO: how to handle (ProtocolError code 18: Shutting down)?
-
class FcpClient(events.Events):
"""Fcp client implementation
"""
@@ -1316,6 +1309,7 @@
# check if the is something like an identifier in the message
#TODO: we run into troubles when using directories and NodeIdentifiers as identifiers
+ # have to maintain extra queues to prevent this. jobDispatchMessage(queue='directories')
if msg.name == Message.TestDDAReply:
identifier = msg['Directory']
elif msg.name == Message.TestDDAComplete:
@@ -1534,17 +1528,30 @@
#########################################################
- ## boilerplate code to tackle TestDDA
+ ## how to tackle TestDDA?
##
- ## ...but I don't trust it ;-) I was not yet able to wrap my head around
- ## jobAdd(synchron=True) enough to know wether it is save (thread, deadlock) or not.
+ ## best idea hear so far is to wait for ProtocolError 25 Test DDA denied (or PersistantGet)
+ ## and reinsert the job if necessary after TestDDA completion.
##
- ## Another problem is that there is no way to know when a directory is no longer
- ## needed. And I fon't want to write code in a Gui to tackle a problem that will hopefully
- ## go away in the near future.
+ ## Problem is, how to wait for a message without flooding the caller. Basic idea of the client
+ ## is to enshure a Gui can stay responsitive by letting the caller decide when to process the
+ ## next message. So waiting would require to buffer messages and watch messages carefuly
+ ## as they flood in.
+ ##
+ ## If we do not wait, the caller may flood us with download requests, I fear, faster than
+ ## the node and we are able to go get the error and through the TestDDA drill. Have to
+ ## do some tests to see how the node reacts.
##
- ## see: https://bugs.freenetproject.org/view.php?id=1753
+ ## easiest approach would be to let the caller test a directory explicitely when HE thinks
+ ## it might be necessary. But then this code will hang around forever with an already
+ ## assigned bug report [https://bugs.freenetproject.org/view.php?id=1753] suggesting
+ ## much easier processing to test DDA (DDA Challenge)
##
+ ##
+ ## so.. maybe best is to lurker around a while and keep an eye on the tracker
+ ##
+ ##
+ ## below is just some old boilerplate code.. to be removed sooner or later
#########################################################
def testWriteAccess(self, directory):
canRead, canWrite = False, False
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-28 22:44:18
|
Revision: 21
http://fclient.svn.sourceforge.net/fclient/?rev=21&view=rev
Author: jUrner
Date: 2007-10-28 15:44:22 -0700 (Sun, 28 Oct 2007)
Log Message:
-----------
added some fixed identifier prefixes to avoid collisions
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 21:56:19 UTC (rev 20)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 22:44:22 UTC (rev 21)
@@ -131,6 +131,9 @@
PriorityDefault = Bulk
+#TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED>
+# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED
+# in --> freenet/node/PeerManager.java
class PeerNodeStatus:
Connected = 1
RoutingBackedOff = 2
@@ -147,6 +150,18 @@
Disconnecting = 13
+#TODO: see if we can get away with these to avoid collisions. TestDDA uses no prefix
+# cos ProtocolError 7 passes the directory as identifier and there is no other hint
+# that the error is related to TestDDA.
+class IdentifierPrefix:
+ """Identifier prefixes"""
+
+ ClientGet = 'ClientGet::'
+ #TestDDA = ''
+ PeerNote = 'PeerNote::'
+
+
+
#************************************************************************************
# exceptions
#************************************************************************************
@@ -656,7 +671,8 @@
def toString(self):
"""Returns the message as formated string ready to be send"""
- # TODO: "Data" not yet implemented
+
+ #TODO: just a guess, so maybe remove this check
if isinstance(self.name, (int, long)):
raise ValueError('You can not send client internal messages to the node')
out = [self.name, ]
@@ -841,7 +857,7 @@
Message.ListPeerNotes,
NodeIdentifier=identifier
)
- JobBase.__init__(self, fcpClient, identifier, message)
+ JobBase.__init__(self, fcpClient, IdentifierPrefix.PeerNote + identifier, message)
def handleStart(self):
@@ -906,7 +922,7 @@
string and size may not be accurate.
"""
- identifier = newIdentifier()
+ identifier = IdentifierPrefix.ClientGet + newIdentifier()
message = Message(
Message.ClientGet,
Identifier=identifier,
@@ -1158,8 +1174,6 @@
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
-#TODO: no idea if to add support for pending jobs and queue management here
-#
#TODO: do not mix directories as identifiers with identifiers (might lead to collisions)
#TODO: how to handle (ProtocolError code 18: Shutting down)?
class FcpClient(events.Events):
@@ -1290,6 +1304,8 @@
return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
elif code == ProtocolError.ShuttingDown:
+
+ #TODO: ??? why dispatch to ClientHello.. can't remember
if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
# ########################################
@@ -1308,39 +1324,26 @@
else:
# check if the is something like an identifier in the message
- #TODO: we run into troubles when using directories and NodeIdentifiers as identifiers
- # have to maintain extra queues to prevent this. jobDispatchMessage(queue='directories')
if msg.name == Message.TestDDAReply:
identifier = msg['Directory']
elif msg.name == Message.TestDDAComplete:
identifier = msg['Directory']
elif msg.name == Message.PeerNote:
- identifier = msg['NodeIdentifier']
+ identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier']
elif msg.name == Message.EndListPeerNotes:
- identifier = msg['NodeIdentifier']
-
+ identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier']
else:
identifier = msg.get('Identifier', None)
# dispatch to jobs with fixed identifiers
if identifier is None:
-
if msg.name == Message.NodeHello:
return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
-
elif msg.name == Message.EndListPeers:
return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
-
elif msg.name == Message.Peer:
return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- #elif msg.name == Message.PeerNote:
- # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
-
- #elif msg.name == Message.EndListPeerNotes:
- # return self.jobDispatchMessage(FixedJobIdentifiers.ListPeerNotes, msg)
-
-
# more here.....
else:
@@ -1349,7 +1352,7 @@
else:
return self.jobDispatchMessage(identifier, msg)
- raise RuntimeError('Should not have endet here: %s' % msg.name)
+ raise RuntimeError('We should not have gotten here: %s' % msg.name)
@@ -1670,14 +1673,8 @@
#foo()
- def foo():
- job = JobGenerateSSK(c)
- c.jobAdd(job, synchron=True)
- print job.jobResult
- #foo()
-
+
-
def foo():
d = os.path.dirname(os.path.abspath(__file__))
job2 = JobTestDDA(c, d)
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-10-30 15:11:01
|
Revision: 22
http://fclient.svn.sourceforge.net/fclient/?rev=22&view=rev
Author: jUrner
Date: 2007-10-30 08:11:02 -0700 (Tue, 30 Oct 2007)
Log Message:
-----------
Another major rewrite. Cut everything down to a plain message handler and events.
For now there is no way to wrap the protocol on a higher level without getting
into deep trouble.
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-28 22:44:22 UTC (rev 21)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-30 15:11:02 UTC (rev 22)
@@ -93,6 +93,11 @@
SocketTimeout = 0.1
+class IdentifierPrefix:
+ """Special purpose identifier prefixes"""
+
+ FileInfo = 'FileInfo::'
+
class Verbosity:
Debug = logging.DEBUG
@@ -100,22 +105,6 @@
Warning = logging.WARNING
-class FixedJobIdentifiers:
- """Fixed job identifiers
- @note: he client can only handle one job of these at a time
- """
- ClientHello = 'ClientHello'
- ListPeers = 'ListPeers'
- ListPeerNotes = 'ListPeerNotes'
- GetNode = 'GetNode'
- GetConfig = 'GetConfig'
- ModifyConfig = 'ModifyConfig'
- WatchGlobal = 'WatchGlobal'
- Shutdown = 'Shutdown'
-
-
-
-
class Priorities:
"""All priorities supported by the client"""
@@ -132,8 +121,8 @@
#TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED>
-# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED
-# in --> freenet/node/PeerManager.java
+# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED
+# in --> freenet/node/PeerManager.java
class PeerNodeStatus:
Connected = 1
RoutingBackedOff = 2
@@ -150,16 +139,10 @@
Disconnecting = 13
-#TODO: see if we can get away with these to avoid collisions. TestDDA uses no prefix
-# cos ProtocolError 7 passes the directory as identifier and there is no other hint
-# that the error is related to TestDDA.
-class IdentifierPrefix:
- """Identifier prefixes"""
-
- ClientGet = 'ClientGet::'
- #TestDDA = ''
- PeerNote = 'PeerNote::'
+class PeerNoteType:
+ """All known peer note types"""
+ Private = '1'
#************************************************************************************
@@ -170,7 +153,7 @@
def __init__(self, msg):
"""
- @param msg: (Message) GetFailed message
+ @param msg: (Message) GetFailed message or its parameters dict
"""
self.value = '%s (%s, %s)' % (
msg.get('CodeDescription', 'Unknown error') ,
@@ -214,7 +197,7 @@
def __init__(self, msg):
"""
- @param msg: (Message) PutFailed message
+ @param msg: (Message) PutFailed message or its parameters dict
"""
self.value = '%s (%s, %s)' % (
msg.get('CodeDescription', 'Unknown error') ,
@@ -240,7 +223,7 @@
def __init__(self, msg):
"""
- @param msg: (Message) ProtocolError message
+ @param msg: (Message) ProtocolError message or its parameters dict
"""
self.value = '%s (%s, %s)' % (
msg.get('CodeDescription', 'Unknown error') ,
@@ -279,22 +262,13 @@
CanNotPeerWithSelf = '28'
PeerExists = '29'
OpennetDisabled = '30'
- DarknetOnly = '31'
+ DarknetPeerOnly = '31'
class SocketError(Exception): pass
+class FcpError(Exception): pass
#**********************************************************************
# functions
#**********************************************************************
-def fcpBool(pythonBool):
- """Converts a python bool to a fcp bool
- @param pythonBool: (bool)
- @return: (str) 'true' or 'false'
- """
- if pythonBool:
- return 'true'
- return 'false'
-
-
def newIdentifier(prefix=None):
"""Returns a new unique identifier
@return: (str) uuid
@@ -304,14 +278,6 @@
return str(uuid.uuid4())
-def pythonBool(fcpBool):
- """Converts a fcp bool to a python bool
- @param pythonBool: 'true' or 'false'
- @return: (bool) True or False
- """
- return fcpBool == 'true'
-
-
def saveReadFile(fpath):
"""Reads contents of a file in the savest manner possible
@param fpath: file to write
@@ -692,463 +658,7 @@
out.append('EndMessage\n')
return '\n'.join(out)
-
#**************************************************************************
-# jobs
-#**************************************************************************
-#TODO: maybe remove syncron functionality and rely only on signals
-# ...if so, remove timeStart, timeStop aswell.. leave up to caller
-class JobBase(object):
- """Base class for jobs"""
-
-
- def __init__(self, fcpClient, identifier, message):
- """
- @param fcpClient: FcpClient() instance
- @param identifier: (str) identifier of the job
- @param message: (Message) to send to the node whne the job ist started
- @ivar jobClient: FcpClient() instance of the job
- @ivar jobIdentifier: identifier of the job
- @ivar jobMessage: message to be send to the node
- @ivar jobResult: if no error was encountered, holding the result of the job when complete
- @ivar jobTimeStart: time the job was started
- @ivar jobTimeStop: time the job was stopped
- """
- self.jobClient = fcpClient
- self.jobIdentifier = identifier
- self.jobMessage = message
- self.jobResult = None
- self.jobTimeStart = 0
- self.jobTimeStop = 0
-
-
- def handleMessage(self, msg):
- return False
-
-
- def handleStart(self):
- """Starts the job"""
- self.jobResult = None
- self.jobTimeStart = time.time()
- self.jobClient.sendMessageEx(self.jobMessage)
-
-
- # XXX
- def handleStop(self, flagError, msg):
- """Called on job completion to stop the job
- @param flagError: True if an error was encountered, False otherwise
- @param msg: (Message) to pass to the job
- """
- self.jobTimeStop = time.time()
- self.jobResult = (flagError, msg)
-
-
-class JobClientHello(JobBase):
- """Sed a ClientHello message to the node
-
- @note: this must be the first message passed to the node. If everything
- goes well, you will get a NodeHello in response.
- """
-
- def __init__(self, fcpClient, name=None, expectedVersion='2.0'):
- """
- @param name: (str) connection name or None, to use an arbitrary name
- @param expectedVersion: (str) node version expected
- """
- message = Message(
- Message.ClientHello,
- Name=name if name is not None else newIdentifier(),
- ExpectedVersion=expectedVersion,
- )
- JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ClientHello, message)
-
- def displayName(self):
- return 'ClientHello'
-
-
- def handleMessage(self, msg):
- if msg.name == Message.NodeHello:
- return self.handleNodeHello(msg)
- elif msg.name == Message.ProtocolError:
- return self.handleProtocolError(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleNodeHello(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = msg
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
- def handleProtocolError(self, msg):
- raise ProtocolError(msg)
-
-
-
-
-
-
-class JobListPeers(JobBase):
- """Lists all known peers of the node
- """
-
- def __init__(self, fcpClient, withMetaData=True, withVolantile=True):
- """
- @param withMetaData: include meta data for each peer?
- @param withVolantile: include volantile data for each peer?
- @ivar jobResult: on job completion, will be a list containing all perrs as one 'Peer' message for each peer
- """
- message = Message(
- Message.ListPeers,
- WithMetadata=fcpBool(withMetaData),
- WithVolatile=fcpBool(withVolantile),
- )
- JobBase.__init__(self, fcpClient, FixedJobIdentifiers.ListPeers, message)
-
-
- def handleStart(self):
- JobBase.handleStart(self)
- self.jobClient.EventListPeers()
-
-
- def handleMessage(self,msg):
- if msg.name == Message.EndListPeers:
- return self.handleEndListPeers(msg)
- elif msg.name == Message.Peer:
- return self.handlePeer(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handlePeer(self, msg):
- self.jobClient.EventListNextPeer(msg.params)
- if self.jobResult is None:
- self.jobResult = [msg.params, ]
- else:
- self.jobResult.append(msg.params)
- return True
-
-
- def handleEndListPeers(self, msg):
- self.jobClient.EventEndListPeers()
- self.jobTimeStop = time.time()
- if self.jobResult is None:
- self.jobResult = []
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-
-class JobListPeerNotes(JobBase):
- """Lists all notes associated to a peer of the node
- """
-
- def __init__(self, fcpClient, identifier):
- """
- @param identifier: identifier of the peer to list notes for (peer identity)
- @ivar jobResult: on job completion will be a list containing all notes associated to the peer
-
- @note: notes are only available for darknet peers (opennet == false)
- """
-
- message = Message(
- Message.ListPeerNotes,
- NodeIdentifier=identifier
- )
- JobBase.__init__(self, fcpClient, IdentifierPrefix.PeerNote + identifier, message)
-
-
- def handleStart(self):
- JobBase.handleStart(self)
- self.jobClient.EventListPeerNotes()
-
-
- def handleMessage(self,msg):
- if msg.name == Message.EndListPeerNotes:
- return self.handleEndListPeerNotes(msg)
- elif msg.name == Message.PeerNote:
- return self.handlePeerNote(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
- def handlePeerNote(self, msg):
- note = msg.get('NoteText', '')
- self.jobClient.EventListNextPeerNote(note)
- if note:
- note = base64.decodestring(note)
- if self.jobResult is None:
- self.jobResult = [note, ]
- else:
- self.jobResult.append(note)
- return True
-
-
- def handleEndListPeerNotes(self, msg):
- self.jobClient.EventEndListPeerNotes()
- self.jobTimeStop = time.time()
- if self.jobResult is None:
- self.jobResult = []
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-
-
-#TODO: identifier collisions are not yet handled
-class JobGetFileInfo(JobBase):
- """Tries to retieve information about a file. If everything goes well
-
- On completion fcpResult will hold a tuple((str) content-type, (str) size) of the file.
- Note, that both members may be '' (empty string)
- """
-
-
- # idea is to provoke a GetFailed message and take mimetype and size from 'GetFailed'
- def __init__(self, fcpClient, uri, **params):
- """
- @param fcpClient: FcpClient() instance
- @param uri: uri of the file to retrieve info for
- @param params: additional parameters:
- IgnoreDS='true' / 'false'
- DSOnly='true' / 'false'
- MaxRetries=-1 ...N
- PriorityClass=Priority*
-
- @ivar jobResult: will be a tuple(bool error, data). If error is True, no information could be
- retrieved and data will be a GetFailed message containing details. If error is False
- data will be a tuple(str metadataContentType, str size). Note that both may be empty
- string and size may not be accurate.
-
- """
- identifier = IdentifierPrefix.ClientGet + newIdentifier()
- message = Message(
- Message.ClientGet,
- Identifier=identifier,
- URI=uri,
- # suggested by Mathew Toseland to use about 32k for mimeType requests
- # basic sizes of keys are: 1k for SSks and 32k for CHKs
- MaxSize='32000',
- ReturnType='none',
- Verbosity='1',
- **params
- )
- JobBase.__init__(self, fcpClient, identifier, message)
-
-
- def getPrority(self):
- return self.jobMessage.get('PriorityClass', Priorities.PriorityDefault)
-
-
- def setPriority(self, priority):
- if not priority in Priorities:
- raise ValueError('Invalid priority: %r' % priority)
- self.jobClient.sendMessage(
- Message.ModifyPersistentRequest,
- Identifier=self.jobIdentifier,
- Global=fcpBool(False),
- PriorityClass=priority,
- )
- # not shure if the response arrives in any case, so set it here
- self.jobMessage['PriorityClass'] = priority
-
-
- def stopRequest(self):
- self.jobClient.sendMessage(
- Message.RemovePersistentRequest,
- Global=fcpBool(False),
- Identifier=self.jobIdentifier,
- )
-
-
- def handleMessage(self, msg):
- if msg.name == Message.DataFound:
- return self.handleDataFound(msg)
- elif msg.name == Message.GetFailed:
- return self.handleGetFailed(msg)
- elif msg.name == Message.IdentifierCollision:
- return self.handleIdentifierCollision(msg)
- elif msg.name == Message.PersistentRequestModified:
- return self.handlePersistentRequestModified(msg)
- elif msg.name == Message.PersistentRequestRemoved:
- return self.handlePersistentRequestRemoved(msg)
- elif msg.name == Message.SimpleProgress:
- return self.handleSimpleProgress(msg)
-
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleDataFound(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = (
- False,
- (
- msg.get('Metadata.ContentType', ''),
- msg.get('DataLength', '')
- )
- )
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
- def handleGetFailed(self, msg):
- self.jobTimeStop = time.time()
- if msg['Code'] == FetchError.TooBig:
- self.jobResult = (False, msg)
- self.jobResult = (
- False,
- (
- msg.get('ExpectedMetadata.ContentType', ''),
- msg.get('ExpectedDataLength', '')
- )
- )
- else:
- self.jobResult = (True, msg)
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
- def handleIdentifierCollision(self, msg):
- raise
-
-
- def handleSimpleProgress(self, msg):
- return True
-
-
- def handlePersistentRequestModified(self, msg):
- priorityClass = msg.get('PriorityClass', None)
- if priorityClass is not None:
- self.jobMessage['PriorityClass'] = priorityClass
- return True
-
- def handlePersistentRequestRemoved(self, msg):
- if self.jobClient.jobIsRunning(self.jobIdentifier):
- self.jobClient.jobStop(self.jobIdentifier)
- return True
-
-
-#TODO: handle case where directories are registered multiple times
-class JobTestDDA(JobBase):
- """Tests a directory for read / write accesss
- """
-
-
- def __init__(self, fcpClient, directory, read=False, write=False):
- """
-
- @ivar jobResult: when the job is complete this will be set to a tuple(bool readAllowed, bool writeAllowed)
- """
- if not os.path.isdir(directory):
- raise ValueError('No such directory: %r' % directory)
-
- message = Message(
- Message.TestDDARequest,
- Directory=directory,
- WantReadDirectory=fcpBool(read),
- WantWriteDirectory=fcpBool(write),
- )
-
- JobBase.__init__(self, fcpClient, directory, message)
- self.jobTmpFile = None
-
-
- def handleMessage(self, msg):
- if msg.name == Message.TestDDAReply:
- return self.handleTestDDAReply(msg)
- elif msg.name == Message.TestDDAComplete:
- return self.handleTestDDAComplete(msg)
- elif msg.name == Message.ProtocolError:
- return self.handleProtocolError(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleProtocolError(self, msg):
- # most likely code 7 here...
- # "Both WantReadDirectory and WantWriteDirectory are set to false: what's the point of sending a message?"
- # ..a stupid response that is ;-)
- self.jobTimeStop = time.time()
- self.jobClient.jobRemove(self.jobIdentifier)
- if msg['Code'] == ProtocolError.InvalidMessage:
- self.jobResult = (False, False)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleTestDDAReply(self, msg):
- fpathWrite = msg.params.get('WriteFilename', None)
- fpathRead = msg.params.get('ReadFilename', None)
- readContent = ''
- if fpathWrite is not None:
- written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
- if not written:
- if os.path.isfile(fpathWrite):
- os.remove(fpathWrite)
- else:
- self.jobTmpFile = fpathWrite
-
- if fpathRead is not None:
- readContent = saveReadFile(fpathRead)
- if readContent is None:
- readContent = ''
-
- self.jobClient.sendMessage(
- Message.TestDDAResponse,
- Directory=msg['Directory'],
- ReadContent=readContent,
- )
- return True
-
-
- def handleTestDDAComplete(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = (
- pythonBool(msg.get('ReadDirectoryAllowed', 'false')),
- pythonBool(msg.get('WriteDirectoryAllowed', 'false')),
- )
- saveRemoveFile(self.jobTmpFile)
- self.jobTmpFile = None
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-
-class JobGenerateSSK(JobBase):
- """Job to generate a SSK key pair
- """
-
-
- def __init__(self, fcpClient):
- """
- @ivar jobResult: on job completion, a tuple(insertURI, requestURI) of the generated
- SSK key
- """
-
- identifier = newIdentifier()
- message = Message(
- Message.GenerateSSK,
- Identifier=identifier,
- )
- JobBase.__init__(self, fcpClient, identifier, message)
-
-
- def handleMessage(self, msg):
- if msg.name == Message.SSKKeypair:
- return self.handleSSKKeypair(msg)
- else:
- raise ValueError('Unexpected message: %s' % msg.name)
-
-
- def handleSSKKeypair(self, msg):
- self.jobTimeStop = time.time()
- self.jobResult = (msg['InsertURI'], msg['RequestURI'])
- self.jobClient.jobRemove(self.jobIdentifier)
- return True
-
-
-#**************************************************************************
# fcp client
#**************************************************************************
class LogMessages:
@@ -1169,7 +679,7 @@
KeyboardInterrupt = 'Keyboard interrupt'
- SocketDead = 'Socket is dead'
+ SocketDied = 'Socket died'
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
@@ -1181,42 +691,62 @@
"""
_events_ = (
+
+ #Peer related events
'EventListPeers',
- 'EventListNextPeer',
'EventEndListPeers',
+ 'EventPeer',
+ 'EventPeerRemoved',
+ 'EventUnknownIdentifier',
'EventListPeerNotes',
- 'EventListNextPeerNote',
'EventEndListPeerNotes',
+ 'EventPeerNote',
+ 'EventShutdown',
+ 'EventSocketDied',
+
+ # get / put related events
+ 'EventIdentifierCollision',
+
+ 'EventFileInfo',
+ 'EventFileInfoProgress',
+
+ 'EventDataFound',
+ 'EventGetFailed',
+ 'EventSimpleProgress',
+ 'EventPersistentRequestModified',
+ 'EventPersistentRequestRemoved',
+
+
+ # others
+ 'EventSSKKeypair',
+
)
+
+ Version = '2.0'
+ FcpTrue = 'true'
+ FcpFalse = 'false'
def __init__(self,
name='',
- errorHandler=None,
+ connectionName=None,
verbosity=Verbosity.Warning,
logMessages=LogMessages
):
"""
@param name: name of the client instance or '' (for debugging)
- @param errorHandler: will be called if the socket conncetion to the node is dead
- with two params: SocketError + details. When the handler is called the client
- is already closed.
+ @param conectionName: name of the connection
@param verbosity: verbosity level for debugging
@param logMessages: LogMessages class containing message strings
"""
- self._isConnected = False
- self._jobs = {
- 'Jobs': {},
- 'PendingJobs': [],
- 'RegisteredDirectories': [],
- }
- self._errorHandler = errorHandler #TODO: check if necessary!
+ self._connectionName = connectionName
+ self._ddaTmpFiles = []
self._log = logging.getLogger(name)
self._logMessages = logMessages
- self._lock = thread.allocate_lock() # lock when resources are accessed
+ self._lock = thread.allocate_lock()
self._socket = None
self.setVerbosity(verbosity)
@@ -1230,6 +760,10 @@
if self._socket is not None:
self._socket.close()
self._socket = None
+
+ # clean left over tmp files
+ for fpath in self._ddaTmpFiles:
+ saveRemoveFile(fpath)
#TODO: an iterator would be nice to enshure Guis stay responsitive in the call
@@ -1245,8 +779,8 @@
self._log.info(self._logMessages.Connecting)
# poll untill freenet responds
- time_elapsed = 0
- while time_elapsed <= repeat:
+ timeElapsed = 0
+ while timeElapsed <= repeat:
# try to Connect socket
if self._socket is not None:
@@ -1265,12 +799,15 @@
# but instad of responding with ClientHelloMustBeFirst
# as expected when not doing so, the node disconnects.
# So take it over here.
- job = JobClientHello(self)
- self.jobAdd(job, synchron=False)
- while time_elapsed <= repeat:
+ self.sendMessage(
+ Message.ClientHello,
+ Name=self._connectionName if self._connectionName is not None else newIdentifier(),
+ ExpectedVersion=self.Version,
+ )
+ while timeElapsed <= repeat:
msg = self.next()
if msg.name == Message.ClientSocketTimeout:
- time_elapsed += SocketTimeout
+ timeElapsed += SocketTimeout
elif msg.name == Message.NodeHello:
return msg.params
else:
@@ -1279,7 +816,7 @@
# continue polling
self._log.info(self._logMessages.ConnectionRetry)
- time_elapsed += timeout
+ timeElapsed += timeout
time.sleep(timeout)
self._log.info(self._logMessages.ConnectingFailed)
@@ -1294,190 +831,172 @@
if msg.name == Message.ClientSocketTimeout:
return True
-
self._log.debug(self._logMessages.MessageReceived + msg.pprint())
-
-
+
if msg.name == Message.ProtocolError:
code = msg['Code']
- if code == ProtocolError.NoLateClientHellos or code == ProtocolError.ClientHelloMustBeFirst:
- return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
-
- elif code == ProtocolError.ShuttingDown:
-
- #TODO: ??? why dispatch to ClientHello.. can't remember
- if not self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg):
-
- # ########################################
- #TODO: ???
-
- return True
-
- else:
- identifier = msg.get('Identifier', None)
- if identifier is None:
- #TODO: inform caller
- raise ProtocolError(msg)
- else:
- return self.jobDispatchMessage(identifier, msg)
-
- else:
+ if code == ProtocolError.ShuttingDown:
+ self.close()
+ self.EventShutdown(msg.params)
+ return True
- # check if the is something like an identifier in the message
- if msg.name == Message.TestDDAReply:
- identifier = msg['Directory']
- elif msg.name == Message.TestDDAComplete:
- identifier = msg['Directory']
- elif msg.name == Message.PeerNote:
- identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier']
- elif msg.name == Message.EndListPeerNotes:
- identifier = IdentifierPrefix.PeerNote + msg['NodeIdentifier']
- else:
- identifier = msg.get('Identifier', None)
-
- # dispatch to jobs with fixed identifiers
- if identifier is None:
- if msg.name == Message.NodeHello:
- return self.jobDispatchMessage(FixedJobIdentifiers.ClientHello, msg)
- elif msg.name == Message.EndListPeers:
- return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
- elif msg.name == Message.Peer:
- return self.jobDispatchMessage(FixedJobIdentifiers.ListPeers, msg)
-
- # more here.....
-
+ raise ProtocolError(msg)
+
+ ####################################################
+ ##
+ ## TestDDA
+ ##
+ ## Note: if both, ReadDirectoryAllowed and WriteDirectoryAllowed are
+ ## set to false, the node sends a ProtocolError (7, 'Invalid message').
+ ## Have to handle this!
+ ##
+ ####################################################
+ elif msg.name == Message.TestDDAReply:
+ fpathWrite = msg.params.get('WriteFilename', None)
+ fpathRead = msg.params.get('ReadFilename', None)
+ readContent = ''
+ if fpathWrite is not None:
+ written = saveWriteFile(fpathWrite, msg['ContentToWrite'])
+ if not written:
+ saveRemoveFile(fpathWrite)
else:
- raise ValueError('Unhandled message: ' + msg.name)
+ self._ddaTmpFiles.append(fpathWrite)
- else:
- return self.jobDispatchMessage(identifier, msg)
+ if fpathRead is not None:
+ readContent = saveReadFile(fpathRead)
+ if readContent is None:
+ readContent = ''
- raise RuntimeError('We should not have gotten here: %s' % msg.name)
-
+ self.sendMessage(
+ Message.TestDDAResponse,
+ Directory=msg['Directory'],
+ ReadContent=readContent,
+ )
+ return True
+
-
- #########################################################
- ## jobs
- #########################################################
- def hasJobsRunning(self):
- """Checks if the client has running jobs
- @return: (bool) True if so, False otherwise
- """
- self._lock.acquire(True)
- try:
- result = self._jobs['Jobs'] or self._jobs['PendingJobs']
- finally:
- self._lock.release()
+ elif msg.name == Message.TestDDAComplete:
+ # clean tmp files
+ for fpath in self._ddaTmpFiles:
+ saveRemoveFile(fpath)
+ self._ddaTmpFiles = []
+ return True
+
+ ####################################################
+ ##
+ ## Peer related messages
+ ##
+ ####################################################
+ elif msg.name == Message.EndListPeers:
+ self.EventEndListPeers(msg.params)
+ return True
+
+ elif msg.name == Message.EndListPeerNotes:
+ self.EventEndListPeerNotes(msg.params)
+ return True
+ elif msg.name == Message.Peer:
+ self.EventPeer(msg.params)
+ return True
+ elif msg.name == Message.PeerNote:
+ note = msg.get('NoteText', '')
+ if note:
+ note = base64.decodestring(note)
+ msg['NoteText'] = note
+ self.EventPeerNote(msg.params, note)
+ return True
+
+ elif msg.name == Message.PeerRemoved:
+ self.EventPeerRemoved(msg.params)
+ return True
+
+ elif msg.name == Message.UnknownNodeIdentifier:
+ self.EventUnknownIdentifier(msg.params)
+ return True
+
+ ####################################################
+ ##
+ ## Get related messages
+ ##
+ ####################################################
+ elif msg.name == Message.DataFound:
+ if msg['Identifier'].startswith(IdentifierPrefix.FileInfo):
+ self.EventFileInfo(msg.params)
+ return True
+
+ #TODO:
+ self.EventDataFound(msg.params)
+ return True
+
- return result
-
-
- #TODO: not quite clear about the consequences of a synchron job. Have to think this over
- def jobAdd(self, job, synchron=False):
- """Adds a job to the client
- @param job: (Job*) job to add
- @param synchron: if True, wait untill the job is completed, if False return emidiately
- """
- self._lock.acquire(True)
- try:
- if job.jobIdentifier in self._jobs['Jobs']:
- raise ValueError('Duplicate job: %r' % job.jobIdentifier)
- self._jobs['Jobs'][job.jobIdentifier] = job
- finally:
- self._lock.release()
-
- self._log.info(self._logMessages.JobStart + job.jobMessage.name)
- job.handleStart()
- if synchron:
- while self.jobGet(job.jobIdentifier):
- self.next()
+ elif msg.name == Message.GetFailed:
+ code = msg['Code']
+ if code == FetchError.TooBig:
+ if msg['Identifier'].startswith(IdentifierPrefix.FileInfo):
+ params = {
+ 'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''),
+ 'DataLength': msg.get('ExpectedDataLength', '')
+ }
+ self.EventFileInfo(params)
+ return True
+
+ self.EventGetFailed(msg.params)
+ return True
+
+
+ elif msg.name == Message.SimpleProgress:
+ if msg['Identifier'].startswith(IdentifierPrefix.FileInfo):
+ self.EventFileInfoProgress(msg.params)
+ else:
+ self.EventSimpleProgress(msg.params)
+ return True
+
+
+ elif msg.name == Message.IdentifierCollision:
+ self.EventIdentifierCollision(msg.params)
+ return True
+
+ elif msg.name == Message.PersistentRequestModified:
+ self.EventPersistentRequestModified(msg.params)
+ return True
- def jobDispatchMessage(self, identifier, msg):
- """Dispatches a message to a job
- @param identifier: identifier of the job
- @param msg: (Message) message to dispatch
- @return: True if the message was handled, False otherwise
- """
- job = self.jobGet(identifier)
- if job is not None:
- return job.handleMessage(msg)
- return False
+ elif msg.name == Message.PersistentRequestRemoved:
+ self.EventPersistentRequestRemoved(msg.params)
+ return True
-
- def jobGet(self, identifier):
- """Returns a job given its identifier
- @param identifier: identifier of the job
- @return: (Job*) instance or None, if no corrosponding job was found
- """
- self._lock.acquire(True)
- try:
- result = self._jobs['Jobs'].get(identifier, None)
- finally:
- self._lock.release()
- return result
-
-
- def jobIsRunning(self, identifier):
- """Checks if a job is running
- @param identifier: identifier of the job
- @return: True if so, False otherwise
- """
- self._lock.acquire(True)
- try:
- result = identifier in self._jobs['Jobs']
- finally:
- self._lock.release()
- return result
+ ####################################################
+ ##
+ ## Others
+ ##
+ ####################################################
+ elif msg.name == Message.SSKKeypair:
+ self.EventSSKKeypair(msg.params)
+ return True
- def jobRemove(self, identifier):
- """Removes a job unconditionally
- @param identifier: identifier of the job to remove
- @return: True if the job was found, False otherwise
- """
- self._lock.acquire(True)
- try:
- job = self._jobs['Jobs'].get(identifier, None)
- if job is not None:
- del self._jobs['Jobs'][identifier]
- finally:
- self._lock.release()
- if job is None:
- return False
- self._log.info(self._logMessages.JobStop + job.jobMessage.name)
- return True
+ ## default
+ return False
- #TODO: some info when all jobs are completed?
+ #########################################################
+ ##
+ ##
+ ##
+ #########################################################
def next(self):
"""Pumps the next message waiting
@note: use this method instead of run() to run the client step by step
"""
msg = Message.fromSocket(self._socket)
if msg.name == Message.ClientSocketDied:
+ self.EventSocketDied(msg['Exception'], msg['Details'])
raise SocketError(msg['Details'])
self.handleMessage(msg)
return msg
-
- d...
[truncated message content] |
|
From: <jU...@us...> - 2007-10-31 10:00:18
|
Revision: 23
http://fclient.svn.sourceforge.net/fclient/?rev=23&view=rev
Author: jUrner
Date: 2007-10-31 03:00:15 -0700 (Wed, 31 Oct 2007)
Log Message:
-----------
refactored the code and added some more protocol methods and events
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-30 15:11:02 UTC (rev 22)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-10-31 10:00:15 UTC (rev 23)
@@ -1,10 +1,9 @@
-'''Freenet client protocol 2.0 implementation
+"""Freenet client protocol 2.0 implementation
@newfield event, events
-
Sample code::
client = FcpClient()
@@ -12,45 +11,8 @@
if nodeHello is not None:
# do whatever
-
+"""
-Most method calls can be made either synchron or asynchron::
-
- peers = client.peerList(synchron=True)
- for peer in peers:
- # do whatever
-
-
-To get informed about asynchron events you should connect the relevant events the client provides::
-
- # connect to one single event
- client.EventListNextPeer += MyCallback
-
- # connect to multiple events at once
- client += (
- (client.EventListPeers, MyCallback1),
- (client.EventEndListPeers, MyCallback2),
- )
-
- # each callback is called with the event as first parameter, followed by additional parameters,
- # depending on the event triggered.
- def MyListNextPeerCallback(event, peer):
- print peer
-
- client.peerList(synchron=False)
-
-
- # when event notifications are no longer required, you should always make shure to disconnect from them
- client.EventListNextPeer -= MyCallback
- client -= (
- (client.EventListPeers, MyCallback1),
- (client.EventEndListPeers, MyCallback2),
- )
-
-
-
-'''
-
import atexit
import base64
import logging
@@ -92,192 +54,9 @@
DefaultFcpPort = 9481
SocketTimeout = 0.1
-
-class IdentifierPrefix:
- """Special purpose identifier prefixes"""
-
- FileInfo = 'FileInfo::'
-
-
-class Verbosity:
- Debug = logging.DEBUG
- Info = logging.INFO
- Warning = logging.WARNING
-
-
-class Priorities:
- """All priorities supported by the client"""
-
- Maximum = '0'
- Interactive = '1'
- SemiInteractive = '2'
- Updatable = '3'
- Bulk = '4'
- Prefetch = '5'
- Minimum = '6'
-
- PriorityMin = Minimum
- PriorityDefault = Bulk
-
-
-#TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED>
-# all I could find in the sources where these constants as in PEER_NODE_STATUS_CONNECTED
-# in --> freenet/node/PeerManager.java
-class PeerNodeStatus:
- Connected = 1
- RoutingBackedOff = 2
- TooNew = 3
- TooOld = 4
- Disconnected = 5
- NeverConnected = 6
- Disabled = 7
- Bursting = 8
- Listening = 9
- ListenOnly = 10
- ClockProblem = 11
- ConnError = 12
- Disconnecting = 13
-
-
-
-class PeerNoteType:
- """All known peer note types"""
- Private = '1'
-
-
-#************************************************************************************
-# exceptions
-#************************************************************************************
-class FetchError(Exception):
- """All fetch errors supported by the client"""
-
- def __init__(self, msg):
- """
- @param msg: (Message) GetFailed message or its parameters dict
- """
- self.value = '%s (%s, %s)' % (
- msg.get('CodeDescription', 'Unknown error') ,
- msg['Code'],
- msg.get('ExtraDescription', '...'),
- )
- def __str__(self): return self.value
-
- MaxArchiveRecursionExceeded = '1'
- UnknownSplitfileMetadata = '2'
- UnknownMetadata = '3'
- InvalidMetadata = '4'
- ArchiveFailure = '5'
- BlockDecodeError = '6'
- MaxMetadataLevelsExceeded = '7'
- MaxArchiveRestartsExceeded = '8'
- MaxRecursionLevelExceeded = '9'
- NotAnArchve = '10'
- TooManyMetastrings = '11'
- BucketError = '12'
- DataNotFound = '13'
- RouteNotFound = '14'
- RejectedOverload = '15'
- TooManyRedirects = '16'
- InternalError = '17'
- TransferFailed = '18'
- SplitfileError = '19'
- InvalidUri = '20'
- TooBig = '21'
- MetadataTooBig = '22'
- TooManyBlocks = '23'
- NotEnoughMetastrings = '24'
- Canceled = '25'
- ArchiveRestart = '26'
- PermanentRedirect = '27'
- NotAllDataFound = '28'
-
-
-class InsertError(Exception):
- """All insert errors supported by the client"""
-
- def __init__(self, msg):
- """
- @param msg: (Message) PutFailed message or its parameters dict
- """
- self.value = '%s (%s, %s)' % (
- msg.get('CodeDescription', 'Unknown error') ,
- msg['Code'],
- msg.get('ExtraDescription', '...'),
- )
- def __str__(self): return self.value
-
- InvalidUri = '1'
- BucketError = '2'
- InternalError = '3'
- RejectedOverload = '4'
- RouteNotFound = '5'
- FatalErrorInBlocks = '6'
- TooManyRetriesInBlock = '7'
- RouteReallyNotFound = '8'
- Collision = '9'
- Canceled = '10'
-
-
-class ProtocolError(Exception):
- """All protocol errors supported by the client"""
-
- def __init__(self, msg):
- """
- @param msg: (Message) ProtocolError message or its parameters dict
- """
- self.value = '%s (%s, %s)' % (
- msg.get('CodeDescription', 'Unknown error') ,
- msg['Code'],
- msg.get('ExtraDescription', '...'),
- )
- def __str__(self): return self.value
-
- ClientHelloMustBeFirst = '1'
- NoLateClientHellos = '2'
- MessageParseError = '3'
- UriParseError = '4'
- MissingField = '5'
- ErrorParsingNumber = '6'
- InvalidMessage = '7'
- InvalidField = '8'
- FileNotFound = '9'
- DiskTargetExists = '10'
- SameDirectoryExpected = '11'
- CouldNotCreateFile = '12'
- CouldNotWriteFile = '13'
- CouldNotRenameFile = '14'
- NoSuchIdentifier = '15'
- NotSupported = '16'
- InternalError = '17'
- ShuttingDown = '18'
- NoSuchNodeIdentifier = '19' # Unused since 995
- UrlParseError = '20'
- ReferenceParseError = '21'
- FileParseError = '22'
- NotAFile = '23'
- AccessDenied = '24'
- DDADenied = '25'
- CouldNotReadFile = '26'
- ReferenceSignature = '27'
- CanNotPeerWithSelf = '28'
- PeerExists = '29'
- OpennetDisabled = '30'
- DarknetPeerOnly = '31'
-
-class SocketError(Exception): pass
-class FcpError(Exception): pass
#**********************************************************************
-# functions
+# helpers
#**********************************************************************
-def newIdentifier(prefix=None):
- """Returns a new unique identifier
- @return: (str) uuid
- """
- if prefix:
- return prefix + str(uuid.uuid4())
- return str(uuid.uuid4())
-
-
def saveReadFile(fpath):
"""Reads contents of a file in the savest manner possible
@param fpath: file to read
@@ -330,358 +109,14 @@
fp.close()
return written
-def startFreenet(cmdline):
- """Starts freenet
- @param cmdline: commandline to start freenet (like '/freenet/run.sh start' or 'c:\freenet\start.bat')
- @return: (string) whatever freenet returns
- """
- #TODO: on windows it may be necessary to hide the command window
- p = subprocess.Popen(
- args=cmdline,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- )
- stdout, stderr = p.communicate()
- return stdout
-
#**********************************************************************
# classes
#**********************************************************************
-class FcpUri(object):
- """Wrapper class for freenet uris"""
-
-
- KeySSK = 'SSK@'
- KeyKSK = 'KSK@'
- KeyCHK = 'CHK@'
- KeyUSK = 'USK@'
- KeySVK = 'SVK@'
- KeyUnknown = ''
- KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
-
- ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
- ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
-
-
- def __init__(self, uri):
- """
- @param uri: uri to wrap
- @param cvar ReUriPattern: pattern matching a freenet uri
- @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
-
- @note: any dfecorations prefixing the freenet part of the uri uri are stripped if possible
-
-
- >>> uri = FcpUri('freenet:SSK@foo/bar')
- >>> str(uri)
- 'SSK@foo/bar'
- >>> uri.keyType() == FcpUri.KeySSK
- True
- >>> uri.split()
- ('SSK@foo', 'bar')
- >>> uri.fileName()
- 'bar'
-
- >>> uri = FcpUri('http://SSK@foo/bar')
- >>> str(uri)
- 'SSK@foo/bar'
-
- # uris not containing freenet keys are left unchanged
- >>> uri = FcpUri('http://foo/bar')
- >>> str(uri)
- 'http://foo/bar'
- >>> uri.keyType() == FcpUri.KeyUnknown
- True
- >>> uri.split()
- ('http://foo/bar', '')
- >>> uri.fileName()
- 'http://foo/bar'
-
- """
- self._uri = uri
-
- result = self.ReUriPattern.search(uri)
- if result is not None:
- self._uri = result.group(0)
-
- def __str__(self):
- return str(self._uri)
-
- def __unicode__(self):
- return unicode(self._uri)
-
- def keyType(self):
- """Retuns the key type of the uri
- @return: one of the Key* consts
- """
- result = self.ReKeyPattern.search(self._uri)
- if result is not None:
- return result.group(0).upper()
- return self.KeyUnknown
-
- def split(self):
- """Splits the uri
- @return: tuple(freenet-key, file-name)
- """
- if self.keyType() != self.KeyUnknown:
- head, sep, tail = self._uri.partition('/')
- return head, tail
- return self._uri, ''
-
- def fileName(self):
- """Returns the filename part of the uri
- @return: str
- """
- head, tail = self.split()
- if tail:
- return tail
- return self._uri
-
-class Message(object):
- """Class wrapping a freenet message"""
-
- __slots__ = ('name', 'data', 'params')
-
- # client messages
- ClientHello = 'ClientHello'
- ListPeer = 'ListPeer' # (since 1045)
- ListPeers = 'ListPeers'
- ListPeerNotes = 'ListPeerNotes'
- AddPeer = 'AddPeer'
- ModifyPeer = 'ModifyPeer'
- ModifyPeerNote = 'ModifyPeerNote'
- RemovePeer = 'RemovePeer'
- GetNode = 'GetNode'
- GetConfig = 'GetConfig' # (since 1027)
- ModifyConfig = 'ModifyConfig' # (since 1027)
- TestDDARequest = 'TestDDARequest' # (since 1027)
- TestDDAResponse = 'TestDDAResponse' # (since 1027)
- GenerateSSK = 'GenerateSSK'
- ClientPut = 'ClientPut'
- ClientPutDiskDir = 'ClientPutDiskDir'
- ClientPutComplexDir = 'ClientPutComplexDir'
- ClientGet = 'ClientGet'
- SubscribeUSK = 'SubscribeUSK'
- WatchGlobal = 'WatchGlobal'
- GetRequestStatus = 'GetRequestStatus'
- ListPersistentRequests = 'ListPersistentRequests'
- RemovePersistentRequest = 'RemovePersistentRequest'
- ModifyPersistentRequest = 'ModifyPersistentRequest'
- Shutdown = 'Shutdown'
-
- # node messages
- NodeHello = 'NodeHello'
- CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
- Peer = 'Peer'
- PeerNote = 'PeerNote'
- EndListPeers = 'EndListPeers'
- EndListPeerNotes = 'EndListPeerNotes'
- PeerRemoved = 'PeerRemoved'
- NodeData = 'NodeData'
- ConfigData = 'ConfigData' # (since 1027)
- TestDDAReply = 'TestDDAReply' # (since 1027)
- TestDDAComplete = 'TestDDAComplete' # (since 1027)
- SSKKeypair = 'SSKKeypair'
- PersistentGet = 'PersistentGet'
- PersistentPut = 'PersistentPut'
- PersistentPutDir = 'PersistentPutDir'
- URIGenerated = 'URIGenerated'
- PutSuccessful = 'PutSuccessful'
- PutFetchable = 'PutFetchable'
- DataFound = 'DataFound'
- AllData = 'AllData'
- StartedCompression = 'StartedCompression'
- FinishedCompression = 'FinishedCompression'
- SimpleProgress = 'SimpleProgress'
- EndListPersistentRequests = 'EndListPersistentRequests'
- PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
- PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
- PutFailed = 'PutFailed'
- GetFailed = 'GetFailed'
- ProtocolError = 'ProtocolError'
- IdentifierCollision = 'IdentifierCollision'
- UnknownNodeIdentifier = 'UnknownNodeIdentifier'
- UnknownPeerNoteType = 'UnknownPeerNoteType'
- SubscribedUSKUpdate = 'SubscribedUSKUpdate'
-
- # client messages (internal use only)
- ClientSocketTimeout = 0
- ClientSocketDied = 1
-
-
- def __init__(self, name, data=None, **params):
- """
- @param name: messge name
- @param data: data associated to the messge (not yet implemented)
- @param params: {field-name: value, ...} of parameters of the message
- @note: all params can be accessed as attributes of the class
- """
- self.data = data
- self.name = name
- self.params = params
-
-
- @classmethod
- def bytesFromSocket(clss, socketObj, n):
- """Reads n bytes from socket
- @param socketObj: socket to read bytes from
- @param n: (int) number of bytes to read
- @return: (tuple) error-message or None, bytes read or None if an error occured
- or no bytes could be read
- """
- error = p = None
- try:
- p = socketObj.recv(n)
- if not p:
- p = None
- raise socket.error('Socket shut down by node')
- except socket.timeout, d: # no new messages in queue
- error = clss(clss.ClientSocketTimeout)
- except socket.error, d:
- error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d)
- return error, p
-
-
- @classmethod
- def fromSocket(clss, socketObj):
- """Reads a message from a socket
- @param socketObj: socket to read a message from
- @return: L{Message} next message from the socket. If the socket dies
- unexpectedly a L{ClientSocketDied} message is returned containing the parameters
- 'Exception' and 'Details'. If the socket times out a L{MessageClientSocketTimout}
- message is returned.
- """
-
- msg = clss(None)
- buf = []
-
- #TODO: to buffer or not to buffer?
- while True:
-
- # get next line from socket
- error, p = clss.bytesFromSocket(socketObj, 1)
- if error:
- return error
-
- if p != '\n':
- buf.append(p)
- continue
- #TODO: check if '\r\n' is allowed in freenet client protocol
- else:
- if buf[-1] == '\r':
- del buf[-1]
-
- line = ''.join(buf)
- buf = []
- if line == 'EndMessage':
- break
-
- # first line == message name
- if msg.name is None:
- msg.name = line
-
- # get data member
- elif line == 'Data':
- remaining = int(msg.params['DataLength'])
- msg.data = ''
- while remaining > 0:
- error, p = clss.bytesFromSocket(socketObj, remaining)
- if error:
- return error
- remaining -= len(p)
- msg.data += p
- break
-
- # get next paramater
- else:
- head, sep, tail = line.partition('=')
- msg.params[head] = tail
- # TODO: errorchek params?
- #if not sep: pass
-
- return msg
-
-
- def get(self, name, default=None):
- """Returns the message parameter 'name' or 'default' """
- return self.params.get(name, default)
-
-
- def __getitem__(self, name):
- """Returns the message parameter 'name' """
- return self.params[name]
-
-
- def __setitem__(self, name, value):
- """Sets the message parameter 'name' to 'value' """
- self.params[name] = value
-
-
- def pprint(self):
- """Returns the message as nicely formated human readable string"""
- out = ['', '>>' + self.name, ]
- for param, value in self.params.items():
- out.append('>> %s=%s' % (param, value))
- out.append('>>EndMessage')
- return '\n'.join(out)
-
-
- def send(self, socketObj):
- """Dumps the message to a socket
- @param socketObj: socket to dump the message to
- """
- socketObj.sendall(self.toString())
-
-
- def toString(self):
- """Returns the message as formated string ready to be send"""
-
- #TODO: just a guess, so maybe remove this check
- if isinstance(self.name, (int, long)):
- raise ValueError('You can not send client internal messages to the node')
- out = [self.name, ]
- for param, value in self.params.items():
- out.append('%s=%s' % (param, value))
- if self.data:
- assert 'DataLength' in self.params, 'DataLength member required'
- n = None
- try:
- n = int(self['DataLength'])
- except ValueError: pass
- assert n is not None, 'DataLength member must be an integer'
- assert n == len(self.data), 'DataLength member must corrospond to lenght of data'
- out.append('Data')
- out.append(self.data)
- else:
- out.append('EndMessage\n')
- return '\n'.join(out)
#**************************************************************************
# fcp client
#**************************************************************************
-class LogMessages:
- """Message strings used for log infos"""
- Connecting = 'Connecting to node...'
- Connected = 'Connected to node'
- ConnectionRetry = 'Connecting to node failed... retrying'
- ConnectingFailed = 'Connecting to node failed'
-
- ClientClose = 'Closing client'
-
- MessageSend = 'SendMessage'
- MessageReceived = 'ReceivedMessage'
-
- JobStart = 'Starting job: '
- JobStop = 'Stopping job: '
- JobsCompleted = 'All jobs completed'
-
-
- KeyboardInterrupt = 'Keyboard interrupt'
- SocketDied = 'Socket died'
-
-
#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
#TODO: do not mix directories as identifiers with identifiers (might lead to collisions)
@@ -692,6 +127,10 @@
_events_ = (
+ # config related events
+ 'EventConfigData',
+ 'EventNodeData',
+
#Peer related events
'EventListPeers',
'EventEndListPeers',
@@ -707,18 +146,18 @@
'EventSocketDied',
# get / put related events
+ 'EventTestDDAComplete',
'EventIdentifierCollision',
- 'EventFileInfo',
- 'EventFileInfoProgress',
+ 'EventClientGetInfo',
+ 'EventClientGetInfoProgress',
'EventDataFound',
'EventGetFailed',
'EventSimpleProgress',
'EventPersistentRequestModified',
'EventPersistentRequestRemoved',
-
-
+
# others
'EventSSKKeypair',
@@ -727,26 +166,513 @@
Version = '2.0'
FcpTrue = 'true'
FcpFalse = 'false'
+ class FetchError(Exception):
+ """All fetch errors supported by the client"""
+
+ def __init__(self, msg):
+ """
+ @param msg: (Message) GetFailed message or its parameters dict
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
+ MaxArchiveRecursionExceeded = '1'
+ UnknownSplitfileMetadata = '2'
+ UnknownMetadata = '3'
+ InvalidMetadata = '4'
+ ArchiveFailure = '5'
+ BlockDecodeError = '6'
+ MaxMetadataLevelsExceeded = '7'
+ MaxArchiveRestartsExceeded = '8'
+ MaxRecursionLevelExceeded = '9'
+ NotAnArchve = '10'
+ TooManyMetastrings = '11'
+ BucketError = '12'
+ DataNotFound = '13'
+ RouteNotFound = '14'
+ RejectedOverload = '15'
+ TooManyRedirects = '16'
+ InternalError = '17'
+ TransferFailed = '18'
+ SplitfileError = '19'
+ InvalidUri = '20'
+ TooBig = '21'
+ MetadataTooBig = '22'
+ TooManyBlocks = '23'
+ NotEnoughMetastrings = '24'
+ Canceled = '25'
+ ArchiveRestart = '26'
+ PermanentRedirect = '27'
+ NotAllDataFound = '28'
+
+ class FcpError(Exception): pass
+ class FcpUri(object):
+ """Wrapper class for freenet uris"""
+
+
+ KeySSK = 'SSK@'
+ KeyKSK = 'KSK@'
+ KeyCHK = 'CHK@'
+ KeyUSK = 'USK@'
+ KeySVK = 'SVK@'
+ KeyUnknown = ''
+ KeysAll = (KeySSK, KeyKSK, KeyCHK, KeyUSK, KeySVK)
+
+ ReUriPattern = re.compile('(%s.*?)(?= |\Z)' % '.*?|'.join(KeysAll), re.I)
+ ReKeyPattern = re.compile('(%s)' % '|'.join(KeysAll), re.I)
+
+
+ def __init__(self, uri):
+ """
+ @param uri: uri to wrap
+ @param cvar ReUriPattern: pattern matching a freenet uri
+ @param cvar ReKeyPattern: pattern matching the key type of a freenet uri
+
+ @note: any decorations prefixing the freenet part of the uri are stripped if possible
+
+
+ >>> uri = FcpUri('freenet:SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+ >>> uri.keyType() == FcpUri.KeySSK
+ True
+ >>> uri.split()
+ ('SSK@foo', 'bar')
+ >>> uri.fileName()
+ 'bar'
+
+ >>> uri = FcpUri('http://SSK@foo/bar')
+ >>> str(uri)
+ 'SSK@foo/bar'
+
+ # uris not containing freenet keys are left unchanged
+ >>> uri = FcpUri('http://foo/bar')
+ >>> str(uri)
+ 'http://foo/bar'
+ >>> uri.keyType() == FcpUri.KeyUnknown
+ True
+ >>> uri.split()
+ ('http://foo/bar', '')
+ >>> uri.fileName()
+ 'http://foo/bar'
+
+ """
+ self._uri = uri
+
+ result = self.ReUriPattern.search(uri)
+ if result is not None:
+ self._uri = result.group(0)
+
+ def __str__(self):
+ return str(self._uri)
+
+ def __unicode__(self):
+ return unicode(self._uri)
+
+ def keyType(self):
+ """Retuns the key type of the uri
+ @return: one of the Key* consts
+ """
+ result = self.ReKeyPattern.search(self._uri)
+ if result is not None:
+ return result.group(0).upper()
+ return self.KeyUnknown
+
+ def split(self):
+ """Splits the uri
+ @return: tuple(freenet-key, file-name)
+ """
+ if self.keyType() != self.KeyUnknown:
+ head, sep, tail = self._uri.partition('/')
+ return head, tail
+ return self._uri, ''
+
+ def fileName(self):
+ """Returns the filename part of the uri
+ @return: str
+ """
+ head, tail = self.split()
+ if tail:
+ return tail
+ return self._uri
+
+ class IdentifierPrefix:
+ """Special purpose identifier prefixes"""
+ ClientGetInfo = 'ClientGetInfo::'
+
+ class InsertError(Exception):
+ """All insert errors supported by the client"""
+
+ def __init__(self, msg):
+ """
+ @param msg: (Message) PutFailed message or its parameters dict
+ """
+ self.value = '%s (%s, %s)' % (
+ msg.get('CodeDescription', 'Unknown error') ,
+ msg['Code'],
+ msg.get('ExtraDescription', '...'),
+ )
+ def __str__(self): return self.value
+
+ InvalidUri = '1'
+ BucketError = '2'
+ InternalError = '3'
+ RejectedOverload = '4'
+ RouteNotFound = '5'
+ FatalErrorInBlocks = '6'
+ TooManyRetriesInBlock = '7'
+ RouteReallyNotFound = '8'
+ Collision = '9'
+ Canceled = '10'
+
+ class LogMessages:
+ """Message strings used for log infos"""
+ Connecting = 'Connecting to node...'
+ Connected = 'Connected to node'
+ ConnectionRetry = 'Connecting to node failed... retrying'
+ ConnectingFailed = 'Connecting to node failed'
+
+ ClientClose = 'Closing client'
+
+ MessageSend = 'SendMessage'
+ MessageReceived = 'ReceivedMessage'
+
+ JobStart = 'Starting job: '
+ JobStop = 'Stopping job: '
+ JobsCompleted = 'All jobs completed'
+
+ KeyboardInterrupt = 'Keyboard interrupt'
+ SocketDied = 'Socket died'
+
+ class Message(object):
+ """Class wrapping a freenet message"""
+
+ __slots__ = ('name', 'data', 'params')
+
+ # client messages
+ ClientHello = 'ClientHello'
+ ListPeer = 'ListPeer' # (since 1045)
+ ListPeers = 'ListPeers'
+ ListPeerNotes = 'ListPeerNotes'
+ AddPeer = 'AddPeer'
+ ModifyPeer = 'ModifyPeer'
+ ModifyPeerNote = 'ModifyPeerNote'
+ RemovePeer = 'RemovePeer'
+ GetNode = 'GetNode'
+ GetConfig = 'GetConfig' # (since 1027)
+ ModifyConfig = 'ModifyConfig' # (since 1027)
+ TestDDARequest = 'TestDDARequest' # (since 1027)
+ TestDDAResponse = 'TestDDAResponse' # (since 1027)
+ GenerateSSK = 'GenerateSSK'
+ ClientPut = 'ClientPut'
+ ClientPutDiskDir = 'ClientPutDiskDir'
+ ClientPutComplexDir = 'ClientPutComplexDir'
+ ClientGet = 'ClientGet'
+ SubscribeUSK = 'SubscribeUSK'
+ WatchGlobal = 'WatchGlobal'
+ GetRequestStatus = 'GetRequestStatus'
+ ListPersistentRequests = 'ListPersistentRequests'
+ RemovePersistentRequest = 'RemovePersistentRequest'
+ ModifyPersistentRequest = 'ModifyPersistentRequest'
+ Shutdown = 'Shutdown'
+
+ # node messages
+ NodeHello = 'NodeHello'
+ CloseConnectionDuplicateClientName = 'CloseConnectionDuplicateClientName'
+ Peer = 'Peer'
+ PeerNote = 'PeerNote'
+ EndListPeers = 'EndListPeers'
+ EndListPeerNotes = 'EndListPeerNotes'
+ PeerRemoved = 'PeerRemoved'
+ NodeData = 'NodeData'
+ ConfigData = 'ConfigData' # (since 1027)
+ TestDDAReply = 'TestDDAReply' # (since 1027)
+ TestDDAComplete = 'TestDDAComplete' # (since 1027)
+ SSKKeypair = 'SSKKeypair'
+ PersistentGet = 'PersistentGet'
+ PersistentPut = 'PersistentPut'
+ PersistentPutDir = 'PersistentPutDir'
+ URIGenerated = 'URIGenerated'
+ PutSuccessful = 'PutSuccessful'
+ PutFetchable = 'PutFetchable'
+ DataFound = 'DataFound'
+ AllData = 'AllData'
+ StartedCompression = 'StartedCompression'
+ FinishedCompression = 'FinishedCompression'
+ SimpleProgress = 'SimpleProgress'
+ EndListPersistentRequests = 'EndListPersistentRequests'
+ PersistentRequestRemoved = 'PersistentRequestRemoved' # (since 1016)
+ PersistentRequestModified = 'PersistentRequestModified' # (since 1016)
+ PutFailed = 'PutFailed'
+ GetFailed = 'GetFailed'
+ ProtocolError = 'ProtocolError'
+ IdentifierCollision = 'IdentifierCollision'
+ UnknownNodeIdentifier = 'UnknownNodeIdentifier'
+ UnknownPeerNoteType = 'UnknownPeerNoteType'
+ SubscribedUSKUpdate = 'SubscribedUSKUpdate'
+
+ # client messages (internal use only)
+ ClientSocketTimeout = 0
+ ClientSocketDied = 1
+
+
+ def __init__(self, name, data=None, **params):
+ """
+ @param name: message name
+ @param data: data associated to the message (not yet implemented)
+ @param params: {field-name: value, ...} of parameters of the message
+ @note: all params can be accessed as attributes of the class
+ """
+ self.data = data
+ self.name = name
+ self.params = params
+
+
+ @classmethod
+ def bytesFromSocket(clss, socketObj, n):
+ """Reads n bytes from socket
+ @param socketObj: socket to read bytes from
+ @param n: (int) number of bytes to read
+ @return: (tuple) error-message or None, bytes read or None if an error occured
+ or no bytes could be read
+ """
+ error = p = None
+ try:
+ p = socketObj.recv(n)
+ if not p:
+ p = None
+ raise socket.error('Socket shut down by node')
+ except socket.timeout, d: # no new messages in queue
+ error = clss(clss.ClientSocketTimeout)
+ except socket.error, d:
+ error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d)
+ return error, p
+
+
+ @classmethod
+ def fromSocket(clss, socketObj):
+ """Reads a message from a socket
+ @param socketObj: socket to read a message from
+ @return: L{Message} next message from the socket. If the socket dies
+ unexpectedly a L{ClientSocketDied} message is returned containing the parameters
+ 'Exception' and 'Details'. If the socket times out a L{ClientSocketTimeout}
+ message is returned.
+ """
+
+ msg = clss(None)
+ buf = []
+
+ #TODO: to buffer or not to buffer?
+ while True:
+
+ # get next line from socket
+ error, p = clss.bytesFromSocket(socketObj, 1)
+ if error:
+ return error
+
+ if p != '\n':
+ buf.append(p)
+ continue
+ #TODO: check if '\r\n' is allowed in freenet client protocol
+ else:
+ if buf[-1] == '\r':
+ del buf[-1]
+
+ line = ''.join(buf)
+ buf = []
+ if line == 'EndMessage':
+ break
+
+ # first line == message name
+ if msg.name is None:
+ msg.name = line
+
+ # get data member
+ elif line == 'Data':
+ remaining = int(msg.params['DataLength'])
+ msg.data = ''
+ while remaining > 0:
+ error, p = clss.bytesFromSocket(socketObj, remaining)
+ if error:
+ return error
+ remaining -= len(p)
+ msg.data += p
+ break
+
+ # get next parameter
+ else:
+ head, sep, tail = line.partition('=')
+ msg.params[head] = tail
+ # TODO: error-check params?
+ #if not sep: pass
+
+ return msg
+
+
+ def get(self, name, default=None):
+ """Returns the message parameter 'name' or 'default' """
+ return self.params.get(name, default)
+
+
+ def __getitem__(self, name):
+ """Returns the message parameter 'name' """
+ return self.params[name]
+
+
+ def __setitem__(self, name, value):
+ """Sets the message parameter 'name' to 'value' """
+ self.params[name] = value
+
+
+ def pprint(self):
+ """Returns the message as nicely formatted human readable string"""
+ out = ['', '>>' + self.name, ]
+ for param, value in self.params.items():
+ out.append('>> %s=%s' % (param, value))
+ out.append('>>EndMessage')
+ return '\n'.join(out)
+
+
+ def send(self, socketObj):
+ """Dumps the message to a socket
+ @param socketObj: socket to dump the message to
+ """
+ socketObj.sendall(self.toString())
+
+
+ def toString(self):
+ """Returns the message as formatted string ready to be sent"""
+
+ #TODO: just a guess, so maybe remove this check
+ if isinstance(self.name, (int, long)):
+ raise ValueError('You can not send client internal messages to the node')
+ out = [self.name, ]
+ for param, value in self.params.items():
+ out.append('%s=%s' % (param, value))
+ if self.data:
+ assert 'DataLength' in self.params, 'DataLength member required'
+ n = None
+ try:
+ n = int(self['DataLength'])
+ except ValueError: pass
+ assert n is not None, 'DataLength member must be an integer'
+ assert n == len(self.data), 'DataLength member must corrospond to lenght of data'
+ out.append('Data')
+ out.append(self.data)
+ else:
+ out.append('EndMessage\n')
+ return '\n'.join(out)
+
+
+
+ #TODO: no idea how fcp handles strings as in <Peer volatile.status=CONNECTED>
+ # all I could find in the sources were these constants as in PEER_NODE_STATUS_CONNECTED
+ # in --> freenet/node/PeerManager.java
+ class PeerNodeStatus:
+ Connected = 1
+ RoutingBackedOff = 2
+ TooNew = 3
+ TooOld = 4
+ Disconnected = 5
+ NeverConnected = 6
+ Disabled = 7
+ Bursting = 8
+ Listening = 9
+ ListenOnly = 10
+ ClockProblem = 11
+ ConnError = 12
+ Disconnecting = 13
+
+ class PeerNoteType:
+ """All known peer note types"""
+ Private = '1'
+
+ class Priorities:
+ """All priorities supported by the client"""
+ Maximum = '0'
+ Interactive = '1'
+ SemiInteractive = '2'
+ Updatable = '3'
+ Bulk = '4'
+ Prefetch = '5'
+ Minimum = '6'
+
+ PriorityMin = Minimum
+ PriorityDefault = Bulk
+
+ class ProtocolError(Exception):
+ """All protocol errors supported by the client"""
+
+ def __init__(self, msg):...
[truncated message content] |
|
From: <jU...@us...> - 2007-11-01 14:54:29
|
Revision: 26
http://fclient.svn.sourceforge.net/fclient/?rev=26&view=rev
Author: jUrner
Date: 2007-11-01 07:54:34 -0700 (Thu, 01 Nov 2007)
Log Message:
-----------
bit of refactoring
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-01 14:53:13 UTC (rev 25)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-01 14:54:34 UTC (rev 26)
@@ -109,41 +109,34 @@
fp.close()
return written
-#**********************************************************************
-# classes
-#**********************************************************************
-
-
#**************************************************************************
# fcp client
#**************************************************************************
-#TODO: no idea what happens on reconnect if socket died. What about running jobs?
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
-#TODO: do not mix directories as identifiers with identifiers (might lead to collisions)
-#TODO: how to handle (ProtocolError code 18: Shutting down)?
+#TODO: events should be FcpClient.event.PeerNote not FcpClient.EventPeerNote
class FcpClient(events.Events):
"""Fcp client implementation
"""
_events_ = (
+ 'EventClientConnected',
+ 'EventClientDisconnected',
+
# config related events
'EventConfigData',
'EventNodeData',
#Peer related events
- 'EventListPeers',
'EventEndListPeers',
'EventPeer',
'EventPeerRemoved',
- 'EventUnknownIdentifier',
+ 'EventUnknownNodeIdentifier',
'EventListPeerNotes',
'EventEndListPeerNotes',
'EventPeerNote',
- 'EventShutdown',
- 'EventSocketDied',
# get / put related events
'EventTestDDAComplete',
@@ -166,6 +159,11 @@
Version = '2.0'
FcpTrue = 'true'
FcpFalse = 'false'
+ class DisconnectReason:
+ """Reason for client disconnect"""
+ Shutdown = '1'
+ SocketDied = '2'
+
class FetchError(Exception):
"""All fetch errors supported by the client"""
@@ -692,14 +690,16 @@
saveRemoveFile(fpath)
- #TODO: an iterator would be nice to enshure Guis stay responsitive in the call
+
def connect(self, host=DefaultFcpHost, port=DefaultFcpPort, repeat=20, timeout=0.5):
- """Establishes the connection to a freenet node
+ """Iterator to establish a connection to a freenet node
@param host: (str) host of th node
@param port: (int) port of the node
@param repeat: (int) how many seconds try to connect before giving up
@param timeout: (int) how much time to wait before another attempt to connect
- @return: (Message) NodeHello if successful,None otherwise
+ @event: EventConnected(event, params). Triggered as soon as the client is connected. Params
+ will be the parameters of the NodeHello message.
+ @return: (Message) NodeHello if successful, None otherwise for the next iteration
"""
self._clientHello = None
self._log.info(self.LogMessages.Connecting)
@@ -716,7 +716,7 @@
try:
self._socket.connect((host, port))
except Exception, d:
- pass
+ yield None
else:
self._log.info(self.LogMessages.Connected)
@@ -733,9 +733,12 @@
while timeElapsed <= repeat:
msg = self.next()
if msg.name == self.Message.ClientSocketTimeout:
- timeElapsed += SocketTimeout
+ timeElapsed += SocketTimeout
+ yield None
elif msg.name == self.Message.NodeHello:
- return msg.params
+ self.EventClientConnected(msg.params)
+ yield msg.params
+ raise StopIteration
else:
break
break
@@ -746,9 +749,11 @@
time.sleep(timeout)
self._log.info(self.LogMessages.ConnectingFailed)
- return None
+ raise StopIteration
+
+
-
+
def handleMessage(self, msg):
"""Handles a message from the freenet node
@param msg: (Message) to handle
@@ -763,7 +768,7 @@
code = msg['Code']
if code == self.ProtocolError.ShuttingDown:
self.close()
- self.EventShutdown(msg.params)
+ self.EventClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown})
return True
raise self.ProtocolError(msg)
@@ -851,7 +856,7 @@
return True
elif msg.name == self.Message.UnknownNodeIdentifier:
- self.EventUnknownIdentifier(msg.params)
+ self.EventUnknownNodeIdentifier(msg.params)
return True
####################################################
@@ -951,8 +956,13 @@
"""
msg = self.Message.fromSocket(self._socket)
if msg.name == self.Message.ClientSocketDied:
- self.EventSocketDied(msg['Exception'], msg['Details'])
- raise SocketError(msg['Details'])
+ params = {
+ 'DisconnectReason': DisconnectReason.SocketDied,
+ 'Exception': msg['Exception'],
+ 'Details': msg['Details']
+ }
+ self.EventClientDisconnected(params)
+ raise self.SocketError(msg['Details'])
self.handleMessage(msg)
return msg
@@ -984,8 +994,13 @@
except socket.error, d:
self._log.info(self.LogMessages.SocketDied)
self.close()
- self.EventSocketDied(socket.error, d)
- raise SocketError(d)
+ params = {
+ 'DisconnectReason': DisconnectReason.SocketDied,
+ 'Exception': socket.error,
+ 'Details': d
+ }
+ self.EventClientDisconnected(params)
+ raise self.SocketError(d)
return msg
#########################################################
@@ -1001,7 +1016,8 @@
return self.FcpTrue if pythonBool else self.FcpFalse
- def newIdentifier(self, prefix=None):
+ @classmethod
+ def newIdentifier(clss, prefix=None):
"""Returns a new unique identifier
@return: (str) uuid
"""
@@ -1009,7 +1025,7 @@
return prefix + str(uuid.uuid4())
return str(uuid.uuid4())
-
+
def pythonBool(self, fcpBool):
"""Converts a fcp bool to a python bool
@param pythonBool: 'true' or 'false'
@@ -1017,6 +1033,15 @@
"""
return fcpBool == self.FcpTrue
+
+ def pythonTime(self, fcpTime):
+ """Converts a fcp time value to a python time value
+ @param fcpTime: (int, str) time to convert
+ @return: (int) python time
+ """
+ fcpTime = int(fcpTime)
+ return fcpTime / 1000
+
########################################################
##
## Config related methods
@@ -1054,14 +1079,14 @@
## Peer related methods
##
########################################################
- def listPeer(self, identifier):
+ def listPeer(self, identity):
self.jobClient.sendMessage(
self.Message.ListPeer,
- NodeIdentifier=identifier,
+ NodeIdentifier=identity,
)
- def listPeerNotes(self, identifier):
+ def listPeerNotes(self, identity):
"""Lists all text notes associated to a peer
@param identifier: peer as returned in a call to L{peerList}
@event: EventListPeerNotes(event).
@@ -1070,7 +1095,7 @@
"""
self.sendMessage(
self.Message.ListPeerNotes,
- NodeIdentifier=identifier
+ NodeIdentifier=identity
)
@@ -1079,9 +1104,8 @@
@param withMetaData: include meta data for each peer?
@param withVolantile: include volantile data for each peer?
- @event: EventListPeers(event).
@event: EvenPeer(event, peer).
- @event: EventEndListPeers(event).
+ @event: EventEndListPeers(event, params).
"""
self.sendMessage(
self.Message.ListPeers,
@@ -1090,10 +1114,10 @@
)
- def modifyPeer(self, identifier, allowLocalAddresses=None, isDisabled=None, isListenOnly=None):
+ def modifyPeer(self, identity, allowLocalAddresses=None, isDisabled=None, isListenOnly=None):
msg = Message(
self.Message.ModifyPeer,
- NodeIdentifier=identifier,
+ NodeIdentifier=identity,
)
if allowLocalAddresses is not None:
msg['AllowLocalAddresses'] = self.fcpBool(allowLocalAddresses)
@@ -1105,20 +1129,20 @@
self.sendMessageEx(msg)
- def modifyPeerNote(self, identifier, note):
+ def modifyPeerNote(self, identity, note):
self.sendMessage(
self.Message.ModifyPeerNote,
- NodeIdentifier=identifier,
+ NodeIdentifier=identity,
#NOTE: currently fcp supports only this one type
PeerNoteType=self.PeerNoteType.Private,
NoteText=note
)
- def removePeer(self, identifier):
+ def removePeer(self, identity):
self.sendMessage(
self.Message.RemovePeer,
- NodeIdentifier=identifier,
+ NodeIdentifier=identity,
)
##########################################################
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-02 08:27:30
|
Revision: 33
http://fclient.svn.sourceforge.net/fclient/?rev=33&view=rev
Author: jUrner
Date: 2007-11-02 01:27:25 -0700 (Fri, 02 Nov 2007)
Log Message:
-----------
combed over events + a minor adjustement in testDDA()
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-01 14:58:13 UTC (rev 32)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:27:25 UTC (rev 33)
@@ -114,56 +114,61 @@
#**************************************************************************
#TODO: name as specified in NodeHello seems to be usable to keep jobs alive. Have to test this.
#TODO: events should be FcpClient.event.PeerNote not FcpClient.EventPeerNote
-class FcpClient(events.Events):
+class FcpClient(object):
"""Fcp client implementation
"""
+
+ Version = '2.0'
+ FcpTrue = 'true'
+ FcpFalse = 'false'
+ class DisconnectReason:
+ """Reason for client disconnect"""
+ Shutdown = '1'
+ SocketDied = '2'
+
- _events_ = (
+ class Events(events.Events):
+ """All events the client supports"""
+ _events_ = (
- 'EventClientConnected',
- 'EventClientDisconnected',
+ 'ClientConnected',
+ 'ClientDisconnected',
# config related events
- 'EventConfigData',
- 'EventNodeData',
+ 'ConfigData',
+ 'NodeData',
#Peer related events
- 'EventEndListPeers',
- 'EventPeer',
- 'EventPeerRemoved',
- 'EventUnknownNodeIdentifier',
+ 'EndListPeers',
+ 'Peer',
+ 'PeerRemoved',
+ 'UnknownNodeIdentifier',
- 'EventListPeerNotes',
- 'EventEndListPeerNotes',
- 'EventPeerNote',
+ 'ListPeerNotes',
+ 'EndListPeerNotes',
+ 'PeerNote',
# get / put related events
- 'EventTestDDAComplete',
- 'EventIdentifierCollision',
+ 'TestDDAComplete',
+ 'IdentifierCollision',
- 'EventClientGetInfo',
- 'EventClientGetInfoProgress',
+ 'ClientGetInfo',
+ 'ClientGetInfoProgress',
- 'EventDataFound',
- 'EventGetFailed',
- 'EventSimpleProgress',
- 'EventPersistentRequestModified',
- 'EventPersistentRequestRemoved',
+ 'DataFound',
+ 'GetFailed',
+ 'SimpleProgress',
+ 'PersistentRequestModified',
+ 'PersistentRequestRemoved',
# others
- 'EventSSKKeypair',
+ 'SSKKeypair',
)
- Version = '2.0'
- FcpTrue = 'true'
- FcpFalse = 'false'
- class DisconnectReason:
- """Reason for client disconnect"""
- Shutdown = '1'
- SocketDied = '2'
+
class FetchError(Exception):
"""All fetch errors supported by the client"""
@@ -348,6 +353,7 @@
KeyboardInterrupt = 'Keyboard interrupt'
SocketDied = 'Socket died'
+ #TODO: maybe speed up lookup of message name lookup by implementing integer message names
class Message(object):
"""Class wrapping a freenet message"""
@@ -666,6 +672,8 @@
@param name: name of the client instance or '' (for debugging)
@param conectionName: name of the connection
@param verbosity: verbosity level for debugging
+
+ @ivar events: events the client supports
"""
self._connectionName = connectionName
@@ -673,6 +681,8 @@
self._log = logging.getLogger(name)
self._socket = None
+ self.events = self.Events()
+
self.setVerbosity(verbosity)
atexit.register(self.close)
@@ -697,7 +707,7 @@
@param port: (int) port of the node
@param repeat: (int) how many seconds try to connect before giving up
@param timeout: (int) how much time to wait before another attempt to connect
- @event: EventConnected(event, params). Triggered as soon as the client is connected. Params
+ @event: Connected(event, params). Triggered as soon as the client is connected. Params
will be the parameters of the NodeHello message.
@return: (Message) NodeHello if successful, None otherwise for the next iteration
"""
@@ -736,7 +746,7 @@
timeElapsed += SocketTimeout
yield None
elif msg.name == self.Message.NodeHello:
- self.EventClientConnected(msg.params)
+ self.events.ClientConnected(msg.params)
yield msg.params
raise StopIteration
else:
@@ -751,9 +761,7 @@
self._log.info(self.LogMessages.ConnectingFailed)
raise StopIteration
-
-
-
+
def handleMessage(self, msg):
"""Handles a message from the freenet node
@param msg: (Message) to handle
@@ -768,7 +776,7 @@
code = msg['Code']
if code == self.ProtocolError.ShuttingDown:
self.close()
- self.EventClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown})
+ self.events.ClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown})
return True
raise self.ProtocolError(msg)
@@ -777,10 +785,6 @@
##
## TestDDA
##
- ## Note: if both, ReadDirectoryAllowed and WriteDirectoryAllowed are
- ## set to false, the node sends a ProtocolError (7, 'Invalid message').
- ## Have to handle this!
- ##
####################################################
elif msg.name == self.Message.TestDDAReply:
fpathWrite = msg.params.get('WriteFilename', None)
@@ -810,7 +814,7 @@
for fpath in self._ddaTmpFiles:
saveRemoveFile(fpath)
self._ddaTmpFiles = []
- self.EventTestDDAComplete(msg.params)
+ self.events.TestDDAComplete(msg.params)
return True
####################################################
@@ -819,11 +823,11 @@
##
####################################################
elif msg.name == self.Message.ConfigData:
- self.EventConfigData(msg.params)
+ self.events.ConfigData(msg.params)
return True
elif msg.name == self.Message.NodeData:
- self.EventNodeData(msg.params)
+ self.events.NodeData(msg.params)
return True
####################################################
@@ -832,15 +836,15 @@
##
####################################################
elif msg.name == self.Message.EndListPeers:
- self.EventEndListPeers(msg.params)
+ self.events.EndListPeers(msg.params)
return True
elif msg.name == self.Message.EndListPeerNotes:
- self.EventEndListPeerNotes(msg.params)
+ self.events.EndListPeerNotes(msg.params)
return True
elif msg.name == self.Message.Peer:
- self.EventPeer(msg.params)
+ self.events.Peer(msg.params)
return True
elif msg.name == self.Message.PeerNote:
@@ -848,28 +852,28 @@
if note:
note = base64.decodestring(note)
msg['NoteText'] = note
- self.EventPeerNote(msg.params, note)
+ self.events.PeerNote(msg.params, note)
return True
elif msg.name == self.Message.PeerRemoved:
- self.EventPeerRemoved(msg.params)
+ self.events.PeerRemoved(msg.params)
return True
elif msg.name == self.Message.UnknownNodeIdentifier:
- self.EventUnknownNodeIdentifier(msg.params)
+ self.events.UnknownNodeIdentifier(msg.params)
return True
####################################################
##
- ## Get related messages
+ ## ClientGet related messages
##
####################################################
elif msg.name == self.Message.DataFound:
if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo):
- self.EventClientGetInfo(msg.params)
+ self.events.ClientGetInfo(msg.params)
return True
- self.EventDataFound(msg.params)
+ self.events.DataFound(msg.params)
return True
elif msg.name == self.Message.GetFailed:
@@ -880,29 +884,29 @@
'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''),
'DataLength': msg.get('ExpectedDataLength', '')
}
- self.EventClientGetInfo(params)
+ self.events.ClientGetInfo(params)
return True
- self.EventGetFailed(msg.params)
+ self.events.GetFailed(msg.params)
return True
elif msg.name == self.Message.SimpleProgress:
if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo):
- self.EventClientGetInfoProgress(msg.params)
+ self.events.ClientGetInfoProgress(msg.params)
else:
- self.EventSimpleProgress(msg.params)
+ self.events.SimpleProgress(msg.params)
return True
elif msg.name == self.Message.IdentifierCollision:
- self.EventIdentifierCollision(msg.params)
+ self.events.IdentifierCollision(msg.params)
return True
elif msg.name == self.Message.PersistentRequestModified:
- self.EventPersistentRequestModified(msg.params)
+ self.events.PersistentRequestModified(msg.params)
return True
elif msg.name == self.Message.PersistentRequestRemoved:
- self.EventPersistentRequestRemoved(msg.params)
+ self.events.PersistentRequestRemoved(msg.params)
return True
####################################################
@@ -911,7 +915,7 @@
##
####################################################
elif msg.name == self.Message.SSKKeypair:
- self.EventSSKKeypair(msg.params)
+ self.events.SSKKeypair(msg.params)
return True
@@ -961,7 +965,7 @@
'Exception': msg['Exception'],
'Details': msg['Details']
}
- self.EventClientDisconnected(params)
+ self.events.ClientDisconnected(params)
raise self.SocketError(msg['Details'])
self.handleMessage(msg)
return msg
@@ -999,7 +1003,7 @@
'Exception': socket.error,
'Details': d
}
- self.EventClientDisconnected(params)
+ self.events.ClientDisconnected(params)
raise self.SocketError(d)
return msg
@@ -1089,9 +1093,9 @@
def listPeerNotes(self, identity):
"""Lists all text notes associated to a peer
@param identifier: peer as returned in a call to L{peerList}
- @event: EventListPeerNotes(event).
- @event: EventListPeerNote(event, note).
- @event: EventEndListPeerNotes(event).
+ @event: ListPeerNotes(event).
+ @event: ListPeerNote(event, note).
+ @event: EndListPeerNotes(event).
"""
self.sendMessage(
self.Message.ListPeerNotes,
@@ -1104,8 +1108,8 @@
@param withMetaData: include meta data for each peer?
@param withVolantile: include volantile data for each peer?
- @event: EvenPeer(event, peer).
- @event: EventEndListPeers(event, params).
+ @event: Peer(event, peer).
+ @event: EndListPeers(event, params).
"""
self.sendMessage(
self.Message.ListPeers,
@@ -1183,7 +1187,7 @@
@param uri: uri of the file to request info about
@event: clientGetInfo(event, params). If success, params will contain a key 'Metadata.ContentType'
and 'DataLength'. Both may be '' (empty string)
- @event: clientGetInfoProgress(event, params). Triggered instead of EventSimpleProgress
+ @event: ClientGetInfoProgress(event, params). Triggered instead of SimpleProgress
@note: for other events see: L{clientGet}
@return: (str) request identifier
"""
@@ -1201,7 +1205,7 @@
)
return identifier
-
+
def testDDA(self, directory, wantReadDirectory=None, wantWriteDirectory=None):
"""Tests a directory for read / write access
@param directory: directory to test
@@ -1213,7 +1217,19 @@
and a directory for read access before uploading content from it
@note: the node does not like both parameters being False and will respond with a protocol error in this
case. Take care of that.
+ @note: if a directory is no longer needed, best practice is to free resources by calling
+ testDDA() with both parameters set to False.
"""
+
+ # if both, ReadDirectoryAllowed and WriteDirectoryAllowed are
+ # set to false, the node sends a ProtocolError (7, 'Invalid message')
+ # No idea what the error is good for... so simply ignore the request.
+ #
+ # already file a bug report that (False, False) should be interpreted
+ # by the node to forgett a directory (free resources)
+ if not wantReadDirectory and not wantWriteDirectory:
+ return
+
msg = self.Message(
self.Message.TestDDARequest,
Directory=directory,
@@ -1223,6 +1239,7 @@
if wantWriteDirectory is not None:
msg['WantWriteDirectory'] = self.fcpBool(wantWriteDirectory)
self.sendMessageEx(msg)
+
##########################################################
##
## others
@@ -1246,7 +1263,8 @@
#*****************************************************************************
if __name__ == '__main__':
c = FcpClient(name='test', verbosity=FcpClient.Verbosity.Debug)
- nodeHello = c.connect()
+
+ for nodeHello in c.connect(): pass
if nodeHello is not None:
@@ -1289,13 +1307,13 @@
print '%s=%s' % (prefix, value)
print
- c.EventConfigData += cb
+ c.events.ConfigData += cb
oldVerbosity = c.verbosity()
##c.setVerbosity(c.Verbosity.Warning)
print '\n>> Requesting config\n'
c.getConfig()
- for i in xrange(1):
+ for i in xrange(5):
c.next()
c.setVerbosity(oldVerbosity)
@@ -1308,7 +1326,7 @@
def cb(event, params):
print params
- c.EventSSKKeypair += cb
+ c.events.SSKKeypair += cb
c.generateSSK()
for i in xrange(1):
c.next()
@@ -1319,7 +1337,7 @@
def cb(event, params):
print params
- c.EventTestDDAComplete += cb
+ c.events.TestDDAComplete += cb
d = os.path.dirname(os.path.abspath(__file__))
c.testDDA(d, True, True)
for i in xrange(4):
@@ -1333,9 +1351,9 @@
if params['opennet'] == c.FcpFalse:
c.listPeerNotes(params['identity'])
- c.EventPeer += cb
+ c.events.Peer += cb
c.listPeers()
- for i in xrange(100):
+ for i in xrange(120):
c.next()
#testListPeerNotes()
@@ -1345,7 +1363,7 @@
def cb(event, params):
print params
- c.EventClientGetInfo += cb
+ c.events.ClientGetInfo += cb
identifier = c.clientGetInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
for i in xrange(20):
c.next()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-02 08:43:53
|
Revision: 36
http://fclient.svn.sourceforge.net/fclient/?rev=36&view=rev
Author: jUrner
Date: 2007-11-02 01:43:54 -0700 (Fri, 02 Nov 2007)
Log Message:
-----------
message has to be dispatched
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:30:13 UTC (rev 35)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:43:54 UTC (rev 36)
@@ -809,6 +809,7 @@
)
return True
+ #TODO: unconditionally clean up all tmp files? Looks like trouble..
elif msg.name == self.Message.TestDDAComplete:
# clean tmp files
for fpath in self._ddaTmpFiles:
@@ -1228,6 +1229,12 @@
# already file a bug report that (False, False) should be interpreted
# by the node to forgett a directory (free resources)
if not wantReadDirectory and not wantWriteDirectory:
+ msg = self.Message(
+ self.Message.TestDDAComplete,
+ ReadDirectoryAllowed=self.FcpFalse,
+ WriteDirectoryAllowed=self.FcpFalse,
+ )
+ self.handleMessage(msg)
return
msg = self.Message(
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-02 16:01:26
|
Revision: 37
http://fclient.svn.sourceforge.net/fclient/?rev=37&view=rev
Author: jUrner
Date: 2007-11-02 09:01:29 -0700 (Fri, 02 Nov 2007)
Log Message:
-----------
some fixes
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 08:43:54 UTC (rev 36)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-02 16:01:29 UTC (rev 37)
@@ -144,7 +144,6 @@
'PeerRemoved',
'UnknownNodeIdentifier',
- 'ListPeerNotes',
'EndListPeerNotes',
'PeerNote',
@@ -443,7 +442,7 @@
"""Reads n bytes from socket
@param socketObj: socket to read bytes from
@param n: (int) number of bytes to read
- @return: (tuple) error-message or None, bytes read or None if an error occured
+ @return: (tuple) (error-message or None, bytes read or None) if an error occured
or no bytes could be read
"""
error = p = None
@@ -452,7 +451,7 @@
if not p:
p = None
raise socket.error('Socket shut down by node')
- except socket.timeout, d: # no new messages in queue
+ except socket.timeout, d: # nothing in the queue
error = clss(clss.ClientSocketTimeout)
except socket.error, d:
error = clss(clss.ClientSocketDied, Exception=socket.error, Details=d)
@@ -853,7 +852,7 @@
if note:
note = base64.decodestring(note)
msg['NoteText'] = note
- self.events.PeerNote(msg.params, note)
+ self.events.PeerNote(msg.params)
return True
elif msg.name == self.Message.PeerRemoved:
@@ -1094,9 +1093,9 @@
def listPeerNotes(self, identity):
"""Lists all text notes associated to a peer
@param identifier: peer as returned in a call to L{peerList}
- @event: ListPeerNotes(event).
- @event: ListPeerNote(event, note).
- @event: EndListPeerNotes(event).
+ @event: PeerNote(event, params)
+ @event: EndListPeerNotes(event, params)
+ @note: listPeerNotes() is only available for darknet nodes
"""
self.sendMessage(
self.Message.ListPeerNotes,
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-04 17:31:25
|
Revision: 43
http://fclient.svn.sourceforge.net/fclient/?rev=43&view=rev
Author: jUrner
Date: 2007-11-04 09:31:30 -0800 (Sun, 04 Nov 2007)
Log Message:
-----------
some more consts
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-04 17:29:50 UTC (rev 42)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-04 17:31:30 UTC (rev 43)
@@ -121,10 +121,20 @@
Version = '2.0'
FcpTrue = 'true'
FcpFalse = 'false'
+ class ConnectReason:
+ Connect = '1'
+ Reconnect = '2'
+
+
class DisconnectReason:
- """Reason for client disconnect"""
+ """Reasons for client disconnect
+ @cvar Shutdown: regular shutdown of the connection
+ @cvar SocketDied: connection to the node died unexpectedly
+ @cvar ConnectFailed: connection could not be established
+ """
Shutdown = '1'
SocketDied = '2'
+ ConnectFailed = '3'
class Events(events.Events):
@@ -757,10 +767,16 @@
timeElapsed += timeout
time.sleep(timeout)
+ sef.events.ClientDisconnect({'DisconnectReason': DisconnectReason.ConectFailed})
self._log.info(self.LogMessages.ConnectingFailed)
raise StopIteration
+
+
+ def connectionName(self):
+ """Returns the connection name the client uses"""
+ return self._connectionName
-
+
def handleMessage(self, msg):
"""Handles a message from the freenet node
@param msg: (Message) to handle
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-05 12:43:06
|
Revision: 46
http://fclient.svn.sourceforge.net/fclient/?rev=46&view=rev
Author: jUrner
Date: 2007-11-05 04:43:07 -0800 (Mon, 05 Nov 2007)
Log Message:
-----------
error handling
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-05 12:42:27 UTC (rev 45)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-05 12:43:07 UTC (rev 46)
@@ -1057,9 +1057,13 @@
def pythonTime(self, fcpTime):
"""Converts a fcp time value to a python time value
@param fcpTime: (int, str) time to convert
+ @raise ValueError: if the fcpTime could not be converted
@return: (int) python time
"""
- fcpTime = int(fcpTime)
+ try:
+ fcpTime = int(fcpTime)
+ except Exception, d:
+ raise ValueError(d)
return fcpTime / 1000
########################################################
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-06 12:34:48
|
Revision: 48
http://fclient.svn.sourceforge.net/fclient/?rev=48&view=rev
Author: jUrner
Date: 2007-11-06 04:34:50 -0800 (Tue, 06 Nov 2007)
Log Message:
-----------
minor changes
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-05 12:50:42 UTC (rev 47)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-06 12:34:50 UTC (rev 48)
@@ -222,6 +222,7 @@
NotAllDataFound = '28'
class FcpError(Exception): pass
+ #TODO: better description of keys (MaxKeyLen ...) so we can use regex(es) to parse arbitrary strings for keys
class FcpUri(object):
"""Wrapper class for freenet uris"""
@@ -362,7 +363,7 @@
KeyboardInterrupt = 'Keyboard interrupt'
SocketDied = 'Socket died'
- #TODO: maybe speed up lookup of message name lookup by implementing integer message names
+ #TODO: maybe speed up message name lookup by implementing integer message names
class Message(object):
"""Class wrapping a freenet message"""
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-07 17:33:38
|
Revision: 54
http://fclient.svn.sourceforge.net/fclient/?rev=54&view=rev
Author: jUrner
Date: 2007-11-07 09:33:26 -0800 (Wed, 07 Nov 2007)
Log Message:
-----------
added an idle event and fixed a missing identifier in EventClientGetInfo
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-07 17:32:18 UTC (rev 53)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-07 17:33:26 UTC (rev 54)
@@ -141,6 +141,8 @@
"""All events the client supports"""
_events_ = (
+ 'Idle',
+
'ClientConnected',
'ClientDisconnected',
@@ -898,6 +900,7 @@
if code == self.FetchError.TooBig:
if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo):
params = {
+ 'Identifier': msg['Identifier'],
'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''),
'DataLength': msg.get('ExpectedDataLength', '')
}
@@ -984,7 +987,12 @@
}
self.events.ClientDisconnected(params)
raise self.SocketError(msg['Details'])
- self.handleMessage(msg)
+
+ elif msg.name == self.Message.ClientSocketTimeout:
+ self.events.Idle(msg)
+
+ else:
+ self.handleMessage(msg)
return msg
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-08 11:12:52
|
Revision: 58
http://fclient.svn.sourceforge.net/fclient/?rev=58&view=rev
Author: jUrner
Date: 2007-11-08 03:12:56 -0800 (Thu, 08 Nov 2007)
Log Message:
-----------
protocol errors in relation to requests are now passed to the caller ++ some renames
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-07 17:35:34 UTC (rev 57)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-08 11:12:56 UTC (rev 58)
@@ -146,6 +146,8 @@
'ClientConnected',
'ClientDisconnected',
+ 'ProtocolError',
+
# config related events
'ConfigData',
'NodeData',
@@ -164,8 +166,8 @@
'TestDDAComplete',
'IdentifierCollision',
- 'ClientGetInfo',
- 'ClientGetInfoProgress',
+ 'ClientRequestInfo',
+ 'ClientRequestInfoProgress',
'DataFound',
'GetFailed',
@@ -319,7 +321,7 @@
class IdentifierPrefix:
"""Special purpose identifier prefixes"""
- ClientGetInfo = 'ClientGetInfo::'
+ ClientRequestInfo = 'ClientRequestInfo::'
class InsertError(Exception):
"""All insert errors supported by the client"""
@@ -797,8 +799,14 @@
self.events.ClientDisconnect({'DisconnectReason': DisconnectReason.Shutdown})
return True
- raise self.ProtocolError(msg)
+ identifier = msg.get('Identifier', None)
+ if identifier is None:
+ #TODO: check how to handle this
+ raise self.ProtocolError(msg)
+ else:
+ self.events.ProtocolError(msg.params)
+
####################################################
##
## TestDDA
@@ -888,8 +896,8 @@
##
####################################################
elif msg.name == self.Message.DataFound:
- if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo):
- self.events.ClientGetInfo(msg.params)
+ if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo):
+ self.events.ClientRequestInfo(msg.params)
return True
self.events.DataFound(msg.params)
@@ -898,21 +906,21 @@
elif msg.name == self.Message.GetFailed:
code = msg['Code']
if code == self.FetchError.TooBig:
- if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo):
+ if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo):
params = {
'Identifier': msg['Identifier'],
'Metadata.ContentType': msg.get('ExpectedMetadata.ContentType', ''),
'DataLength': msg.get('ExpectedDataLength', '')
}
- self.events.ClientGetInfo(params)
+ self.events.ClientRequestInfo(params)
return True
self.events.GetFailed(msg.params)
return True
elif msg.name == self.Message.SimpleProgress:
- if msg['Identifier'].startswith(self.IdentifierPrefix.ClientGetInfo):
- self.events.ClientGetInfoProgress(msg.params)
+ if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo):
+ self.events.ClientRequestInfoProgress(msg.params)
else:
self.events.SimpleProgress(msg.params)
return True
@@ -1211,7 +1219,7 @@
return identifier
- def clientGetInfo(self, uri, **params):
+ def clientRequestInfo(self, uri, **params):
"""Requests info about a file
@param uri: uri of the file to request info about
@event: clientGetInfo(event, params). If success, params will contain a key 'Metadata.ContentType'
@@ -1220,7 +1228,7 @@
@note: for other events see: L{clientGet}
@return: (str) request identifier
"""
- identifier = self.IdentifierPrefix.ClientGetInfo + self.newIdentifier()
+ identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier()
self.sendMessage(
self.Message.ClientGet,
Identifier=identifier,
@@ -1394,13 +1402,13 @@
#testListPeerNotes()
- def testClientGetInfo():
+ def testClientRequestInfo():
def cb(event, params):
print params
c.events.ClientGetInfo += cb
- identifier = c.clientGetInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
+ identifier = c.clientRequestInfo('CHK@sdNenKGj5mupxaSwo44jcW8dsX7vYTLww~BsRPtur0k,ZNRm9reMjtKEl9e-xFByKXbW6q4f6OQyfg~l9GRSAes,AAIC--8/snow_002%20%281%29.jpg')
for i in xrange(20):
c.next()
- #testClientGetInfo()
+ #testClientRequestInfo()
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-10 11:28:50
|
Revision: 65
http://fclient.svn.sourceforge.net/fclient/?rev=65&view=rev
Author: jUrner
Date: 2007-11-10 03:28:52 -0800 (Sat, 10 Nov 2007)
Log Message:
-----------
request identifiers can now be set explicitly + EventFileInfoProgress has gone + a bit of this and that
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-10 11:27:11 UTC (rev 64)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-10 11:28:52 UTC (rev 65)
@@ -167,8 +167,7 @@
'IdentifierCollision',
'ClientRequestInfo',
- 'ClientRequestInfoProgress',
-
+
'DataFound',
'GetFailed',
'SimpleProgress',
@@ -321,6 +320,7 @@
class IdentifierPrefix:
"""Special purpose identifier prefixes"""
+ ClientGetFile = 'ClientGetFile::'
ClientRequestInfo = 'ClientRequestInfo::'
class InsertError(Exception):
@@ -919,10 +919,7 @@
return True
elif msg.name == self.Message.SimpleProgress:
- if msg['Identifier'].startswith(self.IdentifierPrefix.ClientRequestInfo):
- self.events.ClientRequestInfoProgress(msg.params)
- else:
- self.events.SimpleProgress(msg.params)
+ self.events.SimpleProgress(msg.params)
return True
elif msg.name == self.Message.IdentifierCollision:
@@ -1192,10 +1189,13 @@
##
##########################################################
#TODO: not complete yet
- def clientGetFile(self, uri, filename):
+ def clientGetFile(self, uri, filename, identifier=None):
"""
"""
- identifier = self.new_identifier()
+ if identifier is None:
+ identifier = self.IdentifierPrefix.ClientGetFile + self.newIdentifier()
+ else:
+ assert identifier.startswith(self.IdentifierPrefix.ClientGetFile), 'Wrong prefix'
msg = self.Message(
self.Message.ClientGet,
IgnoreDS='false',
@@ -1219,20 +1219,29 @@
return identifier
- def clientRequestInfo(self, uri, **params):
+ def clientRequestInfo(self, uri, identifier=None, **params):
"""Requests info about a file
@param uri: uri of the file to request info about
+ @param identifier: request identifier or None to let the method create one. If an identifier is passed, it has to be
+ prefixed with L{IdentifierPrefix.ClientRequestInfo}
@event: clientGetInfo(event, params). If success, params will contain a key 'Metadata.ContentType'
and 'DataLength'. Both may be '' (empty string)
@event: GetInfoProgress(event, params). Triggered instead of SimpleProgress
@note: for other events see: L{clientGet}
@return: (str) request identifier
+ @note: the request identifier returned is very likely to be unique but uniqueness is not guaranteed
"""
- identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier()
+ if identifier is None:
+ identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier()
+ else:
+ assert identifier.startswith(self.IdentifierPrefix.ClientRequestInfo), 'Wrong prefix'
self.sendMessage(
self.Message.ClientGet,
Identifier=identifier,
URI=uri,
+ #TODO: persistance???
+ #Persistence='forever',
+
# suggested by Mathew Toseland to use about 32k for mimeType requests
# basic sizes of keys are: 1k for SSks and 32k for CHKs
MaxSize='32000',
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-12 10:35:06
|
Revision: 67
http://fclient.svn.sourceforge.net/fclient/?rev=67&view=rev
Author: jUrner
Date: 2007-11-12 02:35:09 -0800 (Mon, 12 Nov 2007)
Log Message:
-----------
bit of this and that
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-10 11:30:27 UTC (rev 66)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-12 10:35:09 UTC (rev 67)
@@ -168,6 +168,7 @@
'ClientRequestInfo',
+ 'PersistentGet',
'DataFound',
'GetFailed',
'SimpleProgress',
@@ -318,6 +319,7 @@
+
class IdentifierPrefix:
"""Special purpose identifier prefixes"""
ClientGetFile = 'ClientGetFile::'
@@ -691,7 +693,7 @@
"""
self._connectionName = connectionName
- self._ddaTmpFiles = []
+ self._ddaTmpFiles = {}
self._log = logging.getLogger(name)
self._socket = None
@@ -813,6 +815,7 @@
##
####################################################
elif msg.name == self.Message.TestDDAReply:
+ directory = msg['Directory']
fpathWrite = msg.params.get('WriteFilename', None)
fpathRead = msg.params.get('ReadFilename', None)
readContent = ''
@@ -821,8 +824,13 @@
if not written:
saveRemoveFile(fpathWrite)
else:
- self._ddaTmpFiles.append(fpathWrite)
-
+ # ...hope the node will keep request order so the correct
+ # tmp file is removed on completion
+ if directory in self._ddaTmpFiles:
+ self._ddaTmpFiles[directory].append(fpathWrite)
+ else:
+ self._ddaTmpFiles[directory] = [fpathWrite, ]
+
if fpathRead is not None:
readContent = saveReadFile(fpathRead)
if readContent is None:
@@ -835,12 +843,11 @@
)
return True
- #TODO: unconditionally clean up all tmp files? Looks like trouble..
elif msg.name == self.Message.TestDDAComplete:
- # clean tmp files
- for fpath in self._ddaTmpFiles:
- saveRemoveFile(fpath)
- self._ddaTmpFiles = []
+ # clean up tmp file
+ directory = msg['Directory']
+ tmp_file = self._ddaTmpFiles[directory].pop(0)
+ saveRemoveFile(tmp_file)
self.events.TestDDAComplete(msg.params)
return True
@@ -926,6 +933,10 @@
self.events.IdentifierCollision(msg.params)
return True
+ elif msg.name == self.Message.PersistentGet:
+ self.events.PersistentGet(msg.params)
+ return True
+
elif msg.name == self.Message.PersistentRequestModified:
self.events.PersistentRequestModified(msg.params)
return True
@@ -1189,13 +1200,13 @@
##
##########################################################
#TODO: not complete yet
- def clientGetFile(self, uri, filename, identifier=None):
+ def clientGetFile(self, uri, filename, identifier=None, clientToken=None):
"""
"""
if identifier is None:
identifier = self.IdentifierPrefix.ClientGetFile + self.newIdentifier()
else:
- assert identifier.startswith(self.IdentifierPrefix.ClientGetFile), 'Wrong prefix'
+ assert self.isClientGetFile(identifier), 'Wrong prefix'
msg = self.Message(
self.Message.ClientGet,
IgnoreDS='false',
@@ -1214,12 +1225,14 @@
#BinaryBlob='false',
Filename=filename,
)
+ if clientToken is not None:
+ msg['ClientToken'] = clientToken
self.sendMessageEx(msg)
return identifier
- def clientRequestInfo(self, uri, identifier=None, **params):
+ def clientRequestInfo(self, uri, identifier=None, clientToken=None, **params):
"""Requests info about a file
@param uri: uri of the file to request info about
@param identifier: request identifier or None to let the method create one. If an identifier is passed, it has to be
@@ -1234,13 +1247,13 @@
if identifier is None:
identifier = self.IdentifierPrefix.ClientRequestInfo + self.newIdentifier()
else:
- assert identifier.startswith(self.IdentifierPrefix.ClientRequestInfo), 'Wrong prefix'
- self.sendMessage(
+ assert self.isClientRequestInfo(identifier), 'Wrong prefix'
+ msg = self.Message(
self.Message.ClientGet,
Identifier=identifier,
URI=uri,
#TODO: persistance???
- #Persistence='forever',
+ Persistence='forever',
# suggested by Mathew Toseland to use about 32k for mimeType requests
# basic sizes of keys are: 1k for SSks and 32k for CHKs
@@ -1249,9 +1262,41 @@
Verbosity='1',
**params
)
+
+ if clientToken is not None:
+ msg['ClientToken'] = clientToken
+
+ self.sendMessageEx(msg)
return identifier
+
+ def isClientGetFile(self, identifier):
+ """Checks if an identifier is a ClientGetFile identifier
+ @return: (bool)
+ """
+ return identifier.startswith(self.IdentifierPrefix.ClientGetFile)
+
+ def isClientRequestInfo(self, identifier):
+ """Checks if an identifier is a RequestFileInfo identifier
+ @return: (bool)
+ """
+ return identifier.startswith(self.IdentifierPrefix.ClientRequestInfo)
+
+
+ def removePersistentRequest(self, identifier, global_=False):
+ """Removes a request
+ @param identifier: (str) identifier of the request to remove
+ @param global_: (bool) has to be set to True if the request was a global request
+ """
+ self.sendMessage(
+ self.Message.RemovePersistentRequest,
+ Global=self.fcpBool(global_),
+ Identifier=identifier
+ )
+
+
+ #TODO: check if node passes directory unchanged to testDDAComplete (trailing slash)
def testDDA(self, directory, wantReadDirectory=None, wantWriteDirectory=None):
"""Tests a directory for read / write access
@param directory: directory to test
@@ -1292,6 +1337,7 @@
msg['WantWriteDirectory'] = self.fcpBool(wantWriteDirectory)
self.sendMessageEx(msg)
+
##########################################################
##
## others
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-13 20:57:12
|
Revision: 73
http://fclient.svn.sourceforge.net/fclient/?rev=73&view=rev
Author: jUrner
Date: 2007-11-13 12:57:13 -0800 (Tue, 13 Nov 2007)
Log Message:
-----------
more methods, more flags
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-12 11:33:36 UTC (rev 72)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-13 20:57:13 UTC (rev 73)
@@ -613,7 +613,7 @@
"""All known peer note types"""
Private = '1'
- class Priorities:
+ class Priority:
"""All priorities supported by the client"""
Maximum = '0'
Interactive = '1'
@@ -623,9 +623,7 @@
Prefetch = '5'
Minimum = '6'
- PriorityMin = Minimum
- PriorityDefault = Bulk
-
+
class ProtocolError(Exception):
"""All protocol errors supported by the client"""
@@ -1200,7 +1198,7 @@
##
##########################################################
#TODO: not complete yet
- def clientGetFile(self, uri, filename, identifier=None, clientToken=None):
+ def clientGetFile(self, uri, filename, identifier=None, clientToken=None, priorityClass=None):
"""
"""
if identifier is None:
@@ -1225,14 +1223,17 @@
#BinaryBlob='false',
Filename=filename,
)
- if clientToken is not None:
+ if clientToken is not None:
msg['ClientToken'] = clientToken
+ if priorityClass is not None:
+ msg['PriorityClass'] = priorityClass
+
self.sendMessageEx(msg)
return identifier
- def clientRequestInfo(self, uri, identifier=None, clientToken=None, **params):
+ def clientRequestInfo(self, uri, identifier=None, clientToken=None, priorityClass=None, **params):
"""Requests info about a file
@param uri: uri of the file to request info about
@param identifier: request identifier or None to let the method create one. If an identifier is passed, it has to be
@@ -1262,10 +1263,11 @@
Verbosity='1',
**params
)
-
if clientToken is not None:
msg['ClientToken'] = clientToken
-
+ if priorityClass is not None:
+ msg['PriorityClass'] = priorityClass
+
self.sendMessageEx(msg)
return identifier
@@ -1284,6 +1286,24 @@
return identifier.startswith(self.IdentifierPrefix.ClientRequestInfo)
+ def modifyPersistantRequest(self, identifier, global_=False, priorityClass=None):
+ """Modifies a request
+ @param identifier: identifier of the request
+ @param global: (bool) required. Is the request global or local?
+ @param clientToken: new client token or None
+ @param priorityClass: new priority or None
+ """
+ msg = self.Message(
+ self.Message.ModifyPersistentRequest,
+ Identifier=identifier,
+ Global=self.fcpBool(global_),
+ )
+ if priorityClass is not None:
+ msg['PriorityClass'] = priorityClass
+ self.sendMessageEx(msg)
+
+
+
def removePersistentRequest(self, identifier, global_=False):
"""Removes a request
@param identifier: (str) identifier of the request to remove
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <jU...@us...> - 2007-11-14 11:03:31
|
Revision: 78
http://fclient.svn.sourceforge.net/fclient/?rev=78&view=rev
Author: jUrner
Date: 2007-11-14 03:03:35 -0800 (Wed, 14 Nov 2007)
Log Message:
-----------
typo
Modified Paths:
--------------
trunk/fclient/fclient_lib/fcp/fcp2_0.py
Modified: trunk/fclient/fclient_lib/fcp/fcp2_0.py
===================================================================
--- trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-14 11:03:13 UTC (rev 77)
+++ trunk/fclient/fclient_lib/fcp/fcp2_0.py 2007-11-14 11:03:35 UTC (rev 78)
@@ -1286,7 +1286,7 @@
return identifier.startswith(self.IdentifierPrefix.ClientRequestInfo)
- def modifyPersistantRequest(self, identifier, global_=False, priorityClass=None):
+ def modifyPersistentRequest(self, identifier, global_=False, priorityClass=None):
"""Modifies a request
@param identifier: identifier of the request
@param global: (bool) required. Is the request global or local?
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|