bases-devel Mailing List for Bases: Enterprise Application Framework (Page 2)
Status: Alpha
Brought to you by:
joe_steeve
You can subscribe to this list here.
| 2009 |
Jan
|
Feb
(31) |
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
|---|
|
From: <joe...@us...> - 2009-02-26 07:09:49
|
Revision: 7
http://bases.svn.sourceforge.net/bases/?rev=7&view=rev
Author: joe_steeve
Date: 2009-02-26 07:09:40 +0000 (Thu, 26 Feb 2009)
Log Message:
-----------
Generalized the creation of bases-transport objects.
Modified Paths:
--------------
trunk/ChangeLog
trunk/TODO
trunk/bases/core/broker.py
trunk/bases/core/transports/transports.py
trunk/src/bases-server
trunk/src/server.ini
Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog 2009-02-26 07:06:33 UTC (rev 6)
+++ trunk/ChangeLog 2009-02-26 07:09:40 UTC (rev 7)
@@ -1,5 +1,23 @@
+2008-01-23 Joe Steeve <joe...@gm...>
+
+ * src/bases-server (sanitizeConfigOptions): convert config option
+ types, put sane defaults, and basic option verification.
+ Removed unnecessary casts and exception handling
+ stuff. sanitizeConfigOptions makes them un-necessary
+
+ * bases/core/transports/transports.py (makeSSLContext): creation
+ of ssl context is needed in the broker. So, makes sense here.
+ This now uses twisted.internet.ssl.CertificateOptions to create a
+ context.
+
2008-01-22 Joe Steeve <joe...@gm...>
+ * bases/core/transports/transports.py (basesTransportsMap): add
+ this map to keep track for the protocols that we support
+
+ * src/bases-server (start_services): made protocol-factory
+ creation generic
+
* bases/services/directory.py (BasesComponentDirectory.__init__):
the component dir registers with the obj-repo.
Modified: trunk/TODO
===================================================================
--- trunk/TODO 2009-02-26 07:06:33 UTC (rev 6)
+++ trunk/TODO 2009-02-26 07:09:40 UTC (rev 7)
@@ -1,7 +1,7 @@
* Broker related stuff
-- The MsgId should be a tuple = (id(broker), timestamp, number)
+
* Things to verify
- Does the logger rotate the logs?
Modified: trunk/bases/core/broker.py
===================================================================
--- trunk/bases/core/broker.py 2009-02-26 07:06:33 UTC (rev 6)
+++ trunk/bases/core/broker.py 2009-02-26 07:09:40 UTC (rev 7)
@@ -3,8 +3,11 @@
This handles messages sent to remote bases-components. It keeps track
of sent messages, and acts on obtaining a response."""
+from datetime import datetime
+
from twisted.internet import defer, error, protocol
-from datetime import datetime
+from bases.core.transports import transports
+
shafunc = None
try:
import hashlib
@@ -100,11 +103,6 @@
ResponseObject: The object that was returned as a response.
"""
- transports={}
- in_method_calls={} # list of tuples of (location, msgid, time-stamp)
- out_method_calls={} # list of tuples of (location, msgid,
- # time-stamp, deferred)
-
def __init__(self):
self.in_MethodCalls = {}
self.out_MethodCalls = {}
@@ -225,8 +223,8 @@
# Book-keeping
self.out_MethodCalls[MsgID] = (d, datetime.utcnow())
if not self.out_Transports.has_key(id(t)):
- self.out_Transports[id(t)] = []
- self.out_Transports[id(t)].append(MsgID)
+ self.out_Transports[id(t)] = {"trans":t, "msgs":[]}
+ self.out_Transports[id(t)]['msgs'].append(MsgID)
return d
@@ -252,7 +250,7 @@
mref[0].Callback(dmesg[BASES_MSG_RESPONSE])
finally:
del(self.out_MethodCalls[dmesg[BASES_MSG_MSGID]])
- self.out_Transports[id(trans)].remove(dmesg[BASES_MSG_MSGID])
+ self.out_Transports[id(trans)]['msgs'].remove(dmesg[BASES_MSG_MSGID])
def getTransport(self, LocationTuple):
@@ -261,20 +259,22 @@
@param LocationTuple: A tuple in the following format
(protocol, host/ip, port)
- @rtype: A BasesTransport object that implements the
- IBasesTransport interface.
-
- @return: An instance of the appropriate BasesTransport object
- that should be used to connect to the location mentioned in
- 'LocationTuple'. If there is an existing connection, then that
- object is returned.
+ @rtype: A BasesTransport object that can talk to the given
+ location. If there is an existing connection, then that object
+ is returned.
"""
- if self.transports.has_key(LocationTuple):
- return transports[LocationTuple]
- else:
- raise BasesNotImplemented, "TBD:obtaining a transport automagically"
+ for tid in self.out_Transports:
+ if self.out_Transports[tid]['trans'].canHandle(LocationTuple):
+ return self.out_Transports[tid]['trans']
+ secure = False
+ tmap = transports.basesTransportsMap
+ if LocationTuple[0] in tmap['secure']:
+ secure = True
+
+
+
def cleanupTransReferences(self, trans, server=False):
"""
I should be called when a transport is dying to clear out its
Modified: trunk/bases/core/transports/transports.py
===================================================================
--- trunk/bases/core/transports/transports.py 2009-02-26 07:06:33 UTC (rev 6)
+++ trunk/bases/core/transports/transports.py 2009-02-26 07:09:40 UTC (rev 7)
@@ -2,6 +2,9 @@
from twisted.internet import protocol
from twisted.python import log
+class BasesTransportError(Exception):
+ pass
+
class _BasesServerFactory(protocol.ServerFactory):
def __init__(self, sname, broker, poser, Debug=False):
self.basesService = sname
@@ -23,3 +26,63 @@
log.msg(3, str(reason))
self.factory.basesBroker.cleanupTransReferences(self,
server=self._ServerSide)
+
+ def canHandle(self, LocationTuple):
+ """
+ I check whether I can handle data for the given LocationTuple
+ and return a boolean True if I can. Else, I return a False.
+ """
+ pass
+
+
+def makeSSLContext(Key, Cert, verify=False, CA=None):
+ '''Returns an ssl Context Object
+
+ @param Key: A X509 formatted key-file contents
+
+ @param Cert: A X509 formatted cert-file contents
+
+ @param verify: A boolean. Set this to True and provide the CA list
+ to verify the other side.
+
+ @param CA: List of CAs that we trust.
+ '''
+
+ if verify == True and CA == None:
+ raise BasesTransportError("Cannot verify without CA")
+
+ # our goal in here is to make a SSLContext object to pass to
+ # connectSSL or listenSSL
+ from twisted.internet import ssl
+ fctx = ssl.CertificateOptions(privateKey=Key,
+ certificate=Cert,
+ method=ssl.SSL.TLSv1_METHOD,
+ verify=verify,
+ caCerts=CA,
+ verifyDepth=1,
+ requireCertificate=True,
+ verifyOnce=True,
+ enableSingleUseKeys=True,
+ enableSessions=True,
+ fixBrokerPeers=True)
+ ctx = fctx.getContext()
+ return ctx
+
+
+def createBBTransServerFactory(*args, **kwargs):
+ from bases.core.transports import bbtrans
+ return bbtrans.BBTransServerFactory(*args, **kwargs)
+
+def createBBTransClientFactory(*args, **kwargs):
+ from bases.core.transports import bbtrans
+ return bbtrans.BBTransClientFactory(*args, **kwargs)
+
+
+basesTransportsMap = \
+ {"server": \
+ {"bbtrans": createBBTransServerFactory},
+ "client": \
+ {"bbtrans": createBBTransClientFactory},
+ "secure": \
+ {"sbbtrans":"bbtrans"}
+ }
Modified: trunk/src/bases-server
===================================================================
--- trunk/src/bases-server 2009-02-26 07:06:33 UTC (rev 6)
+++ trunk/src/bases-server 2009-02-26 07:09:40 UTC (rev 7)
@@ -55,30 +55,23 @@
parser.error("ERROR: Not enough arguments (expected atleast 2)")
sys.exit(1)
- #TBD: setup sane defaults for config file
conf = LoadConfig(args[0])
- if not conf.has_key("globals.name"):
- conf['globals.name'] = 'bases-server'
-
+ conf = sanitizeConfigOptions(conf)
+
+ ## Commandline options override the config file options.
# Setup verbosity
- if not conf.has_key('globals.verbosity'):
- conf['globals.verbosity'] = 0
- else:
- conf['globals.verbosity'] = int(conf['globals.verbosity'])
if options.verbosity > 0:
conf["globals.verbosity"] = options.verbosity
-
# Foreground/Background
conf["globals.debug"] = options.debug
# Profiler?
conf["globals.profile_hs"] = options.profile_hs
if options.profile_hs is True:
if options.profile_file == "":
- parser.error("ERROR: Please provide a file to log the profiler output")
+ parser.error("ERROR: Profile log file not given (--profile-log)")
sys.exit(1)
else:
conf["globals.profile_file"] = options.profile_file
-
# explicitly deleting the parser
del(parser)
return conf, args
@@ -114,6 +107,90 @@
return conf
+def sanitizeConfigOptions(conf):
+ """
+ The config parser loads the configuration file into conf as
+ strings. I sanitize the conf into the correct types as expected
+ everywhere in the code. In the process I also make sure some
+ essential options are not ommitted and provide sane defaults where
+ ever I can.
+ """
+
+ # There should be better ways of doing this. But, for now this is
+ # enough.
+
+ # First make sure we have the essentials
+ try:
+ conf['globals.userid'] = int(conf['globals.userid'])
+ conf['globals.groupid'] = int(conf['globals.groupid'])
+ conf['globals.pidfile'] = conf['globals.pidfile']
+ conf['globals.bases_neighbours'] = conf['globals.bases_neighbours']
+ conf['globals.bases_cache'] = conf['globals.bases_cache']
+ conf['services'] = conf['services']
+ except KeyError, e:
+ print "Required option %s not found in config file" % (str(e))
+ sys.exit(1)
+
+ if conf.has_key('globals.verbosity'):
+ conf['globals.verbosity'] = int(conf['globals.verbosity'])
+ else:
+ conf['globals.verbosity'] = 0
+
+ if conf.has_key('globals.log_keep_days'):
+ conf['globals.log_keep_days'] = int(conf['globals.log_keep_days'])
+ else:
+ conf['globals.log_keep_days'] = 10
+
+ if not conf.has_key("globals.name"):
+ conf['globals.name'] = 'bases-server'
+
+ if not conf.has_key("globals.syslog_prefix"):
+ conf['globals.syslog_prefix'] = conf['globals.name']
+
+ if len(conf['services']) == 0:
+ print "ERROR: No services defined"
+ sys.exit(1)
+
+ from bases.core.transports import transports
+ tmap = transports.basesTransportsMap
+ trans_list = tmap['server']
+ strans_list = tmap['secure']
+
+ for s in conf['services']:
+ sn = s[0]
+ sd = s[1]
+ try:
+ sd['protocol'] = sd['protocol']
+ sd['interface'] = sd['interface']
+ sd['port'] = int(sd['port'])
+ sd['interface_repo'] = sd['interface_repo']
+ except KeyError, e:
+ print "Service=%s, Required option %s not defined" % (sn, str(e))
+ sys.exit(1)
+
+ if sd['protocol'] not in trans_list and \
+ sd['protocol'] not in strans_list:
+ print "Service:%s,Protocol=%s is not known" % \
+ (sn, sd['protocol'])
+ sys.exit(1)
+
+ if sd['protocol'] in strans_list:
+ if not sd.has_key('sslkey') or \
+ not sd.has_key('sslcert'):
+ print "Service:%s,Protocol=%s requires sslkey and sslcert" % \
+ (sn, sd['protocol'])
+ sys.exit(1)
+ if sd.has_key('verify_peers'):
+ sd['verify_peers'] = bool(sd['verify_peers'])
+ else:
+ sd['verify_peers'] = False
+ if sd['verify_peers'] is True and not sd.has_key('ca'):
+ print "Service=%s,Protocol=%s requires CA to verify_peers" % \
+ (sn, sd['protocol'])
+ sys.exit(1)
+ return conf
+
+
def start_server(conf):
"""
I start the bases-server. I setup the server based on the given
@@ -130,8 +207,7 @@
from twisted.internet import reactor
start_services(conf, reactor)
try:
- util.switchUID(int(conf['globals.userid']),
- int(conf['globals.groupid']))
+ util.switchUID(conf['globals.userid'], conf['globals.groupid'])
except:
pass
@@ -185,9 +261,8 @@
day.
If globals.debug is not set and globals.log is not set, then I
- setup logging to syslog. When logging to syslog, I look for the
- optional globals.syslog_prefix option and use it as a program
- prefix in the syslog instead of the string 'bases-server'.
+ setup logging to syslog. When logging to syslog, I use
+ globals.syslog_prefix as a program prefix.
"""
if conf["globals.debug"] is True:
@@ -212,10 +287,7 @@
log.startLoggingWithObserver(lo.emit, setStdout=True)
else:
lo = None
- if conf.has_key("globals.syslog_prefix"):
- lo = BasesSysLogObserver(conf['globals.syslog_prefix'])
- else:
- lo = BasesSysLogObserver("bases-server")
+ lo = BasesSysLogObserver(conf['globals.syslog_prefix'])
lo.bases_verbosity = conf['globals.verbosity']
log.startLoggingWithObserver(lo.emit, setStdout=True)
@@ -228,23 +300,18 @@
if conf['globals.debug'] is True:
return
- # Check if we have a pidfile and whether we can write to it
- if conf.has_key("globals.pidfile"):
- d = os.path.dirname(conf["globals.pidfile"])
- if not os.access(d, os.W_OK):
- stderr.write("ERROR: need write permissions on pidfile directory")
- stderr.flush()
- sys.exit(1)
- if (os.access(conf["globals.pidfile"], os.F_OK) is True) and \
- (os.access(conf["globals.pidfile"], os.W_OK) is False):
- stderr.write("ERROR: unable to write to pid file (%s)" % \
- (conf["globals.pidfile"]))
- stderr.flush()
- sys.exit(1)
- else:
- stderr.write("ERROR: no pidfile given in %s" % (conf['config_file']))
- stderr.flush()
+ # Check whether we can write to the globals.pidfile
+ d = os.path.dirname(conf["globals.pidfile"])
+ if not os.access(d, os.W_OK):
+ stderr.write("ERROR: need write permissions on pidfile directory")
+ stderr.flush()
sys.exit(1)
+ if os.access(conf["globals.pidfile"], os.F_OK) is True and \
+ os.access(conf["globals.pidfile"], os.W_OK) is False:
+ stderr.write("ERROR: unable to write to pid file (%s)" % \
+ (conf["globals.pidfile"]))
+ stderr.flush()
+ sys.exit(1)
# Fork #1
pid = 0
@@ -282,11 +349,9 @@
I iterate through the defined services and configure a listener on
each of them..
"""
- if len(conf['services']) == 0:
- print "ERROR: No services defined"
- sys.exit(0)
-
from bases.services import objrepo, directory
+ from bases.core.transports import transports
+ tmap = transports.basesTransportsMap
b = broker.BasesBroker()
p = broker.BasesAsyncPoser()
@@ -296,92 +361,42 @@
for s in conf["services"]:
sname = s[0]
desc = s[1]
- if desc['protocol'] == "sbbtrans":
- start_sbbtrans(reactor, conf, desc, sname, b, p)
- elif desc['protocol'] == "bbtrans":
- start_bbtrans(reactor, conf, desc, sname, b, p)
- else:
- print "ERROR: Unimplemented protocol %s" % (desc['protocol'])
- sys.exit(1)
- return
+ secure = False
+ proto = desc['protocol']
+ ctxt = None
-
-def start_sbbtrans(reactor, conf, desc, sname, broker, poser):
- ctx = makeSSLContext(desc['sslkey'], desc['sslcert'], desc['ca'])
- from bases.core.transports import bbtrans
- f = bbtrans.BBTransServerFactory(sname, broker, poser,
- Debug=conf['globals.debug'])
- reactor.listenSSL(int(desc['port']), f, ctx)
+ # Check whether the given protocol is one of the secure
+ # protocols. i.e. requires Digital Certificate
+ # verification
+ if proto in tmap['secure']:
+ secure = True
+ # Read the key, certificate and CA stuff
+ fd = open(desc['sslkey'],'r')
+ key_buff = fd.read()
+ fd.close()
+ fd = open(desc['sslcert'],'r')
+ cert_buff = fd.read()
+ fd.close()
+ ca_buff = None
+ if desc['verify_peer'] is True:
+ fd = open(desc['ca'],'r')
+ ca_buff = fd.read()
+ fd.close()
-def start_bbtrans(reactor, conf, desc, sname, broker, poser):
- from bases.core.transports import bbtrans
- f = bbtrans.BBTransServerFactory(sname, broker, poser,
- Debug=conf['globals.debug'])
- reactor.listenTCP(int(desc['port']), f)
+ ctxt = transports.makeSSLContext(key_buff, cert_buff,
+ verify=desc['verify_peer'],
+ CA=ca_buff)
+ proto = tmap['secure'][proto]
+ ffactory = tmap['server'][proto]
+ f = ffactory(sname, b, p, Debug=conf['globals.debug'])
+ if secure is True:
+ reactor.listenSSL(desc['port'], f, ctxt)
+ else:
+ reactor.listenTCP(desc['port'], f)
-def makeSSLContext(Key, CA):
- '''Returns an ssl Context Object
- @param myKey a pem formated key and certifcate with for my current
- host the other end of this connection must have the cert
- from the CA that signed this key
-
- @param trustedCA a pem formated certificat from a CA you trust you
- will only allow connections from clients signed by this CA
- and you will only allow connections to a server signed by
- this CA
- '''
-
- # our goal in here is to make a SSLContext object to pass to
- # connectSSL or listenSSL
-
- # Why these functioins... Not sure...
- fd = open(Key,'r')
- theCert = ssl.PrivateCertificate.loadPEM(fd.read())
- fd.close()
- fd = open(CA,'r')
- theCA = ssl.Certificate.loadPEM(fd.read())
- fd.close()
- ctx = theCert.options(theCA)
-
- # Now the options you can set look like Standard OpenSSL Library
- # options
-
- # The SSL protocol to use, one of SSLv23_METHOD, SSLv2_METHOD,
- # SSLv3_METHOD, TLSv1_METHOD. Defaults to TLSv1_METHOD.
- ctx.method = ssl.SSL.TLSv1_METHOD
-
- # If True, verify certificates received from the peer and fail the
- # handshake if verification fails. Otherwise, allow anonymous
- # sessions and sessions with certificates which fail validation.
- ctx.verify = True
-
- # Depth in certificate chain down to which to verify.
- ctx.verifyDepth = 1
-
- # If True, do not allow anonymous sessions.
- ctx.requireCertification = True
-
- # If True, do not re-verify the certificate on session resumption.
- ctx.verifyOnce = True
-
- # If True, generate a new key whenever ephemeral DH parameters are
- # used to prevent small subgroup attacks.
- ctx.enableSingleUseKeys = True
-
- # If True, set a session ID on each context. This allows a
- # shortened handshake to be used when a known client reconnects.
- ctx.enableSessions = True
-
- # If True, enable various non-spec protocol fixes for broken SSL
- # implementations.
- ctx.fixBrokenPeers = False
-
- return ctx
-
-
## Lifted off twisted's code
def runWithHotshot(conf, reactor):
"""Run reactor under hotshot profiler."""
Modified: trunk/src/server.ini
===================================================================
--- trunk/src/server.ini 2009-02-26 07:06:33 UTC (rev 6)
+++ trunk/src/server.ini 2009-02-26 07:09:40 UTC (rev 7)
@@ -49,6 +49,7 @@
port=10001
# SSL options
+verify_peers=True
ca=/etc/bases-server/CA.pem
sslkey=/etc/bases-server/bases-SSL.key
sslcert=/etc/bases-server/bases-SSL.crt
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <joe...@us...> - 2009-02-26 07:06:42
|
Revision: 6
http://bases.svn.sourceforge.net/bases/?rev=6&view=rev
Author: joe_steeve
Date: 2009-02-26 07:06:33 +0000 (Thu, 26 Feb 2009)
Log Message:
-----------
Support to use SHA-1 hashes as MsgIDs instead of simple integers.
Object repository fairly complete. Garbage collection remains. The
code is not tested. Could be severly broken.
Modified Paths:
--------------
trunk/ChangeLog
trunk/TODO
trunk/bases/core/broker.py
trunk/bases/core/component.py
trunk/bases/core/proxy.py
trunk/bases/core/transports/bbtrans.py
trunk/bases/core/transports/transports.py
trunk/bases/services/directory.py
trunk/bases/services/objrepo.py
trunk/src/bases-server
Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/ChangeLog 2009-02-26 07:06:33 UTC (rev 6)
@@ -1,3 +1,17 @@
+2008-01-22 Joe Steeve <joe...@gm...>
+
+ * bases/services/directory.py (BasesComponentDirectory.__init__):
+ the component dir registers with the obj-repo.
+
+ * bases/services/objrepo.py (BasesObjectRepository): added most of
+ the needed methods. The garbage collection code should be put here.
+
+ * bases/core/broker.py (BasesBroker): converted MsgIDs to be
+ sha-hash rather than ints.
+ (BasesBroker.__init__): Hard constraints that incoming transports
+ are different from outgoing transports. A connection drop will
+ cause relevant objects to be destroyed.
+
2008-01-19 Joe Steeve <joe...@gm...>
* bases/core/transports/bbtrans.py (_BBTransProtocol): cleaned up
Modified: trunk/TODO
===================================================================
--- trunk/TODO 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/TODO 2009-02-26 07:06:33 UTC (rev 6)
@@ -1,5 +1,9 @@
* Broker related stuff
-- send 'service-name' to getObject of object-repo
- The MsgId should be a tuple = (id(broker), timestamp, number)
+* Things to verify
+
+- Does the logger rotate the logs?
+- Does the privilege dropping code work?
+
Modified: trunk/bases/core/broker.py
===================================================================
--- trunk/bases/core/broker.py 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/bases/core/broker.py 2009-02-26 07:06:33 UTC (rev 6)
@@ -5,7 +5,15 @@
from twisted.internet import defer, error, protocol
from datetime import datetime
+shafunc = None
+try:
+ import hashlib
+ shafunc = hashlib.sha512
+except:
+ import sha
+ shafunc = sha.sha
+
## Order of values in the message-list
BASES_MSG_MSGTYPE = 0
BASES_MSG_MSGID = 1
@@ -67,9 +75,9 @@
int MsgType: This says whether a message is a 'method-call' or a
'method-response'.
- int MsgID: An ID uniquely identifying a message. This ID is used
- to map a 'method-response' to its 'method-call'. This is used by
- the BasesBroker to keep track of messages and their respective
+ string MsgID: An ID uniquely identifying a message. This ID is
+ used to map a 'method-response' to its 'method-call'. This is used
+ by the BasesBroker to keep track of messages and their respective
replies.
int ObjID: An ID that uniquely identifies an object to which the
@@ -87,7 +95,7 @@
'[MsgType, MsgID, ResponseObject]'
- int MsgID: The MsgID for which this is a response.
+ string MsgID: The MsgID for which this is a response.
ResponseObject: The object that was returned as a response.
"""
@@ -96,99 +104,90 @@
in_method_calls={} # list of tuples of (location, msgid, time-stamp)
out_method_calls={} # list of tuples of (location, msgid,
# time-stamp, deferred)
- next_msgid=1
- # A reference to the object repository
- objrepo = None
-
def __init__(self):
- pass
+ self.in_MethodCalls = {}
+ self.out_MethodCalls = {}
+ self.in_Transports = {}
+ self.out_Transports = {}
+ self.ObjRepo = None
+
+
+ def processInMessage(self, dmesg, trans):
+ """I process a given message object. This method is called
+ from a bases-transport. The message is either a
+ BASES_METHOD_CALL or a BASES_METHOD_RESPONSE.
- def processInMessage(self, dmesg, location):
- """I process a given message object. The message is either a
- BASES_METHOD_CALL or a BASES_METHOD_RESPONSE
-
-
@param dmesg: a message object from the transport-protocol
- @param tuple location: A tuple that refers to a network
- location. (Protocol, Host, Port)
+ @param trans: a bases-transport object that has called us
"""
if dmesg[BASES_MSG_TYPE] == BASES_METHOD_CALL:
- # We have a incoming method-call.
- self._handleInMethodCall(dmesg,location)
+ self._handleInMethodCall(dmesg,trans)
elif dmesg[BASES_MSG_TYPE] == BASES_METHOD_RESPONSE:
- # We have a response for a method that we called.
- self._handleInMethodResponse(dmesg,location)
+ self._handleOutMethodResponse(dmesg,trans)
else:
raise BasesBrokerError \
("Unknown MsgType=%d in message" % (dmesg[0]))
- def _handleInMethodCall(self, dmesg, location):
+ def _handleInMethodCall(self, dmesg, trans):
"""
- I handle an incoming method-call. I lookup the requested
- object, and do the method call
+ I handle an incoming MethodCall. I lookup the requested
+ object, and do the method call.
- @param dmesg: The message list from the transport
+ @param dmesg: The message from the transport
- @param location: A location tuple of the source of the message
+ @param trans: The bases-transport object that got this
+ incoming message.
"""
- for l, MsgId, ts in in_method_calls:
- if location == l and dmesg[BASES_MSG_MSGID] == MsgId:
- raise BasesBrokerError \
- ("Duplicate MsgID=%d from %s:%s:%d" %
- (dmesg[BASES_MSG_MSGID], \
- BASES_TRANSPORTS_STR[location[0]],\
- location[1], location[2]))
-
- # TBD: We need to check for a circular loop here and log it. I
- # dont think we should stop circular calls completely. Please
- # contend on this.
- # Get the object from the 'object repository'
- o = self.objrepo.getObject(dmesg[BASES_MSG_OBJID], location)
- # Get the method-call started
+ # We generate an internal ID to index this message. We cannot
+ # rely on the uniqueness of the MsgID that was provided by the
+ # remote server.
+ mid = ""
+ while True:
+ mid_s = "%s%s%s%s" % (id(trans), dmesg[BASES_MSG_MSGID],
+ datetime.utcnow(), os.urandom(4))
+ mid = shafunc(mid_s).hexdigest()
+ if not self.in_MethodCalls.has_key(mid):
+ break
+
+ # Get the object from the 'object repository' and get the
+ # MethodCall started.
+ o = self.ObjRepo.getObject(dmesg[BASES_MSG_OBJID])
d = o.callMethod(dmesg[BASES_MSG_METHODNAME],
dmesg[BASES_MSG_ARGS], dmesg[BASES_MSG_KWARGS])
- in_method_calls.append((location, dmesg[BASES_MSG_MSGID],
- datetime.utcnow()))
- d.addCallback(self._handleOutMethodResponse, location,
- dmesg[BASES_MSG_MSGID])
+ # Add message info to in_MethodCalls
+ self.in_MethodCalls[mid] = (dmesg[BASES_MSG_MSGID], trans,
+ datetime.utcnow())
+ # Add mid to in_Transports
+ if not self.in_Transports.has_key(id(trans)):
+ self.in_Transports[id(trans)] = []
+ self.in_Transports[id(trans)].append(mid)
+ d.addCallback(self._handleInMethodResponse, mid)
- def _handleOutMethodResponse(self, response, location, MsgId):
+ def _handleInMethodResponse(self, response, mid):
"""
- I handle the result that is obtained when a remote component
- has called an object hosted on this bases-server.
+ I handle the response for an incoming MethodCall. This
+ response should be sent to the remote bases-server that called
+ us.
@param object response: The result that is obtained from the
method call.
- @param tuple location: The location-tuple
-
- @param int MsgID: The ID of the message for which this is a
- response.
+ @param string mid: The internal-id for the incoming message.
"""
- found = False
- for m in in_method_calls:
- l, _MsgId, ts = m
- if location == l and dmesg[BASES_MSG_MSGID] == MsgId:
- found = True
- dmesg = [BASES_METHOD_RESPONSE, MsgID, response]
- t = self.getTransport(location)
- t.queueMsg(dmesg)
- in_method_calls.remove(m)
- break
-
- if found is not True:
- raise BasesBrokerError \
- ("MsgID=%d targetted at %s:%s:%d is not in in_method_calls" %
- (MsgID, BASES_TRANSPORTS_STR[location[0]],\
- location[1], location[2]))
-
+ MsgID, trans, ts = self.in_MethodCalls[mid]
+ dmesg = [BASES_METHOD_RESPONSE, MsgID, response]
+ try:
+ trans.queueMsg(dmesg)
+ finally:
+ self.in_Transports[id(trans)].remove(mid)
+ del(self.in_MethodCalls[mid])
def callRemote(self, location, ObjID, MethodName, *args, **kwargs):
@@ -209,50 +208,51 @@
@param dict kwargs: A dict of keyword arguments
"""
-
- if self.Debug is True:
- for m in out_method_calls:
- if m[1] == next_msgid:
- raise BasesBrokerError \
- ("Inconsistent next_msgid-%d" % (next_msgid))
-
- msg = [BASES_METHOD_CALL, next_msgid, ObjectID, MethodName,
- args, kwargs]
t = self.getTransport(location)
d = defer.Deferred()
- t.queueMsg(call_list)
- out_method_calls.append((location, next_msgid, datetime.utcnow(), d))
+ # Generate a unique MsgID
+ MsgID = ""
+ while True:
+ MsgID_s = "%s%s%s%s" % (id(d), ObjID, datetime.utcnow(),
+ os.urandom(4))
+ MsgID = shafunc(MsgID_s).hexdigest()
+ if not self.out_MethodCalls.has_key(MsgID):
+ break
+
+ msg = [BASES_METHOD_CALL, MsgID, ObjectID, MethodName, args, kwargs]
+ t.queueMsg(msg)
+ # Book-keeping
+ self.out_MethodCalls[MsgID] = (d, datetime.utcnow())
+ if not self.out_Transports.has_key(id(t)):
+ self.out_Transports[id(t)] = []
+ self.out_Transports[id(t)].append(MsgID)
return d
- def _handleInMethodResponse(self, dmesg, location):
+ def _handleOutMethodResponse(self, dmesg, trans):
"""
- I handle an incomin method-response (A response to a
- method-call that was done from this bases-server). I lookup
- the deferred associated with the message, and fire it with the
- provided response.
+ I handle the reponse for an outgoing MethodCall from this
+ bases-server. I lookup the deferred associated with the
+ message, and fire it with the provided response.
@param dmesg: The message list from the transport
- @param location: A location tuple of the source of the message
+ @param trans: The bases-transport that got this message
"""
- found = False
- for m in out_method_calls:
- if location and dmesg[BASES_MSG_MSGID] in m:
- found = True
- try:
- m[3].Callback(dmesg[BASES_MSG_RESPONSE])
- finally:
- out_method_calls.remove(m)
- break
-
- if found != True:
+ mref = None
+ try:
+ mref = self.out_MethodCalls[dmesg[BASES_MSG_MSGID]]
+ except:
raise BasesBrokerError \
- ("Unexpected response to MsgId=%d from Location: %s:%s:%d" %
- (dmesg[BASES_MSG_MSGID], BASES_TRANSPORTS_STR[location[0]],\
- location[1], location[2]))
+ ("Unexpected response to MsgId=%d from %s" %
+ (dmesg[BASES_MSG_MSGID], trans.remoteAddr))
+ try:
+ mref[0].Callback(dmesg[BASES_MSG_RESPONSE])
+ finally:
+ del(self.out_MethodCalls[dmesg[BASES_MSG_MSGID]])
+ self.out_Transports[id(trans)].remove(dmesg[BASES_MSG_MSGID])
def getTransport(self, LocationTuple):
@@ -274,6 +274,23 @@
else:
raise BasesNotImplemented, "TBD:obtaining a transport automagically"
+
+ def cleanupTransReferences(self, trans, server=False):
+ """
+ I should be called when a transport is dying to clear out its
+ references and do cleanup job.
+ """
+ if server is True:
+ # Should clear up all in-coming related book-keeping
+ pass
+ else:
+ # Should clear up all out-going related book-keeping
+ pass
+ #TBD: cleanup references of the transport from the in_MethodCalls
+ #TBD: cleanup references of the transport from the in_Transports
+ pass
+
+
class BasesAsyncPoser:
pass
Modified: trunk/bases/core/component.py
===================================================================
--- trunk/bases/core/component.py 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/bases/core/component.py 2009-02-26 07:06:33 UTC (rev 6)
@@ -3,29 +3,51 @@
BASES_STATEFUL_COMPONENT = 2
BASES_STATEFULLONGLIVED_COMPONENT = 3
+class BasesComponentError(Exception):
+ pass
+
+
class BasesComponent:
def __init__(self):
- pass
+ self.componentInit()
- def createComponent(self):
- pass
+ def componentInit(self):
+ raise BasesComponentError("componentInit: Not implemented")
- def destroyComponent(self):
- pass
+ def __del__(self):
+ self.componentDel()
+ def componentDel(self):
+ raise BasesComponentError("componentDel: Not implemented")
+
+
class BasesComponent_Stateless(BasesComponent):
def __init__(self):
self.ComponentType = BASES_STATELESS_COMPONENT
+ BasesComponent.__init__(self)
+ def __del__(self):
+ BasesComponent.__del__(self)
+
+
class BasesComponent_Stateful(BasesComponent):
def __init__(self):
self.ComponentType = BASES_STATEFUL_COMPONENT
+ BasesComponent.__init__(self)
+ def __del__(self):
+ BasesComponent.__del__(self)
+
+
class BasesComponent_StatefulLongLived(BasesComponent):
def __init__(self):
self.ComponentType = BASES_STATEFULLONGLIVED_COMPONENT
+ BasesComponent.__init__(self)
+ def __del__(self):
+ BasesComponent.__del__(self)
+
class BasesComponentServerProxy:
"""A class to create proxies to remote BasesComponents"""
Modified: trunk/bases/core/proxy.py
===================================================================
--- trunk/bases/core/proxy.py 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/bases/core/proxy.py 2009-02-26 07:06:33 UTC (rev 6)
@@ -1,9 +1,10 @@
-class BasesComponentServerProxyError(Exception):
+
+class BasesComponentProxyError(Exception):
pass
-class BasesComponentClientProxy:
+class BasesComponentProxy:
"""
I behave like a client that is calling a component object's
methods.
Modified: trunk/bases/core/transports/bbtrans.py
===================================================================
--- trunk/bases/core/transports/bbtrans.py 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/bases/core/transports/bbtrans.py 2009-02-26 07:06:33 UTC (rev 6)
@@ -55,7 +55,7 @@
return result
-class _BBTransProtocol(protocol.Protocol):
+class _BBTransProtocol(transports._BasesTransport):
"""
The bases binary transport (bbtrans) protocol.
"""
@@ -69,20 +69,15 @@
self.Debug = False
self._Msg_TransitState = STATE_GET_HOLA
self._Buffer = None
- # _ClientSide should be configured by the Factory that is
- # creating me. I'll initiate an Init based on this.
- self._ClientSide = True
+ self._ServerSide = True
+ transports._BasesTransport.__init__(self)
def connectionMade(self):
- if self._ClientSide is True:
+ if self._ServerSide is True:
self.sendInit()
- def connectionLost(self,reason):
- pass
-
-
def dataReceived(self,data):
"""I recieve data and break them into messages. The messages
are then delivered to the broker for processing.
@@ -119,7 +114,7 @@
emsg = self._InMsgQueue.pop(0)
try:
dmsg = rencode.loads(emsg)
- self.factory.basesBroker.processInMessage(dmesg)
+ self.factory.basesBroker.processInMessage(dmesg, self)
self._ErrTolerance = 0
except Exception, e:
log.msg(3, "Exception while processing _InMsgQueue")
@@ -339,6 +334,8 @@
def buildProtocol(self, addr):
bb = _BBTransProtocol()
bb.factory = self
+ bb.remoteAddr = addr
+ bb._ServerSide = True
bb.Debug = self.Debug
return bb
@@ -347,6 +344,6 @@
def buildProtocol(self, addr):
bb = _BBTransProtocol()
bb.factory = self
- bb._ClientSide = True
+ bb._ServerSide = False
bb.Debug = self.Debug
return bb
Modified: trunk/bases/core/transports/transports.py
===================================================================
--- trunk/bases/core/transports/transports.py 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/bases/core/transports/transports.py 2009-02-26 07:06:33 UTC (rev 6)
@@ -1,5 +1,6 @@
from twisted.internet import protocol
+from twisted.python import log
class _BasesServerFactory(protocol.ServerFactory):
def __init__(self, sname, broker, poser, Debug=False):
@@ -14,3 +15,11 @@
self.Debug = Debug
+class _BasesTransport(protocol.Protocol):
+ def __init__(self):
+ pass
+
+ def connectionLost(self,reason):
+ log.msg(3, str(reason))
+ self.factory.basesBroker.cleanupTransReferences(self,
+ server=self._ServerSide)
Modified: trunk/bases/services/directory.py
===================================================================
--- trunk/bases/services/directory.py 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/bases/services/directory.py 2009-02-26 07:06:33 UTC (rev 6)
@@ -9,9 +9,13 @@
directory = []
directory_lock = thread.allocate_lock()
+
+ def __init__(self, conf, ObjRepo):
+ self.srvConf = conf
+ self.ObjRepo = ObjRepo
+ # Register myself with the object-broker
+ self.ObjRepo.addStaticObject("1", self, None)
- def __init__(self):
- pass
def getComponentURI(self, ComponentName):
""" Get the URI of a given component.
Modified: trunk/bases/services/objrepo.py
===================================================================
--- trunk/bases/services/objrepo.py 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/bases/services/objrepo.py 2009-02-26 07:06:33 UTC (rev 6)
@@ -1,5 +1,107 @@
"""Provides the object-repository service for a bases-server """
-class BasesObjectRepository:
+shafunc = None
+try:
+ import hashlib
+ shafunc = hashlib.sha512
+except:
+ import sha
+ shafunc = sha.sha
+
+from datetime import datetime
+
+
+class BasesObjRepoError(Exception):
pass
+
+class BasesObjectRepository:
+ """
+ - Should keep a database of alive objects.
+
+ TBD: Garbage collection. Should hook a method into twisted's
+ calllater and get it to do cleanup.
+
+ - Stateless objects: Need not keep track of who created this
+ object. Should verify whether this object is published by a
+ given service.
+
+ - Statefull objects: Should keep track of the client who created
+ the object (location). Should keep track of which service the
+ object is published on. Cannot be garbage-collected. The object
+ can timeout and kill itself.
+
+ """
+
+ def __init__(self, Broker):
+ self.Broker = Broker
+ self.ObjDB = {}
+ self.Broker.ObjRepo = self
+
+
+ def getObject(self, ObjID):
+ """
+ I return an object from the ObjDB with the given ObjID
+ """
+
+ try:
+ o = self.ObjDB[ObjID]
+ return o(0)
+
+ except KeyError:
+ raise BasesObjRepoError("Bad object-id")
+
+
+ def addObject(self, Object, OwnerID):
+ """
+ I put the given object into the ObjDB and return an ObjID
+
+ @param Object: The object that should be stored
+
+ @param CallerID: A string that identifies the owner. Should be
+ the KeyID of the client application when available. Else this
+ should be generated by the protocol-factory.
+ """
+
+ m = ""
+ # We make sure we dont accidentally kill another
+ # object. Checking for collisions is extreme paranoia.
+ while True:
+ s = "%s%s%s" % (id(Object), datetime.utcnow(), os.urandom(4))
+ m = shafunc(s).hexdigest()
+ if not self.ObjDB.has_key(m):
+ break
+ o = (Object, datetime.utcnow(), OwnerID)
+ self.ObjDB[m] = o
+ return m
+
+
+ def addStaticObject(self, ObjID, Object, OwnerID):
+ """
+ I let the caller choose an object-id for the given
+ object. This is not for normal use. This is for internal use.
+ """
+ if self.ObjDB.has_key(ObjID):
+ raise BasesObjRepoError("ObjectID=%s already exists" % (ObjID))
+ o = (Object, datetime.utcnow(), OwnerID)
+ self.ObjDB[ObjID] = o
+
+
+ def removeObject(self, ObjID):
+ del(self.ObjDB[ObjID])
+
+
+
+ def updateObject(self, ObjID):
+ """
+ I recompute a new hash key for the given object and refile it
+ in the DB.
+ """
+
+ o = self.ObjDB[ObjectID]
+ s = "%s%s%s" % (id(o(0)), datetime.utcnow(), os.urandom(4))
+ o1 = (o(0), datetime.utcnow(), o(2))
+ m = shafunc(s).hexdigest()
+ del(self.ObjDB[ObjID])
+ self.ObjDB[m] = o1
+ return m
Modified: trunk/src/bases-server
===================================================================
--- trunk/src/bases-server 2009-02-26 07:03:02 UTC (rev 5)
+++ trunk/src/bases-server 2009-02-26 07:06:33 UTC (rev 6)
@@ -6,17 +6,11 @@
from twisted.python import log, logfile, syslog
from bases.core import broker
-from bases.services.directory import BasesComponentDirectory
-from bases.services.objrepo import BasesObjectRepository
-
stdout = sys.stdout
stdin = sys.stdin
stderr = sys.stderr
-ourBroker = None
-ourAsyncPoser = None
-
def main():
## First lets parse the options
@@ -146,7 +140,8 @@
runWithHotshot(conf, reactor)
else:
reactor.run()
- os.unlink(conf['globals.pidfile'])
+ if conf["globals.debug"] is False:
+ os.unlink(conf['globals.pidfile'])
class BasesError(Exception):
@@ -291,8 +286,13 @@
print "ERROR: No services defined"
sys.exit(0)
+ from bases.services import objrepo, directory
+
b = broker.BasesBroker()
p = broker.BasesAsyncPoser()
+ obj_repo = objrepo.BasesObjectRepository(b)
+ comp_dir = directory.BasesComponentDirectory(conf, obj_repo)
+
for s in conf["services"]:
sname = s[0]
desc = s[1]
@@ -382,18 +382,6 @@
return ctx
-def get_broker():
- if ourBroker == None:
- ourBroker = broker.BasesBroker()
- return ourBroker
-
-
-def get_asyncposer():
- if ourAsyncPoser == None:
- ourAsyncPoser = broker.BasesAsyncPoser()
- return ourAsyncPoser
-
-
## Lifted off twisted's code
def runWithHotshot(conf, reactor):
"""Run reactor under hotshot profiler."""
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <joe...@us...> - 2009-02-26 07:03:08
|
Revision: 5
http://bases.svn.sourceforge.net/bases/?rev=5&view=rev
Author: joe_steeve
Date: 2009-02-26 07:03:02 +0000 (Thu, 26 Feb 2009)
Log Message:
-----------
Cleanup in bbtrans protocol to use logging
Modified Paths:
--------------
trunk/ChangeLog
trunk/bases/core/transports/bbtrans.py
trunk/src/bases-server
Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog 2009-02-26 07:00:38 UTC (rev 4)
+++ trunk/ChangeLog 2009-02-26 07:03:02 UTC (rev 5)
@@ -1,3 +1,12 @@
+2008-01-19 Joe Steeve <joe...@gm...>
+
+ * bases/core/transports/bbtrans.py (_BBTransProtocol): cleaned up
+ console printing with logging. tracebacks are printed only when in
+ debug mode.
+
+ * src/bases-server: removed Twisted services/application based
+ startup
+
2008-01-18 Joe Steeve <joe...@gm...>
* src/bases-server: multi-level logging, daemonizing, shedding
Modified: trunk/bases/core/transports/bbtrans.py
===================================================================
--- trunk/bases/core/transports/bbtrans.py 2009-02-26 07:00:38 UTC (rev 4)
+++ trunk/bases/core/transports/bbtrans.py 2009-02-26 07:03:02 UTC (rev 5)
@@ -96,29 +96,22 @@
# Parse the stream and break it into messages
while bytes_left != 0:
if self._Msg_TransitState == STATE_GET_HOLA:
- if self.Debug is True:
- print "BBTrans: _Msg_TransitState == STATE_GET_HOLA"
+ log.msg(3, "_Msg_TransitState == STATE_GET_HOLA")
pos = self._handleHola(data,pos)
elif self._Msg_TransitState == STATE_GET_MSG:
- if self.Debug is True:
- print "BBTrans: _Msg_TransitState == STATE_GET_MSG"
+ log.msg(3, "_Msg_TransitState == STATE_GET_MSG")
pos = self._handleMsg(data,pos)
elif self._Msg_TransitState == STATE_GET_INIT:
- if self.Debug is True:
- print "BBTrans: _Msg_TransitState == STATE_GET_INIT"
+ log.msg(3, "_Msg_TransitState == STATE_GET_INIT")
pos = self._handleInit(data,pos)
elif self._Msg_TransitState == STATE_GET_INITRESPONSE:
- if self.Debug is True:
- print "BBTrans: "\
- "_Msg_TransitState == STATE_GET_INITRESPONSE"
+ log.msg(3, "_Msg_TransitState == STATE_GET_INITRESPONSE")
pos = self._handleInitR(data,pos)
else:
- print "BBTrans: ERROR. Undefined state = %d" % \
- (self._Msg_TransitState)
+ log.msg(1, "ERROR: Undefined state = %d" % \
+ (self._Msg_TransitState))
bytes_left = data_len - pos
- if self.Debug is True:
- print "BBTrans: processed %d bytes, left %d" % \
- (pos, bytes_left)
+ log.msg(3,"processed %d bytes, left %d" % (pos, bytes_left))
# Process the messages that we have recieved.
@@ -126,28 +119,26 @@
emsg = self._InMsgQueue.pop(0)
try:
dmsg = rencode.loads(emsg)
- self.broker.processInMessage(dmesg)
+ self.factory.basesBroker.processInMessage(dmesg)
self._ErrTolerance = 0
except Exception, e:
+ log.msg(3, "Exception while processing _InMsgQueue")
+ log.msg(3, str(e))
if self.Debug is True:
- print "Exception while processing _InMsgQueue"
- traceback.print_exc()
- print str(e)
+ s = traceback.format_exc()
+ log.msg(3, s)
self._ErrTolerance += 1
- # TBD: Log this error occurance
if self._ErrTolerance > BBTRANS_ERROR_TOLERANCE:
- # TBD: Log an error
+ log.msg(1, "ERROR: _ErrTolerance exceeded, closing")
self.transport.loseConnection()
except Exception, e:
- #TBD: log an error
- # We just close the connection for all protocol related
- # errors
- if self.Debug is True:
- print "Exception in dataReceived"
- traceback.print_exc()
- print str(e)
+ log.msg(3, "Exception in DataRecieved")
+ log.msg(3, str(e))
+ if self.Debug is True:
+ s = traceback.format_exc()
+ log.msg(3, s)
self.transport.loseConnection()
@@ -174,7 +165,7 @@
@return tuple(int, int): A tuple of (length, type). (1) length
of the incoming message and (2) type of the incoming message"""
- print "Checking for hOla"
+ log.msg(3, "Checking for hOla")
try:
hola,length,msgtype = struct.unpack(BBTRANS_HOLA_FMT,data)
except:
Modified: trunk/src/bases-server
===================================================================
--- trunk/src/bases-server 2009-02-26 07:00:38 UTC (rev 4)
+++ trunk/src/bases-server 2009-02-26 07:03:02 UTC (rev 5)
@@ -297,25 +297,27 @@
sname = s[0]
desc = s[1]
if desc['protocol'] == "sbbtrans":
- start_sbbtrans(reactor, desc, sname, b, p)
+ start_sbbtrans(reactor, conf, desc, sname, b, p)
elif desc['protocol'] == "bbtrans":
- start_bbtrans(reactor, desc, sname, b, p)
+ start_bbtrans(reactor, conf, desc, sname, b, p)
else:
print "ERROR: Unimplemented protocol %s" % (desc['protocol'])
sys.exit(1)
return
-def start_sbbtrans(reactor, desc, sname, broker, poser):
+def start_sbbtrans(reactor, conf, desc, sname, broker, poser):
ctx = makeSSLContext(desc['sslkey'], desc['sslcert'], desc['ca'])
from bases.core.transports import bbtrans
- f = bbtrans.BBTransServerFactory(sname, broker, poser)
+ f = bbtrans.BBTransServerFactory(sname, broker, poser,
+ Debug=conf['globals.debug'])
reactor.listenSSL(int(desc['port']), f, ctx)
-def start_bbtrans(reactor, desc, sname, broker, poser):
+def start_bbtrans(reactor, conf, desc, sname, broker, poser):
from bases.core.transports import bbtrans
- f = bbtrans.BBTransServerFactory(sname, broker, poser)
+ f = bbtrans.BBTransServerFactory(sname, broker, poser,
+ Debug=conf['globals.debug'])
reactor.listenTCP(int(desc['port']), f)
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <joe...@us...> - 2009-02-26 07:00:45
|
Revision: 4
http://bases.svn.sourceforge.net/bases/?rev=4&view=rev
Author: joe_steeve
Date: 2009-02-26 07:00:38 +0000 (Thu, 26 Feb 2009)
Log Message:
-----------
Removed Twisted service/application stuff. We really dont need it.
Modified Paths:
--------------
trunk/bases/core/transports/transports.py
trunk/src/bases-server
trunk/src/server.ini
Modified: trunk/bases/core/transports/transports.py
===================================================================
--- trunk/bases/core/transports/transports.py 2009-02-26 06:58:20 UTC (rev 3)
+++ trunk/bases/core/transports/transports.py 2009-02-26 07:00:38 UTC (rev 4)
@@ -2,7 +2,10 @@
from twisted.internet import protocol
class _BasesServerFactory(protocol.ServerFactory):
- def __init__(self, Debug=False):
+ def __init__(self, sname, broker, poser, Debug=False):
+ self.basesService = sname
+ self.basesBroker = broker
+ self.basesPoser = poser
self.Debug = Debug
Modified: trunk/src/bases-server
===================================================================
--- trunk/src/bases-server 2009-02-26 06:58:20 UTC (rev 3)
+++ trunk/src/bases-server 2009-02-26 07:00:38 UTC (rev 4)
@@ -4,8 +4,6 @@
import pprint, os
from twisted.python import log, logfile, syslog
-from twisted.internet import reactor
-from twisted.application import internet, app, service
from bases.core import broker
from bases.services.directory import BasesComponentDirectory
@@ -135,23 +133,26 @@
pp.pprint(conf)
daemonize(conf)
- a = start_services(conf)
+ from twisted.internet import reactor
+ start_services(conf, reactor)
try:
util.switchUID(int(conf['globals.userid']),
int(conf['globals.groupid']))
except:
pass
- app.startApplication(a,False)
log.msg(1, "Starting bases-server [%s]" % (conf['globals.name']))
if conf['globals.profile_hs'] is True:
- runWithHotshot(conf)
+ runWithHotshot(conf, reactor)
else:
reactor.run()
+ os.unlink(conf['globals.pidfile'])
+
class BasesError(Exception):
pass
+
class BasesFileLogObserver(log.FileLogObserver):
bases_verbosity = 0
def emit (self, eventDict):
@@ -281,7 +282,7 @@
return
-def start_services(conf):
+def start_services(conf, reactor):
"""
I iterate through the defined services and configure a listener on
each of them..
@@ -292,39 +293,30 @@
b = broker.BasesBroker()
p = broker.BasesAsyncPoser()
- a = service.Application(conf['globals.name'])
-
for s in conf["services"]:
sname = s[0]
desc = s[1]
- s = None
if desc['protocol'] == "sbbtrans":
- s = start_sbbtrans(desc)
+ start_sbbtrans(reactor, desc, sname, b, p)
elif desc['protocol'] == "bbtrans":
- s = start_bbtrans(desc)
+ start_bbtrans(reactor, desc, sname, b, p)
else:
print "ERROR: Unimplemented protocol %s" % (desc['protocol'])
sys.exit(1)
- s.basesServiceName = sname
- s.basesBroker = b
- s.basesAsyncPoser = p
- s.setServiceParent(a)
- return a
+ return
-def start_sbbtrans(desc):
+def start_sbbtrans(reactor, desc, sname, broker, poser):
ctx = makeSSLContext(desc['sslkey'], desc['sslcert'], desc['ca'])
from bases.core.transports import bbtrans
- f = bbtrans.BBTransServerFactory()
- s = internet.SSLServer(int(desc['port']), f, ctx)
- return s
+ f = bbtrans.BBTransServerFactory(sname, broker, poser)
+ reactor.listenSSL(int(desc['port']), f, ctx)
-def start_bbtrans(desc):
+def start_bbtrans(reactor, desc, sname, broker, poser):
from bases.core.transports import bbtrans
- f = bbtrans.BBTransServerFactory()
- s = internet.TCPServer(int(desc['port']), f)
- return s
+ f = bbtrans.BBTransServerFactory(sname, broker, poser)
+ reactor.listenTCP(int(desc['port']), f)
def makeSSLContext(Key, CA):
@@ -401,7 +393,7 @@
## Lifted off twisted's code
-def runWithHotshot(conf):
+def runWithHotshot(conf, reactor):
"""Run reactor under hotshot profiler."""
try:
import hotshot.stats
Modified: trunk/src/server.ini
===================================================================
--- trunk/src/server.ini 2009-02-26 06:58:20 UTC (rev 3)
+++ trunk/src/server.ini 2009-02-26 07:00:38 UTC (rev 4)
@@ -10,7 +10,7 @@
# the logs are kept for 'log_keep_days'. When there is no logfile
# mentioned, bases logs to syslog. When logging to syslog, bases
# prepends syslog_prefix to the log.
-#log=/tmp/bases-server.log
+log=/tmp/bases-server.log
log_keep_days = 5
# a custom prefix string for this server
syslog_prefix = bases-server
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <joe...@us...> - 2009-02-26 06:58:29
|
Revision: 3
http://bases.svn.sourceforge.net/bases/?rev=3&view=rev
Author: joe_steeve
Date: 2009-02-26 06:58:20 +0000 (Thu, 26 Feb 2009)
Log Message:
-----------
added multiple service example in config file
Modified Paths:
--------------
trunk/src/bases-server
trunk/src/server.ini
Modified: trunk/src/bases-server
===================================================================
--- trunk/src/bases-server 2009-02-26 06:56:36 UTC (rev 2)
+++ trunk/src/bases-server 2009-02-26 06:58:20 UTC (rev 3)
@@ -36,7 +36,7 @@
time.sleep(1)
start_server(conf)
else:
- print "unknown action = %s" % (args[1].strip())
+ print "ERROR: unknown action = %s" % (args[1].strip())
sys.exit(1)
@@ -60,7 +60,7 @@
parser.print_help()
sys.exit(1)
if len(args) != 2:
- parser.error("Not enough arguments (expected atleast 2)")
+ parser.error("ERROR: Not enough arguments (expected atleast 2)")
sys.exit(1)
#TBD: setup sane defaults for config file
@@ -82,7 +82,7 @@
conf["globals.profile_hs"] = options.profile_hs
if options.profile_hs is True:
if options.profile_file == "":
- parser.error("Please provide a file to log the profiler output")
+ parser.error("ERROR: Please provide a file to log the profiler output")
sys.exit(1)
else:
conf["globals.profile_file"] = options.profile_file
@@ -143,9 +143,7 @@
pass
app.startApplication(a,False)
- log.msg(1, "Running the reactor 1")
- log.msg(2, "Running the reactor 2")
- log.msg(3, "Running the reactor 3")
+ log.msg(1, "Starting bases-server [%s]" % (conf['globals.name']))
if conf['globals.profile_hs'] is True:
runWithHotshot(conf)
else:
@@ -289,7 +287,7 @@
each of them..
"""
if len(conf['services']) == 0:
- print "No services defined"
+ print "ERROR: No services defined"
sys.exit(0)
b = broker.BasesBroker()
@@ -305,7 +303,7 @@
elif desc['protocol'] == "bbtrans":
s = start_bbtrans(desc)
else:
- print "Unimplemented protocol %s" % (desc['protocol'])
+ print "ERROR: Unimplemented protocol %s" % (desc['protocol'])
sys.exit(1)
s.basesServiceName = sname
s.basesBroker = b
@@ -408,7 +406,7 @@
try:
import hotshot.stats
except ImportError, e:
- s = "Failed to import module hotshot: %s" % e
+ s = "ERROR: Failed to import module hotshot: %s" % e
log.msg(1,s)
sys.exit(1)
Modified: trunk/src/server.ini
===================================================================
--- trunk/src/server.ini 2009-02-26 06:56:36 UTC (rev 2)
+++ trunk/src/server.ini 2009-02-26 06:58:20 UTC (rev 3)
@@ -40,3 +40,18 @@
# component repo
interface_repo=/etc/bases-server/comprepo
+
+[service:serv2]
+
+# network options
+protocol=bbtrans
+interface=192.168.1.1
+port=10001
+
+# SSL options
+ca=/etc/bases-server/CA.pem
+sslkey=/etc/bases-server/bases-SSL.key
+sslcert=/etc/bases-server/bases-SSL.crt
+
+# component repo
+interface_repo=/etc/bases-server/comprepo
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <joe...@us...> - 2009-02-26 06:56:41
|
Revision: 2
http://bases.svn.sourceforge.net/bases/?rev=2&view=rev
Author: joe_steeve
Date: 2009-02-26 06:56:36 +0000 (Thu, 26 Feb 2009)
Log Message:
-----------
Basic server code and little more niceties
This commit brings the following
(1) A multi-level logger
(2) A daemonizer that sheds privileges
(3) Option and config file parsing
(4) Multiple services startup support
(5) Base work for the larger picture.
Still a long way to go :-S
Added Paths:
-----------
trunk/ChangeLog
trunk/TODO
trunk/bases/
trunk/bases/__init__.py
trunk/bases/app.py
trunk/bases/core/
trunk/bases/core/__init__.py
trunk/bases/core/broker.py
trunk/bases/core/component.py
trunk/bases/core/proxy.py
trunk/bases/core/transports/
trunk/bases/core/transports/__init__.py
trunk/bases/core/transports/bbtrans.py
trunk/bases/core/transports/transports.py
trunk/bases/interface.py
trunk/bases/services/
trunk/bases/services/__init__.py
trunk/bases/services/directory.py
trunk/bases/services/objrepo.py
trunk/bases/synclient/
trunk/bases/synclient/__init__.py
trunk/bases/synclient/app.py
trunk/bases/synclient/broker.py
trunk/bases/synclient/component.py
trunk/bases/synclient/services.py
trunk/bases/synclient/transports/
trunk/bases/synclient/transports/bbtrans.py
trunk/bases/thirdparty/
trunk/bases/thirdparty/__init__.py
trunk/bases/thirdparty/rencode.py
trunk/scratch/
trunk/scratch/ssl-client.py
trunk/scratch/ssl-server.py
trunk/src/
trunk/src/bases-server
trunk/src/server.ini
trunk/test/
trunk/test/test-client.py
trunk/test/test-server.py
trunk/test/test.py
trunk/test/test1.py
Added: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog (rev 0)
+++ trunk/ChangeLog 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,5 @@
+2008-01-18 Joe Steeve <joe...@gm...>
+
+ * src/bases-server: multi-level logging, daemonizing, shedding
+ privileges, starting multiple services.
+
Added: trunk/TODO
===================================================================
--- trunk/TODO (rev 0)
+++ trunk/TODO 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,5 @@
+
+* Broker related stuff
+- send 'service-name' to getObject of object-repo
+- The MsgId should be a tuple = (id(broker), timestamp, number)
+
Added: trunk/bases/app.py
===================================================================
--- trunk/bases/app.py (rev 0)
+++ trunk/bases/app.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,15 @@
+
+from twisted.internet import protocol
+from twisted.internet import reactor
+
+from bases.core.transports import bbtrans
+from bases.core import broker
+
+
+
+def getBasesComponent(compName):
+ """
+ Get a bases component object.
+ """
+
+ pass
Added: trunk/bases/core/broker.py
===================================================================
--- trunk/bases/core/broker.py (rev 0)
+++ trunk/bases/core/broker.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,279 @@
+"""The bases object broker.
+
+This handles messages sent to remote bases-components. It keeps track
+of sent messages, and acts on obtaining a response."""
+
+from twisted.internet import defer, error, protocol
+from datetime import datetime
+
+## Order of values in the message-list
+BASES_MSG_MSGTYPE = 0
+BASES_MSG_MSGID = 1
+BASES_MSG_OBJID = 2
+BASES_MSG_METHODNAME = 3
+BASES_MSG_ARGS = 4
+BASES_MSG_KWARGS = 5
+BASES_MSG_RESPONSE = 2
+
+## Values for the MsgType in a message-list
+BASES_METHOD_CALL = 1
+BASES_METHOD_RESPONSE = 2
+
+## Transport types we support
+BASES_TRANS_BBTRANS = 1
+
+## Convenience strings
+BASES_TRANSPORTS_STR = ["",
+ "bbtrans"]
+
+class BasesBrokerError(Exception):
+ pass
+
+
+class BasesBroker:
+ """
+ The BasesBroker keeps track of messages to/from an Object. It does
+ not care about the details of the message. There are two types of
+ messages that can come to a broker (either from a remote or a
+ local)
+
+ a. BASES_METHOD_CALL: This is a message which initiates a
+ method-call.
+
+ b. BASES_METHOD_RESPONSE: This is a message which holds a
+ 'method-response'.
+
+ Every 'method-call' is completed with a 'method-response'.
+
+ When a local object tries to call a method on a remote object, the
+ broker on the calling side (calling-broker) generates a unique
+ MsgID for the method-call. This MsgID is used purely for message
+ call-response tracking. The calling-broker generates a
+ 'BASES_METHOD_CALL' type message and sends it across to the other
+ end. It also keeps a note of the MsgID. The remote-broker that
+ gets this message, calls the appropriate object's method and gives
+ its response back with a 'BASES_METHOD_RESPONSE' type message. A
+ 'BASES_METHOD_RESPONSE' type message holds the same MsgID that was
+ given in the respective 'BASES_METHOD_CALL' message. This is used
+ by the calling-broker to send the response back to the correct
+ object that is waiting for the response.
+
+ Based on the MsgType, there can be two forms of message-object.
+
+ 1) MsgType = BASES_METHOD_CALL
+
+ '[MsgType, MsgID, ObjID, MethodName, args, kwargs]'
+
+ int MsgType: This says whether a message is a 'method-call' or a
+ 'method-response'.
+
+ int MsgID: An ID uniquely identifying a message. This ID is used
+ to map a 'method-response' to its 'method-call'. This is used by
+ the BasesBroker to keep track of messages and their respective
+ replies.
+
+ int ObjID: An ID that uniquely identifies an object to which the
+ 'method-call' should be sent to. This is used to obtain the
+ correct object from the object repository and deliver the message
+ to it.
+
+ string MethodName: The name of the method that should be called.
+
+ list args: The list of arguments to the method.
+
+ dict kwargs: A dictionary of keyword arguments.
+
+ 2) MsgType = BASES_METHOD_RESPONSE
+
+ '[MsgType, MsgID, ResponseObject]'
+
+ int MsgID: The MsgID for which this is a response.
+
+ ResponseObject: The object that was returned as a response.
+ """
+
+ transports={}
+ in_method_calls={} # list of tuples of (location, msgid, time-stamp)
+ out_method_calls={} # list of tuples of (location, msgid,
+ # time-stamp, deferred)
+ next_msgid=1
+
+ # A reference to the object repository
+ objrepo = None
+
+ def __init__(self):
+ pass
+
+ def processInMessage(self, dmesg, location):
+ """I process a given message object. The message is either a
+ BASES_METHOD_CALL or a BASES_METHOD_RESPONSE
+
+
+ @param dmesg: a message object from the transport-protocol
+
+ @param tuple location: A tuple that refers to a network
+ location. (Protocol, Host, Port)
+ """
+
+ if dmesg[BASES_MSG_TYPE] == BASES_METHOD_CALL:
+ # We have a incoming method-call.
+ self._handleInMethodCall(dmesg,location)
+ elif dmesg[BASES_MSG_TYPE] == BASES_METHOD_RESPONSE:
+ # We have a response for a method that we called.
+ self._handleInMethodResponse(dmesg,location)
+ else:
+ raise BasesBrokerError \
+ ("Unknown MsgType=%d in message" % (dmesg[0]))
+
+
+ def _handleInMethodCall(self, dmesg, location):
+ """
+ I handle an incoming method-call. I lookup the requested
+ object, and do the method call
+
+ @param dmesg: The message list from the transport
+
+ @param location: A location tuple of the source of the message
+ """
+ for l, MsgId, ts in in_method_calls:
+ if location == l and dmesg[BASES_MSG_MSGID] == MsgId:
+ raise BasesBrokerError \
+ ("Duplicate MsgID=%d from %s:%s:%d" %
+ (dmesg[BASES_MSG_MSGID], \
+ BASES_TRANSPORTS_STR[location[0]],\
+ location[1], location[2]))
+
+ # TBD: We need to check for a circular loop here and log it. I
+ # dont think we should stop circular calls completely. Please
+ # contend on this.
+
+ # Get the object from the 'object repository'
+ o = self.objrepo.getObject(dmesg[BASES_MSG_OBJID], location)
+ # Get the method-call started
+ d = o.callMethod(dmesg[BASES_MSG_METHODNAME],
+ dmesg[BASES_MSG_ARGS], dmesg[BASES_MSG_KWARGS])
+ in_method_calls.append((location, dmesg[BASES_MSG_MSGID],
+ datetime.utcnow()))
+ d.addCallback(self._handleOutMethodResponse, location,
+ dmesg[BASES_MSG_MSGID])
+
+
+ def _handleOutMethodResponse(self, response, location, MsgId):
+ """
+ I handle the result that is obtained when a remote component
+ has called an object hosted on this bases-server.
+
+ @param object response: The result that is obtained from the
+ method call.
+
+ @param tuple location: The location-tuple
+
+ @param int MsgID: The ID of the message for which this is a
+ response.
+ """
+
+ found = False
+ for m in in_method_calls:
+ l, _MsgId, ts = m
+ if location == l and dmesg[BASES_MSG_MSGID] == MsgId:
+ found = True
+ dmesg = [BASES_METHOD_RESPONSE, MsgID, response]
+ t = self.getTransport(location)
+ t.queueMsg(dmesg)
+ in_method_calls.remove(m)
+ break
+
+ if found is not True:
+ raise BasesBrokerError \
+ ("MsgID=%d targetted at %s:%s:%d is not in in_method_calls" %
+ (MsgID, BASES_TRANSPORTS_STR[location[0]],\
+ location[1], location[2]))
+
+
+
+ def callRemote(self, location, ObjID, MethodName, *args, **kwargs):
+ """
+ I am called by a BasesComponentServerProxy object. I do a
+ method call to a component-object on a remote bases-server
+
+ @param tuple location: The location tuple
+
+ @param int ObjID: the ID of the component-object on the remote
+ bases-erver.
+
+ @param str MethodName: Name of the method on the object=ObjID to
+ be called on the remote side.
+
+ @param list args: A list of arguments
+
+ @param dict kwargs: A dict of keyword arguments
+
+ """
+
+ if self.Debug is True:
+ for m in out_method_calls:
+ if m[1] == next_msgid:
+ raise BasesBrokerError \
+ ("Inconsistent next_msgid-%d" % (next_msgid))
+
+ msg = [BASES_METHOD_CALL, next_msgid, ObjectID, MethodName,
+ args, kwargs]
+ t = self.getTransport(location)
+ d = defer.Deferred()
+ t.queueMsg(call_list)
+
+ out_method_calls.append((location, next_msgid, datetime.utcnow(), d))
+ return d
+
+
+ def _handleInMethodResponse(self, dmesg, location):
+ """
+ I handle an incomin method-response (A response to a
+ method-call that was done from this bases-server). I lookup
+ the deferred associated with the message, and fire it with the
+ provided response.
+
+ @param dmesg: The message list from the transport
+
+ @param location: A location tuple of the source of the message
+ """
+ found = False
+ for m in out_method_calls:
+ if location and dmesg[BASES_MSG_MSGID] in m:
+ found = True
+ try:
+ m[3].Callback(dmesg[BASES_MSG_RESPONSE])
+ finally:
+ out_method_calls.remove(m)
+ break
+
+ if found != True:
+ raise BasesBrokerError \
+ ("Unexpected response to MsgId=%d from Location: %s:%s:%d" %
+ (dmesg[BASES_MSG_MSGID], BASES_TRANSPORTS_STR[location[0]],\
+ location[1], location[2]))
+
+
+
+ def getTransport(self, LocationTuple):
+ """Get a transport object for the given LocationTuple
+
+ @param LocationTuple: A tuple in the following format
+ (protocol, host/ip, port)
+
+ @rtype: A BasesTransport object that implements the
+ IBasesTransport interface.
+
+ @return: An instance of the appropriate BasesTransport object
+ that should be used to connect to the location mentioned in
+ 'LocationTuple'. If there is an existing connection, then that
+ object is returned.
+ """
+ if self.transports.has_key(LocationTuple):
+ return transports[LocationTuple]
+ else:
+ raise BasesNotImplemented, "TBD:obtaining a transport automagically"
+
+
+class BasesAsyncPoser:
+ pass
Added: trunk/bases/core/component.py
===================================================================
--- trunk/bases/core/component.py (rev 0)
+++ trunk/bases/core/component.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,129 @@
+
+BASES_STATELESS_COMPONENT = 1
+BASES_STATEFUL_COMPONENT = 2
+BASES_STATEFULLONGLIVED_COMPONENT = 3
+
+class BasesComponent:
+ def __init__(self):
+ pass
+
+ def createComponent(self):
+ pass
+
+ def destroyComponent(self):
+ pass
+
+
+class BasesComponent_Stateless(BasesComponent):
+ def __init__(self):
+ self.ComponentType = BASES_STATELESS_COMPONENT
+
+class BasesComponent_Stateful(BasesComponent):
+ def __init__(self):
+ self.ComponentType = BASES_STATEFUL_COMPONENT
+
+class BasesComponent_StatefulLongLived(BasesComponent):
+ def __init__(self):
+ self.ComponentType = BASES_STATEFULLONGLIVED_COMPONENT
+
+
+class BasesComponentServerProxy:
+ """A class to create proxies to remote BasesComponents"""
+
+ def __init__(self, Interface, local, instance, transport=None):
+ """
+ @param Interface: Interface name in the namespace
+
+ @param local: Boolean flag that says whether the proxied
+ object is available locally or remote.
+
+ @param instance: When local=True, this points to the instance
+ of the component. Else, this should be the object-id obtained
+ from the remote bases-server.
+
+ @param transport: When local=False, this points to a
+ BasesTransport object which can be used to connect to the
+ remote bases-server. The provided transport is assumed to be
+ configured with the location where the bases-server exists.
+
+ """
+ self._Interface = Interface
+ self._LocalObject = local
+ self._Object = instance
+ self._Transport = transport
+
+ if local == False and transport == None:
+ raise "Remote components need a Transport"
+
+
+ def __getattr__(self, MethodName):
+ """I simply route all component-method invocations to the
+ MethodCallHandler."""
+
+ self._CalledMethod = MethodName
+ return self.__MethodCallHandler
+
+
+ def __MethodCallHandler(self, *args, **kwargs):
+ # Validate the called method and its parameters against the
+ # published interface.
+ bValidInterface = bases_ValidateInterface(self._Interface,
+ self._CalledMethod,
+ args, kwargs)
+ if bValidInterface is False:
+ raise ""
+ if self._LocalObject is True:
+ m = getattr (self._Object, self._CalledMethod)
+ r = m(*args, **kwargs)
+ else:
+ r = self.__DoRemoteMethod(self._CalledMethod, args, kwargs)
+ return r
+
+
+ def __DoRemoteMethod(self, *args, **kwargs):
+ """I convert the method-call to a list and give it to the
+ transport for the remote call. I block the current thread
+ until I get a response."""
+
+ #TBD:
+
+ pass
+
+
+class BasesComponentClientProxy:
+ def __init__(self):
+
+ pass
+
+
+class BasesComponentFactory:
+ """I create component proxy objects and configure them
+ properly. Any application that wants to use a component should use
+ me to get an instance to the desired component."""
+
+ def __init__(self, ComponentName):
+ """
+ I create a component-proxy instance for the requested component.
+
+ @param ComponentName: The full namespace of a component.
+ """
+
+
+ def _GetLocation(self, ComponentName):
+ """I search the ComponentDirectory to find the location of
+ this component.
+
+ @param ComponentName: The full namespace of a component"""
+ pass
+
+
+ def _GetInterfaceFromRemote(self, ComponentName, Location):
+ """I connect to the remote bases-server given in 'location'
+ and fetch the Interface for the given ComponentName
+
+ @param ComponentName: The full namespace of a component
+
+ @param Location: The location of a given component. Given as
+ an Bases-URL"""
+ pass
+
Added: trunk/bases/core/proxy.py
===================================================================
--- trunk/bases/core/proxy.py (rev 0)
+++ trunk/bases/core/proxy.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,41 @@
+
+class BasesComponentServerProxyError(Exception):
+ pass
+
+
+class BasesComponentClientProxy:
+ """
+ I behave like a client that is calling a component object's
+ methods.
+ """
+
+ def __init__(self):
+ pass
+
+
+ def callMethod(self, MethodName, args, kwargs):
+ """
+ I invoke a method of name 'MethodName' with the given 'args'
+ and 'kwargs' for the configured component. Before calling the
+ method I do a validation of the parameters with the published
+ interface. Once the parameters are validated, I start the
+ appropriate method in a thread, and return a deferred back.
+
+ On completion of the method (in the thread), I call the
+ callbacks on the deferred.
+
+ @param MethodName: A string containing the name of the method
+ to be invoked.
+
+ @param args: A list of arguments that should be passed
+
+ @param kwargs: A dictionary of keyword arguments that should
+ be passed to the method.
+
+ @return deferred: A deferred that will notify the completion
+ of the method-call with the returned object.
+ """
+ pass
+
+
+
Added: trunk/bases/core/transports/bbtrans.py
===================================================================
--- trunk/bases/core/transports/bbtrans.py (rev 0)
+++ trunk/bases/core/transports/bbtrans.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,361 @@
+
+from bases.thirdparty import rencode
+from twisted.internet import protocol
+from twisted.python import log
+
+import struct, traceback
+from bases.core.transports import transports
+
+class BBTransError(Exception):
+ pass
+
+# A limit on the maximum length of a bbtrans message.
+BBTRANS_MAX_MSG_SIZE = 64 * 1024 #64k
+BBTRANS_MAX_INIT_SIZE = 1024
+
+# Stuff related to the Hola signature
+BBTRANS_HOLA_FMT = "!4sLL" # "h0La, msg-type, msg-len"
+BBTRANS_HOLA_LEN = struct.calcsize(BBTRANS_HOLA_FMT)
+
+BBTRANS_HOLA_STR = "h0La" # Actual HOLA!!
+BBTRANS_HOLA_TYPE_MSG = 1
+BBTRANS_HOLA_TYPE_INIT = 2
+BBTRANS_HOLA_TYPE_INITRESPONSE = 3
+
+# Stuff that go into an init-type hola
+BBTRANS_VER_MAJOR = 0
+BBTRANS_VER_MINOR = 1
+# Init advertisement data
+BBTRANS_INIT = {"ver_major":BBTRANS_VER_MAJOR,
+ "ver_minor":BBTRANS_VER_MINOR,
+ "limit":BBTRANS_MAX_MSG_SIZE,
+ "keepalive":True,
+ "async":True}
+
+# How many consecutive bad messages do we tolerate
+BBTRANS_ERROR_TOLERANCE=10
+
+
+## Internal states in which the protocol object will exist
+STATE_GET_HOLA = 1
+STATE_GET_MSG = 2
+STATE_GET_INIT = 3
+STATE_GET_INITRESPONSE = 4
+
+FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
+
+def bbdump(src, length=8):
+ N=0; result=''
+ while src:
+ s,src = src[:length],src[length:]
+ hexa = ' '.join(["%02X"%ord(x) for x in s])
+ s = s.translate(FILTER)
+ result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
+ N+=length
+ return result
+
+
+class _BBTransProtocol(protocol.Protocol):
+ """
+ The bases binary transport (bbtrans) protocol.
+ """
+
+ def __init__(self):
+ self._InMsgQueue = []
+ self._OutMsgQueue = []
+ self._ErrTolerance = 0
+ self._RemoteSizeLimit = 0
+ self._KeepConnectionAlive = False
+ self.Debug = False
+ self._Msg_TransitState = STATE_GET_HOLA
+ self._Buffer = None
+ # _ClientSide should be configured by the Factory that is
+ # creating me. I'll initiate an Init based on this.
+ self._ClientSide = True
+
+
+ def connectionMade(self):
+ if self._ClientSide is True:
+ self.sendInit()
+
+
+ def connectionLost(self,reason):
+ pass
+
+
+ def dataReceived(self,data):
+ """I recieve data and break them into messages. The messages
+ are then delivered to the broker for processing.
+
+ @param data: a stream of bytes straight from the network"""
+
+ data_len = len(data)
+ bytes_left = len(data)
+ pos = 0
+ try:
+ # Parse the stream and break it into messages
+ while bytes_left != 0:
+ if self._Msg_TransitState == STATE_GET_HOLA:
+ if self.Debug is True:
+ print "BBTrans: _Msg_TransitState == STATE_GET_HOLA"
+ pos = self._handleHola(data,pos)
+ elif self._Msg_TransitState == STATE_GET_MSG:
+ if self.Debug is True:
+ print "BBTrans: _Msg_TransitState == STATE_GET_MSG"
+ pos = self._handleMsg(data,pos)
+ elif self._Msg_TransitState == STATE_GET_INIT:
+ if self.Debug is True:
+ print "BBTrans: _Msg_TransitState == STATE_GET_INIT"
+ pos = self._handleInit(data,pos)
+ elif self._Msg_TransitState == STATE_GET_INITRESPONSE:
+ if self.Debug is True:
+ print "BBTrans: "\
+ "_Msg_TransitState == STATE_GET_INITRESPONSE"
+ pos = self._handleInitR(data,pos)
+ else:
+ print "BBTrans: ERROR. Undefined state = %d" % \
+ (self._Msg_TransitState)
+ bytes_left = data_len - pos
+ if self.Debug is True:
+ print "BBTrans: processed %d bytes, left %d" % \
+ (pos, bytes_left)
+
+
+ # Process the messages that we have recieved.
+ while len(self._InMsgQueue) != 0:
+ emsg = self._InMsgQueue.pop(0)
+ try:
+ dmsg = rencode.loads(emsg)
+ self.broker.processInMessage(dmesg)
+ self._ErrTolerance = 0
+ except Exception, e:
+ if self.Debug is True:
+ print "Exception while processing _InMsgQueue"
+ traceback.print_exc()
+ print str(e)
+ self._ErrTolerance += 1
+ # TBD: Log this error occurance
+
+ if self._ErrTolerance > BBTRANS_ERROR_TOLERANCE:
+ # TBD: Log an error
+ self.transport.loseConnection()
+
+ except Exception, e:
+ #TBD: log an error
+ # We just close the connection for all protocol related
+ # errors
+ if self.Debug is True:
+ print "Exception in dataReceived"
+ traceback.print_exc()
+ print str(e)
+ self.transport.loseConnection()
+
+
+ def _handleHola(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,BBTRANS_HOLA_LEN)
+ if done is True:
+ l, t = self._checkHolaSig(emsg)
+ self._Msg_Len = l
+ if t == BBTRANS_HOLA_TYPE_MSG:
+ self._Msg_TransitState = STATE_GET_MSG
+ elif t == BBTRANS_HOLA_TYPE_INIT:
+ self._Msg_TransitState = STATE_GET_INIT
+ elif t == BBTRANS_HOLA_TYPE_INITRESPONSE:
+ self._Msg_TransitState = STATE_GET_INITRESPONSE
+ return npos
+
+
+ def _checkHolaSig(self,data):
+ """I check the given data stream for a HOLA message and return
+ the encoded length of the message
+
+ @param data: a packed string of length BBTRANS_HOLA_LEN
+
+ @return tuple(int, int): A tuple of (length, type). (1) length
+ of the incoming message and (2) type of the incoming message"""
+
+ print "Checking for hOla"
+ try:
+ hola,length,msgtype = struct.unpack(BBTRANS_HOLA_FMT,data)
+ except:
+ raise BBTransError("HOLA signature format error")
+ # Check for hola signature
+ if hola != "h0La":
+ raise BBTransError \
+ ("HOLA signature error. Expected:%s, Got:%s" % \
+ ("h0La", hola))
+ # Check the hola-msg-type
+ if msgtype not in (BBTRANS_HOLA_TYPE_MSG, BBTRANS_HOLA_TYPE_INIT,
+ BBTRANS_HOLA_TYPE_INITRESPONSE):
+ raise BBTransError \
+ ("HOLA msgtype error. Got unknown type:%d" % \
+ (msgtype))
+ # Check the hola-msg-len
+ if msgtype==BBTRANS_HOLA_TYPE_MSG and length > BBTRANS_MAX_MSG_SIZE:
+ raise BBTransError \
+ ("HOLA type=%d size=%d exceeds MAX=%d" % \
+ (msgtype, length, BBTRANS_MAX_MSG_SIZE))
+ elif (msgtype in (BBTRANS_HOLA_TYPE_INIT, BBTRANS_HOLA_TYPE_INIT)) \
+ and length > BBTRANS_MAX_INIT_SIZE:
+ raise BBTransError \
+ ("HOLA type=%d size=%d exceeds MAX=%d" % \
+ (msgtype, length, BBTRANS_MAX_INIT_SIZE))
+
+ return (length, msgtype)
+
+
+ def _handleMsg(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,self._Msg_Len)
+ if done is True:
+ self._InMsgQueue.append(emsg)
+ self._Msg_TransitState = STATE_GET_HOLA
+ return npos
+
+
+ def _handleInit(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,self._Msg_Len)
+ if done is True:
+ self.recievedInit(emsg,respond=True)
+ self._Msg_TransitState = STATE_GET_HOLA
+ return npos
+
+ def _handleInitR(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,self._Msg_Len)
+ if done is True:
+ self.recievedInit(emsg)
+ self._Msg_TransitState = STATE_GET_HOLA
+ return npos
+
+
+ def _readData(self,data,pos,maxLen=0):
+ """I read bytes from the given 'data' stream from position=pos
+ bytes until I get a byte stream of length maxLen. If the given
+ data-stream was not enough, I return a tuple saying so. I
+ should be called with more chunks of data till get the
+ required stream length.
+
+ When the given data-stream did not have enough bytes, I return
+ a (False, new-position, None).
+
+ Once I get all the required bytes, I return a (True,
+ new-position, byte-stream)
+
+ @param data: data stream to read data from
+
+ @param pos: position from which data should be read
+
+ @param maxLen: The maximum length of the bytestream that I
+ should gather
+
+ @return tuple (bool, int, str): I return a tuple containing
+ the following: (1) boolean saying whether I have collected the
+ byte-stream completely or whether I would need more data. (2)
+ int saying the new-position from where the next read should be
+ done. (3) byte-stream containing the requested bytestream
+ collection after I have finished reading the entire length."""
+
+ left = len(data) - pos
+ assert left > 0
+
+ if self._Buffer is None:
+ self._Buffer = []
+ assert maxLen != 0
+ self._bufferRequiredLen = maxLen
+
+ if self._bufferRequiredLen > left:
+ npos = pos + left
+ self._Buffer.append(data[pos:npos])
+ self._bufferRequiredLen -= left
+ return (False, npos, None)
+ else:
+ npos = pos + self._bufferRequiredLen
+ self._Buffer.append(data[pos:npos])
+ b = ''.join(self._Buffer)
+ self._Buffer = None
+ self._bufferRequiredLen = 0
+ return (True, npos, b)
+
+
+ def queueMsg(self, m):
+ """
+ Serialize the message and queue it for sending to the other
+ side
+
+ TBD: I should check whether the remote's accepted message
+ size.
+ """
+ emsg = rencode.dumps(m)
+ l = len(emsg)
+ if l > BBTRANS_MAX_MSG_SIZE:
+ raise BBTransError\
+ ("Object too long=%d to transmit" % (l))
+ hola = struct.pack(BBTRANS_HOLA_FMT,"h0La", l, BBTRANS_HOLA_TYPE_MSG)
+ self._OutMsgQueue.append((hola,emsg))
+ self.sendMsgs()
+
+
+ def sendInit(self):
+ emsg = rencode.dumps(BBTRANS_INIT)
+ l = len(emsg)
+ if l > BBTRANS_MAX_INIT_SIZE:
+ raise BBTransError\
+ ("Object too long=%d to transmit" % (l))
+ hola = struct.pack(BBTRANS_HOLA_FMT,"h0La", l, BBTRANS_HOLA_TYPE_INIT)
+ self._OutMsgQueue.append((hola,emsg))
+ self.sendMsgs()
+
+
+ def recievedInit(self,einit,respond=False):
+ """I am called whenever we recieve a init-sequence from the
+ other end of the line. I see whether we are ok with it. When
+ respond is True, I send back a init-response."""
+
+ init = rencode.loads(einit)
+ # checking the protocol version
+ if (init['ver_major'], init['ver_minor']) != \
+ (BBTRANS_VER_MAJOR, BBTRANS_VER_MINOR):
+ raise BBTransError \
+ ("BBTrans version mismatch. Expected:%d.%d, Got:%d.%d" % \
+ (BBTRANS_VER_MAJOR, BBTRANS_VER_MINOR, \
+ init['ver_major'], init['ver_minor']))
+ if init['limit'] < BBTRANS_MAX_INIT_SIZE:
+ raise BBTransError \
+ ("Remote wont take even a init")
+ self._RemoteSizeLimit = init['limit']
+ self._KeepConnectionAlive = init['keepalive']
+
+ if respond is True:
+ emsg = rencode.dumps(BBTRANS_INIT)
+ l = len(emsg)
+ if l > BBTRANS_MAX_INIT_SIZE:
+ raise BBTransError\
+ ("Object too long=%d to transmit" % (l))
+ hola = struct.pack(BBTRANS_HOLA_FMT,"h0La", l,
+ BBTRANS_HOLA_TYPE_INITRESPONSE)
+ self._OutMsgQueue.append((hola,emsg))
+ self.sendMsgs()
+
+
+ def sendMsgs(self):
+ """Send the serialized messages in the queue to the other side"""
+
+ while len(self._OutMsgQueue) != 0:
+ h, m = self._OutMsgQueue.pop(0)
+ self.transport.write(''.join((h,m)))
+
+
+class BBTransServerFactory(transports._BasesServerFactory):
+ def buildProtocol(self, addr):
+ bb = _BBTransProtocol()
+ bb.factory = self
+ bb.Debug = self.Debug
+ return bb
+
+
+class BBTransClientFactory(transports._BasesClientFactory):
+ def buildProtocol(self, addr):
+ bb = _BBTransProtocol()
+ bb.factory = self
+ bb._ClientSide = True
+ bb.Debug = self.Debug
+ return bb
Added: trunk/bases/core/transports/transports.py
===================================================================
--- trunk/bases/core/transports/transports.py (rev 0)
+++ trunk/bases/core/transports/transports.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,13 @@
+
+from twisted.internet import protocol
+
+class _BasesServerFactory(protocol.ServerFactory):
+ def __init__(self, Debug=False):
+ self.Debug = Debug
+
+
+class _BasesClientFactory(protocol.ServerFactory):
+ def __init__(self, Debug=False):
+ self.Debug = Debug
+
+
Added: trunk/bases/interface.py
===================================================================
--- trunk/bases/interface.py (rev 0)
+++ trunk/bases/interface.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,13 @@
+
+
+class BasesInterface:
+ """
+ I validate a bases-interface. I can be used to validate method
+ calls with that advertized in the respective interface.
+ """
+
+ def __init__(self, Interface):
+ pass
+
+ def validateMethodCall(self, MethodName, args, kwargs):
+ pass
Added: trunk/bases/services/directory.py
===================================================================
--- trunk/bases/services/directory.py (rev 0)
+++ trunk/bases/services/directory.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,70 @@
+"""Provides the component-directory service for a bases-server"""
+import thread
+
+class BasesComponentDirectory:
+
+ # A dictionary of component namespaces. Organized as follows,
+ #
+ # "org.bases.app.ComponentName":(local, "protocol", "host", "port")
+
+ directory = []
+ directory_lock = thread.allocate_lock()
+
+ def __init__(self):
+ pass
+
+ def getComponentURI(self, ComponentName):
+ """ Get the URI of a given component.
+
+ @param ComponentName: Name of the component with full
+ namespace"""
+ uri=self._lookupKnownNameSpace(ComponentName)
+ if uri != None:
+ return uri
+
+ def _lookupKnownNameSpace(self,ComponentName):
+ """I lookup the local directory of Component-NameSpaces and
+ try to locate the component's location. If I cannot find the
+ component's location, I return a None
+
+ @param ComponentName: Name of the component to locate"""
+
+ cn_list = ComponentName.split('.')
+ cn_list_len = len(cn_list)
+
+ self.directory_lock.acquire()
+ try:
+ dcns_list = self.directory.keys()
+ for dcns in dcns_list:
+ # Check if matches directly
+ if dcns == ComponentName:
+ return self.dictionary[dcns]
+
+ dcn_list = dcns.split('.')
+ dcn_list_len = len(dcn_list)
+ # Eliminate sure cases of failure
+ if dcn_list_len > cn_list_len:
+ continue
+ if dcn_list_len < cn_list_lenlen \
+ and dcn_list[dcn_list_len] != "*":
+ continue
+ # Worst-case: Match and see
+ match_fl=True
+ for i in range(0, min(dcn_list_len, cn_list_len)):
+ if dcn_list[i] == "*":
+ break
+ if dcn_list[i] != cn_list[i]:
+ match_fl=False
+ break
+ if match_fl is True:
+ return self.directory[dcns]
+ return None
+ finally:
+ self.directory_lock.release()
+
+
+ def _addComponentURI(self, ComponentName, ComponentURI):
+ pass
+
+ def _optimizeDirectory(self):
+ pass
Added: trunk/bases/services/objrepo.py
===================================================================
--- trunk/bases/services/objrepo.py (rev 0)
+++ trunk/bases/services/objrepo.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,5 @@
+"""Provides the object-repository service for a bases-server """
+
+class BasesObjectRepository:
+ pass
+
Added: trunk/bases/synclient/app.py
===================================================================
--- trunk/bases/synclient/app.py (rev 0)
+++ trunk/bases/synclient/app.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,39 @@
+
+from bases.synclient import services
+from bases.synclient.component import sBasesComponentServerProxy
+from bases.synclient.broker import sBasesBroker
+
+
+def createBasesContext(conf):
+ """
+ I initialize a 'Bases application context'. The context is need to
+ communicate with other bases-servers.
+
+ @param conf: A configuration dictionary.
+ """
+ c = {}
+ b = sBasesBroker()
+ o = services.sBasesObjectRepoClient()
+ o.broker = b
+ d = services.sBasesDirectoryClient(conf['dircache'])
+ d.broker = b
+ d.objrepo = o
+
+ c['dir'] = d
+ c['objrepo'] = o
+ c['broker'] = b
+
+ return c
+
+
+def getBasesComponent(cxt, ComponentName):
+ """
+ I get a proxy object to a remote bases-component. The proxy can be
+ used to invoke all methods advertized in the interface.
+ """
+ i = cxt['dir'].getInterface(ComponentName)
+ l = cxt['dir'].getLocation(ComponentName)
+ obj = cxt['objrepo'].getObject(l, ComponentName)
+
+ p = sBasesComponentServerProxy(l, i, obj, cxt['broker'])
+ return p
Added: trunk/bases/synclient/broker.py
===================================================================
--- trunk/bases/synclient/broker.py (rev 0)
+++ trunk/bases/synclient/broker.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,114 @@
+
+from bases.synclient.transports import bbtrans
+from OpenSSL import SSL
+
+## Values for the MsgType in a message-list
+BASES_METHOD_CALL = 1
+BASES_METHOD_RESPONSE = 2
+
+
+def SSLVerificationCB(conn, cert, errnum, depth, ok):
+ #TBD: Server certificate verificaton should be done here.
+ return ok
+
+
+class sBasesTransportFactory:
+ """
+ I am different from the implementation in bases.core.broker. I
+ manage the transports for synchronous clients.
+ """
+
+ def __init__(self, SSLKey=None, SSLCert=None, CA=None):
+ if SSLKey is not None:
+ self._SSLKey = SSLKey
+ self._SSLCert = SSLCert
+ self._CA = CA
+ else:
+ self._SSLKey = None
+ self._SSLCert = None
+ self._CA = None
+
+
+ def getTransport(self, location):
+ """
+ Given a location tuple, I create a transport object. I do a
+ SSL connection to the remote location, and connect the
+ transport to it.
+ """
+ #TBD: Check the proto in location to obtain the correct
+ #transport. We should do this automagially.
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ if self._SSLKey is not None:
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ ctx.set_verify(SSL.VERIFY_PEER, SSLVerificationCB)
+ ctx.use_privatekey_file (self._SSLKey)
+ ctx.use_certificate_file(self._SSLCert)
+ ctx.load_verify_locations(self._CA)
+ sock = SSL.Connection(ctx, sock)
+
+ sock.connect((location(1), location(2)))
+
+ bb = bbtrans.sBasesBinaryTransport()
+ bb.conn = sock
+ bb.location = location
+ bb.connectionMade()
+ return bb
+
+
+class sBasesBroker:
+ def __init__(self):
+ self._transports={}
+ self._tfactory = sBasesTransportFactory()
+ self.next_msgid = id(self._tfactory)
+
+ def callRemote(self, location, ObjID, MethodName, *args, **kwargs):
+ """
+ I am called by a BasesComponentServerProxy object. I do a
+ method call to a component-object on a remote bases-server
+
+ @param tuple location: The location tuple
+
+ @param int ObjID: the ID of the component-object on the remote
+ bases-erver.
+
+ @param str MethodName: Name of the method on the object=ObjID to
+ be called on the remote side.
+
+ @param list args: A list of arguments
+
+ @param dict kwargs: A dict of keyword arguments
+
+ """
+
+ if self.Debug is True:
+ for m in out_method_calls:
+ if m[1] == self.next_msgid:
+ raise sBasesBrokerError \
+ ("Inconsistent next_msgid-%d" % (next_msgid))
+
+ msg = [BASES_METHOD_CALL, self.next_msgid, ObjectID, MethodName,
+ args, kwargs]
+ t = self.getTransport(location)
+ try:
+ t.sendMsg(call_list)
+ r = t.recvMsg()
+ if r[1] != self.next_msgid:
+ raise sBasesBroker ("Other side is giving us junk")
+ except Exception, e:
+ l = t.location
+ t.conn.shutdown()
+ t.conn.close()
+ del self._transports[l]
+ raise Exception(e)
+ return r
+
+
+ def getTransport(self, location):
+ if self._transports.has_key(location):
+ return self._transports[location]
+ else:
+ t = self._tfactory.getTransport(location)
+ self._transports[location] = t
+ return t
+
Added: trunk/bases/synclient/component.py
===================================================================
--- trunk/bases/synclient/component.py (rev 0)
+++ trunk/bases/synclient/component.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,39 @@
+
+class sBasesComponentServerProxy():
+ """A class to create sychronous proxies to remote BasesComponents"""
+
+ def __init__(self, location, interface, obj_id, broker):
+ """
+ @param location: This points to a Location tuple. The
+ location-tuple identifies the bases-server where the component
+ is hosted.
+
+ @param interface: The interface that should be proxied.
+
+ @param obj_id: This points to the instance of the component on
+ the remote bases-server. Basically, this is the object-id
+ obtained from the remote bases-server's object repository.
+ """
+
+ self._Location = location
+ self._Interface = interface
+ self._ObjectID = obj_id
+ self._Broker = broker
+
+
+ def __getattr__(self, MethodName):
+ """
+ I simply route all component-method invocations to the
+ MethodCallHandler.
+ """
+ self._CalledMethod = MethodName
+ return self.__MethodCallHandler
+
+
+ def __MethodCallHandler(self, *args, **kwargs):
+ # Validate the called method and its parameters against the
+ # published interface.
+ self._Interface.validateMethodCall(self._CalledMethod, args, kwargs)
+ r = self._Broker.callRemote (self._Location, self._ObjectID,
+ self._CalledMethod, args, kwargs)
+ return r
Added: trunk/bases/synclient/services.py
===================================================================
--- trunk/bases/synclient/services.py (rev 0)
+++ trunk/bases/synclient/services.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,20 @@
+
+
+class sBasesObjectRepoClient:
+ """
+ I talk to a remote bases-server's object-repo to get a reference
+ to a remote-object.
+ """
+
+ def __init__(self):
+ self.broker = None
+ pass
+
+ def getObject(self, location, componentname):
+ pass
+
+class sBasesDirectoryClient:
+
+ def __init__(self):
+ self.broker = None
+ pass
Added: trunk/bases/synclient/transports/bbtrans.py
===================================================================
--- trunk/bases/synclient/transports/bbtrans.py (rev 0)
+++ trunk/bases/synclient/transports/bbtrans.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,327 @@
+
+from datetime import datetime, timedelta
+import struct, traceback
+
+from bases.thirdparty import rencode
+
+# A limit on the maximum length of a bbtrans message.
+BBTRANS_MAX_MSG_SIZE = 64 * 1024 #64k
+BBTRANS_MAX_INIT_SIZE = 1024
+
+# Stuff related to the Hola signature
+BBTRANS_HOLA_FMT = "!4sLL" # "h0La, msg-type, msg-len"
+BBTRANS_HOLA_LEN = struct.calcsize(BBTRANS_HOLA_FMT)
+
+BBTRANS_HOLA_STR = "h0La" # Actual HOLA!!
+BBTRANS_HOLA_TYPE_MSG = 1
+BBTRANS_HOLA_TYPE_INIT = 2
+BBTRANS_HOLA_TYPE_INITRESPONSE = 3
+
+# Stuff that go into an init-type hola
+BBTRANS_VER_MAJOR = 0
+BBTRANS_VER_MINOR = 1
+# Init advertisement data
+BBTRANS_INIT = {"ver_major":BBTRANS_VER_MAJOR,
+ "ver_minor":BBTRANS_VER_MINOR,
+ "limit":BBTRANS_MAX_MSG_SIZE,
+ "keepalive":True,
+ "dumb":True}
+
+sBBTRANS_REINIT_TIMEOUT = 10
+
+# How many consecutive bad messages do we tolerate
+BBTRANS_ERROR_TOLERANCE=10
+
+
+## Internal states in which the protocol object will exist
+STATE_GET_HOLA = 1
+STATE_GET_MSG = 2
+STATE_GET_INIT = 3
+STATE_GET_INITRESPONSE = 4
+
+BBDUMP_FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
+
+def bbdump(src, length=8):
+ N=0; result=''
+ while src:
+ s,src = src[:length],src[length:]
+ hexa = ' '.join(["%02X"%ord(x) for x in s])
+ s = s.translate(BBDUMP_FILTER)
+ result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
+ N+=length
+ return result
+
+
+class sBasesBinaryTransportError(Exception):
+ pass
+
+
+class sBasesBinaryTransport():
+
+ def __init__(self):
+ self.conn = None
+ self._InitTimeStamp = None
+ self._InMsgQueue = []
+ self._InitDone = False
+
+
+ def connectionMade(self):
+ self.doInit()
+
+
+ def sendMsg(self, m):
+ """
+ I send a given message to the remote bases-server. I first
+ serialize the object before sending it on the wire.
+
+ @param m: A serializable python object.
+ """
+ self.doInit()
+ emsg = rencode.dumps(m)
+ l = len(emsg)
+ if l > BBTRANS_MAX_MSG_SIZE:
+ raise sBasesBinaryTransportError \
+ ("Object too long=%d to transmit" % (l))
+ hola = struct.pack(BBTRANS_HOLA_FMT,"h0La", l, BBTRANS_HOLA_TYPE_MSG)
+ self.conn.send(''.join((hola,emsg)))
+
+
+ def recvMsg(self):
+ """
+ I read incoming data till I get a message. Once I get one, I
+ return it back to the caller.
+
+ @rtype: A python object holding a message that we got
+ """
+ self.doInit()
+ while len(self._InMsgQueue) == 0:
+ data = self.conn.recv(1024)
+ self.dataRecieved(data)
+
+ emsg = self._InMsgQueue.pop(0)
+ r = rencode.loads(emsg)
+ return r
+
+
+ def doInit(self):
+ """
+ I initiate a 'Init sequence', and wait for a response.
+ """
+ if self._InitDone is True:
+ d = timedelta()
+ d = datetime.utcnow() - self._InitTimeStamp
+ if d.seconds <= sBBTRANS_REINIT_TIMEOUT:
+ return
+
+ self._InitTimeStamp = datetime.utcnow()
+ self._InitDone = False
+ self.sendInit()
+ while self._InitDone is False:
+ data = self.conn.recv(1024)
+ self.dataRecieved(data)
+
+
+ def dataReceived(self, data):
+ """I recieve data and break them into messages. The messages
+ are then delivered to the broker for processing.
+
+ @param data: a stream of bytes straight from the network"""
+
+ data_len = len(data)
+ bytes_left = len(data)
+ pos = 0
+
+ # Parse the stream and break it into messages
+ while bytes_left != 0:
+ if self._Msg_TransitState == STATE_GET_HOLA:
+ if self.Debug is True:
+ print "BBTrans: _Msg_TransitState == STATE_GET_HOLA"
+ pos = self._handleHola(data,pos)
+ elif self._Msg_TransitState == STATE_GET_MSG:
+ if self.Debug is True:
+ print "BBTrans: _Msg_TransitState == STATE_GET_MSG"
+ pos = self._handleMsg(data,pos)
+ elif self._Msg_TransitState == STATE_GET_INIT:
+ if self.Debug is True:
+ print "BBTrans: _Msg_TransitState == STATE_GET_INIT"
+ pos = self._handleInit(data,pos)
+ elif self._Msg_TransitState == STATE_GET_INITRESPONSE:
+ if self.Debug is True:
+ print "BBTrans: "\
+ "_Msg_TransitState == STATE_GET_INITRESPONSE"
+ pos = self._handleInitR(data,pos)
+ else:
+ print "BBTrans: ERROR. Undefined state = %d" % \
+ (self._Msg_TransitState)
+ bytes_left = data_len - pos
+ if self.Debug is True:
+ print "BBTrans: processed %d bytes, left %d" % \
+ (pos, bytes_left)
+
+
+ def _handleHola(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,BBTRANS_HOLA_LEN)
+ if done is True:
+ l, t = self._checkHolaSig(emsg)
+ self._Msg_Len = l
+ if t == BBTRANS_HOLA_TYPE_MSG:
+ self._Msg_TransitState = STATE_GET_MSG
+ elif t == BBTRANS_HOLA_TYPE_INIT:
+ self._Msg_TransitState = STATE_GET_INIT
+ elif t == BBTRANS_HOLA_TYPE_INITRESPONSE:
+ self._Msg_TransitState = STATE_GET_INITRESPONSE
+ return npos
+
+
+ def _checkHolaSig(self,data):
+ """I check the given data stream for a HOLA message and return
+ the encoded length of the message
+
+ @param data: a packed string of length BBTRANS_HOLA_LEN
+
+ @return tuple(int, int): A tuple of (length, type). (1) length
+ of the incoming message and (2) type of the incoming message"""
+
+ print "Checking for hOla"
+ try:
+ hola,length,msgtype = struct.unpack(BBTRANS_HOLA_FMT,data)
+ except:
+ raise sBasesBinaryTransportError("HOLA signature format error")
+ # Check for hola signature
+ if hola != "h0La":
+ raise sBasesBinaryTransportError \
+ ("HOLA signature error. Expected:%s, Got:%s" % \
+ ("h0La", hola))
+ # Check the hola-msg-type
+ if msgtype not in (BBTRANS_HOLA_TYPE_MSG, BBTRANS_HOLA_TYPE_INIT,
+ BBTRANS_HOLA_TYPE_INITRESPONSE):
+ raise sBasesBinaryTransportError \
+ ("HOLA msgtype error. Got unknown type:%d" % \
+ (msgtype))
+ # Check the hola-msg-len
+ if msgtype==BBTRANS_HOLA_TYPE_MSG and length > BBTRANS_MAX_MSG_SIZE:
+ raise sBasesBinaryTransportError \
+ ("HOLA type=%d size=%d exceeds MAX=%d" % \
+ (msgtype, length, BBTRANS_MAX_MSG_SIZE))
+ elif (msgtype in (BBTRANS_HOLA_TYPE_INIT, BBTRANS_HOLA_TYPE_INIT)) \
+ and length > BBTRANS_MAX_INIT_SIZE:
+ raise sBasesBinaryTransportError \
+ ("HOLA type=%d size=%d exceeds MAX=%d" % \
+ (msgtype, length, BBTRANS_MAX_INIT_SIZE))
+
+ return (length, msgtype)
+
+
+ def _handleMsg(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,self._Msg_Len)
+ if done is True:
+ self._InMsgQueue.append(emsg)
+ self._Msg_TransitState = STATE_GET_HOLA
+ return npos
+
+
+ def _handleInit(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,self._Msg_Len)
+ if done is True:
+ self.recievedInit(emsg,respond=True)
+ self._Msg_TransitState = STATE_GET_HOLA
+ return npos
+
+ def _handleInitR(self,data,pos):
+ done, npos, emsg = self._readData(data,pos,self._Msg_Len)
+ if done is True:
+ self.recievedInit(emsg)
+ self._Msg_TransitState = STATE_GET_HOLA
+ return npos
+
+
+ def _readData(self,data,pos,maxLen=0):
+
+ """I read bytes from the given 'data' stream from position=pos
+ bytes until I get a byte stream of length maxLen. If the given
+ data-stream was not enough, I return a tuple saying so. I
+ should be called with more chunks of data till get the
+ required stream length.
+
+ When the given data-stream did not have enough bytes, I return
+ a (False, new-position, None).
+
+ Once I get all the required bytes, I return a (True,
+ new-position, byte-stream)
+
+ @param data: data stream to read data from
+
+ @param pos: position from which data should be read
+
+ @param maxLen: The maximum length of the bytestream that I
+ should gather
+
+ @return tuple (bool, int, str): I return a tuple containing
+ the following: (1) boolean saying whether I have collected the
+ byte-stream completely or whether I would need more data. (2)
+ int saying the new-position from where the next read should be
+ done. (3) byte-stream containing the requested bytestream
+ collection after I have finished reading the entire length."""
+
+ left = len(data) - pos
+ assert left > 0
+
+ if self._Buffer is None:
+ self._Buffer = []
+ assert maxLen != 0
+ self._bufferRequiredLen = maxLen
+
+ if self._bufferRequiredLen > left:
+ npos = pos + left
+ self._Buffer.append(data[pos:npos])
+ self._bufferRequiredLen -= left
+ return (False, npos, None)
+ else:
+ npos = pos + self._bufferRequiredLen
+ self._Buffer.append(data[pos:npos])
+ b = ''.join(self._Buffer)
+ self._Buffer = None
+ self._bufferRequiredLen = 0
+ return (True, npos, b)
+
+
+ def sendInit(self):
+ emsg = rencode.dumps(BBTRANS_INIT)
+ l = len(emsg)
+ if l > BBTRANS_MAX_INIT_SIZE:
+ raise sBasesBinaryTransportError\
+ ("Object too long=%d to transmit" % (l))
+ hola = struct.pack(BBTRANS_HOLA_FMT,"h0La", l, BBTRANS_HOLA_TYPE_INIT)
+ self.conn.send(''.join((hola,emsg)))
+
+
+ def recievedInit(self,einit,respond=False):
+ """I am called whenever we recieve a init-sequence from the
+ other end of the line. I see whether we are ok with it. When
+ respond is True, I send back a init-response."""
+
+ init = rencode.loads(einit)
+ # checking the protocol version
+ if (init['ver_major'], init['ver_minor']) != \
+ (BBTRANS_VER_MAJOR, BBTRANS_VER_MINOR):
+ raise sBasesBinaryTransportError \
+ ("BBTrans version mismatch. Expected:%d.%d, Got:%d.%d" % \
+ (BBTRANS_VER_MAJOR, BBTRANS_VER_MINOR, \
+ init['ver_major'], init['ver_minor']))
+ if init['limit'] < BBTRANS_MAX_INIT_SIZE:
+ raise sBasesBinaryTransportError \
+ ("Remote wont take even a init")
+ self._RemoteSizeLimit = init['limit']
+ self._KeepConnectionAlive = init['keepalive']
+
+ if respond is True:
+ emsg = rencode.dumps(BBTRANS_INIT)
+ l = len(emsg)
+ if l > BBTRANS_MAX_INIT_SIZE:
+ raise sBasesBinaryTransportError\
+ ("Object too long=%d to transmit" % (l))
+ hola = struct.pack(BBTRANS_HOLA_FMT,"h0La", l,
+ BBTRANS_HOLA_TYPE_INITRESPONSE)
+ self.conn.send(''.join((hola,emsg)))
+
+ self._InitDone = True
Added: trunk/bases/thirdparty/rencode.py
===================================================================
--- trunk/bases/thirdparty/rencode.py (rev 0)
+++ trunk/bases/thirdparty/rencode.py 2009-02-26 06:56:36 UTC (rev 2)
@@ -0,0 +1,396 @@
+
+"""
+rencode -- Web safe object pickling/unpickling.
+
+The rencode module is a modified version of bencode from the
+BitTorrent project. For complex, heterogeneous data structures with
+many small elements, r-encodings take up significantly less space than
+b-encodings:
+
+ >>> len(rencode.dumps({'a':0, 'b':[1,2], 'c':99}))
+ 13
+ >>> len(bencode.bencode({'a':0, 'b':[1,2], 'c':99}))
+ 26
+
+The rencode format is not standardized, and may change with different
+rencode module versions, so you should check that you are using the
+same rencode version throughout your project.
+"""
+
+__version__ = '1.0.0'
+__all__ = ['dumps', 'loads']
+
+# Original bencode module by Petru Paler, et al.
+#
+# Modifications by Connelly Barnes:
+#
+# - Added support for floats (sent as 32-bit or 64-bit in network
+# order), bools, None.
+# - Allowed dict keys to be of any serializable type.
+# - Lists/tuples are always decoded as tuples (thus, tuples can be
+# used as dict keys).
+# - Embedded extra information in the 'typecodes' to save some space.
+# - Added a restriction on integer length, so that malicious hosts
+# cannot pass us large integers which take a long time to decode.
+#
+# Licensed by Bram Cohen under the "MIT license":
+#
+# "Copyright (C) 2001-2002 Bram Cohen
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# The Software is provided "AS IS", without warranty of any kind,
+# express or implied, including but not limited to the warranties of
+# merchantability, fitness for a particular purpose and
+# noninfringement. In no event shall the authors or copyright holders
+# be liable for any claim, damages or other liability, whether in an
+# action of contract, tort or otherwise, arising from, out of or in
+# connection with the Software or the use or other dealings in the
+# Software."
+#
+# (The rencode module is licensed under the above license as well).
+#
+
+import struct
+import string
+
+# Number of bits for serialized floats, either 32 or 64.
+FLOAT_BITS = 64
+
+# Maximum length of integer when written as base 10 string.
+MAX_INT_LENGTH = 64
+
+# The bencode 'typecodes' such as i, d, etc have been extended and
+# relocated on the base-256 character set.
+CHR_LIST = chr(59)
+CHR_DICT = chr(60)
+CHR_INT = chr(61)
+CHR_INT1 = chr(62)
+CHR_INT2 = chr(63)
+CHR_INT4 = chr(64)
+CHR_INT8 = chr(65)
+CHR_FLOAT = chr(66)
+CHR_TRUE = chr(67)
+CHR_FALSE = chr(68)
+CHR_NONE = chr(69)
+CHR_TERM = chr(127)
+
+# Positive integers with value embedded in typecode.
+INT_POS_FIXED_START = 0
+INT_POS_FIXED_COUNT = 32
+
+# Dictionaries with length embedded in typecode.
+DICT_FIXED_START = 102
+DICT_FIXED_COUNT = 25
+
+# Negative integers with value embedded in typecode.
+INT_NEG_FIXED_START = 70
+INT_NEG_FIXED_COUNT = 32
+
+# Strings with length embedded in typecode.
+STR_FIXED_START = 128
+STR_FIXED_COUNT = 64
+
+# Lists with length embedded in typecode.
+LIST_FIXED_START = STR_FIXED_START+STR_FIXED_COUNT
+LIST_FIXED_COUNT = 64
+
+def decode_int(x, f):
+ f += 1
+ newf = x.index(CHR_TERM, f)
+ if newf - f >= MAX_INT_LENGTH:
+ raise ValueError('overflow')
+ try:
+ n = int(x[f:newf])
+ except (OverflowError, ValueError):
+ n = long(x[f:newf])
+ if x[f] == '-':
+ if x[f + 1] == '0':
+ raise ValueError
+ elif x[f] == '0' and newf != f+1:
+ raise ValueError
+ return (n, newf+1)
+
+def decode_intb(x, f):
+ f += 1
+ return (struct.unpack('!b', x[f:f+1])[0], f+1)
+
+def decode_inth(x, f):
+ f += 1
+ return (struct.unpack('!h', x[f:f+2])[0], f+2)
+
+def decode_intl(x, f):
+ f += 1
+ return (struct.unpack('!l', x[f:f+4])[0], f+4)
+
+def decode_intq(x, f):
+ f += 1
+ return (struct.unpack('!q', x[f:f+8])[0], f+8)
+
+def decode_float(x, f):
+ f += 1
+ if FLOAT_BITS == 32:
+ n = struct.unpack('!f', x[f:f+4])[0]
+ return (n, f+4)
+ elif FLOAT_BITS == 64:
+ n = struct.unpack('!d', x[f:f+8])[0]
+ return (n, f+8)
+ else:
+ raise ValueError
+
+def decode_string(x, f):
+ colon = x.index(':', f)
+ try:
+ n = int(x[f:colon])
+ except (OverflowError, ValueError):
+ n = long(x[f:colon])
+ if x[f] == '0' and colon != f+1:
+ raise ValueError
+ colon += 1
+ return (x[colon:colon+n], colon+n)
+
+def decode_list(x, f):
+ r, f = [], f+1
+ while x[f] != CHR_TERM:
+ v, f = decode_func[x[f]](x, f)
+ r.append(v)
+ return (tuple(r), f + 1)
+
+def decode_dict(x, f):
+ r, f = {}, f+1
+ while x[f] != CHR_TERM:
+ k, f = decode_func[x[f]](x, f)
+ r[k], f = decode_func[x[f]](x, f)
+ return (r, f + 1)
+
+def decode_true(x, f):
+ return (True, f+1)
+
+def decode_false(x, f):
+ return (False, f+1)
+
+def decode_none(x, f):
+ return (None, f+1)
+
+decode_func = {}
+decode_func['0'] = decode_string
+decode_func['1'] = decode_string
+decode_func['2'] = decode_string
+decode_func['3'] = decode_string
+decode_func['4'] = decode_string
+decode_func['5'] = decode_string
+decode_func['6'] = decode_string
+decode_func['7'] = decode_string
+decode_func['8'] = decode_string
+decode_func['9'] = decode_string
+decode_func[CHR_LIST ] = decode_list
+decode_func[CHR_DICT ] = decode_dict
+decode_func[CHR_INT ] = decode_int
+decode_func[CHR_INT1 ] = decode_intb
+decode_func[CHR_INT2 ] = decode_inth
+decode_func[CHR_INT4 ] = decode_intl
+decode_func[CHR_INT8 ] = decode_intq
+decode_func[CHR_FLOAT] = decode_float
+decode_func[CHR_TRUE ] = decode_true
+decode_func[CHR_FALSE] = decode_false
+decode_func[CHR_NONE ] = decode_none
+
+def make_fixed_length_string_decoders():
+ def make_decoder(slen):
+ def f(x, f):
+ return (x[f+1:f+1+slen], f+1+slen)
+ return f
+ for i in range(STR_FIXED_COUNT):
+ decode_func[chr(STR_FIXED_START+i)] = make_decoder(i)
+
+make_fixed_length_string_decoders()
+
+def make_fixed_length_list_decoders():
+ def make_decoder(slen):
+ def f(x, f):
+ r, f = [], f+1
+ for i in range(slen):
+ v, f = decode_func[x[f]](x, f)
+ r.append(v)
+ return (tuple(r), f)
+ return f
+ for i in range(LIST_FIXED_COUNT):
+ decode_func[chr(LIST_FIXED_START+i)] = make_decoder(i)
+
+make_fixed_length_list_decoders()
+
+def make_fixed_length_int_decoders():
+ def make_decoder(j):
+ def f(x, f):
+ return (j, f+1)
+ return f
+ for i in range(INT_POS_FIXED_COUNT):
+ decode_func[chr(INT_POS_FIXED_START+i)] = make_decoder(i)
+ for i in range(INT_NEG_FIXED_COUNT):
+ decode_func[chr(INT_NEG_FIXED_START+i)] = make_decoder(-1-i)
+
+make_fixed_length_int_decoders()
+
+def make_fixed_length_dict_decoders():
+ def make_decoder(slen):
+ def f(x, f):
+ r, f = {}, f+1
+ for j in range(slen):
+ k, f = decode_func[x[f]](x, f)
+ r[k], f = decode_func[x[f]](x, f)
+ return (r, f)
+ return f
+ for i in range(DICT_FIXED_COUNT):
+ decode_func[chr(DICT_FIXED_START+i)] = make_decoder(i)
+
+make_fixed_length_dict_decoders()
+
+def encode_dict(x,r):
+ r.append(CHR_DICT)
+ for k, v in x.items():
+ encode_func[type(k)](k, r)
+ encode_func[type(v)](v, r)
+ r.append(CHR_TERM)
+
+
+def loads(x):
+ try:
+ r, l = decode_func[x[0]](x, 0)
+ except (IndexError, KeyError):
+ raise ValueError
+ if l != len(x):
+ raise ValueError
+ return r
+
+from types import StringType, IntType, LongType, DictType, ListType, TupleType, FloatType, NoneType
+
+def encode_int(x, r):
+ if 0 <= x < INT_POS_FIXED_COUNT:
+ r.append(chr(INT_POS_FIXED_START+x))
+ elif -INT_NEG_FIXED_COUNT <= x < 0:
+ r.append(chr(INT_NEG_FIXED_START-1-x))
+ elif -128 <= x < 128:
+ r.extend((CHR_INT1, struct.pack('!b', x)))
+ elif -32768 <= x < 32768:
+ r.extend((CHR_INT2, struct.pack('!h', x)))
+ elif -2147483648 <= x < 2147483648:
+ r.extend((CHR_INT4, struct.pack('!l', x)))
+ elif -9223372036854775808 <= x < 9223372036854775808:
+ r.extend((CHR_INT8, struct.pack('!q', x)))
+ else:
+ s = str(x)
+ if len(s) >= MAX_INT_LENGTH:
+ raise ValueError('overflow')
+ r.extend((CHR_INT, s, CHR_TERM))
+
+def encode_float(x, r):
+ if FLOAT_BITS == 32:
+ r.extend((CHR_FLOAT, struct.pack('!f', x)))
+ elif FLOAT_BITS == 64:
+ r.extend((CHR_FLOAT, struct.pack('!d', x)))
+ else:
+ raise ValueError
+
+def encode_bool(x, r):
+ r.extend({False: CHR_FALSE, True: CHR_TRUE}[bool(x)])
+
+def encode_none(x, r):
+ r.extend(CHR_NONE)
+
+def encode_string(x, r):
+ if len(x) < STR_FIXED_COUNT:
+ r.extend((chr(STR_FIXED_START + len(x)), x))
+ else:
+ r.extend((str(len(x)), ':', x))
+
+def encode_list(x, r):
+ if len(x) < LIST_FIXED_COUNT:
+ r.append(chr(LIST_FIXED_START + len(x)))
+ for i in x:
+ encode_func[type(i)](i, r)
+ else:
+ r.append(CHR_LIST)
+ for i in x:
+ encode_func[type(i)](i, r)
+ r.append(CHR_TERM)
+
+def encode_dict(x,r):
+ if len(x) < DICT_FIXED_COUNT:
+ r.append(chr(DICT_FIXED_START + len(x)))
+ for k, v in x.items():
+ encode_func[type(k)](k, r)
+ encode_func[type(v)](v, r)
+ else:
+ r.append(CHR_DICT)
+ for k, v in x.items():
+ encode_func[type(k)](k, r)
+ encode_func[type(v)](v, r)
+ r.append(CHR_TERM)
+
+encode_func = {}
+encode_func[IntType] = encode_int
+encode_func[LongType] = encode_int
+encode_func[FloatType] = encode_float
+encode_func[StringType] = encode_string
+encode_func[ListType] = encode_list
+encode_func[TupleType] = encode_list
+encode_func[DictType] = encode_dict
+encode_func[NoneType] = encode_none
+
+try:
+ from types import BooleanType
+ encode_func[BooleanType] = encode_bool
+except ImportError:
+ pass
+
+def dumps(x):
+ r = []
+ encode_func[type(x)](x, r)
+ return ''.join(r)
+
+
+def test():
+ f1 = struct.unpack('!f', struct.pack('!f', 25.5))[0]
+ f2 = struct.unpack('!f', struct.pack('!f', 29.3))[0]
+ f3 = struct.unpack('!f', struct.pack('!f', -0.6))[0]
+ L = (({'a':15, 'bb':f1, 'ccc':f2, '':(f3,(),False,True,'')},('a',10**20),tuple(range(-100000,100000)),'b'*31,'b'*62,'b'*64,2**30,2**33,2**62,2**64,2**30,2**33,2**62,2**64,False,False, True, -1, 2, 0),)
+ assert loads(dumps(L)) == L
+ d = dict(zip(range(-100000,100000),range(-100000,100000)))
+ d.update({'a':20, 20:40, 40:41, f1:f2, f2:f3, f3:False, False:True, True:False})
+ L = (d, {}, {5:6}, {7:7,True:8}, {9:10, 22:39, 49:50, 44: ''})
+ assert loads(dumps(L)) == L
+ L = ('', 'a'*10, 'a'*100, 'a'*1000, 'a'*10000, 'a'*100000, 'a'*1000000, 'a'*10000000)
+ assert loads(dumps(L)) == L
+ L = tuple([dict(zip(range(n),range(n))) for n in range(100)]) + ('b',)
+ assert loads(dumps(L)) == L
+ L = tuple([dict(zip(range(n),range...
[truncated message content] |