From: <np...@us...> - 2008-10-15 05:48:26
|
Revision: 1096 http://omc.svn.sourceforge.net/omc/?rev=1096&view=rev Author: npaxton Date: 2008-10-15 05:48:20 +0000 (Wed, 15 Oct 2008) Log Message: ----------- get indications working as standalone test... requires new internalIndicationFix.patch Modified Paths: -------------- cmpi-bindings/trunk/test/python/UpcallAtom.sfcb.reg cmpi-bindings/trunk/test/python/UpcallAtomProvider.py cmpi-bindings/trunk/test/python/UpcallAtomTest.py Added Paths: ----------- cmpi-bindings/trunk/test/python/UpcallAtomIndListener.py Modified: cmpi-bindings/trunk/test/python/UpcallAtom.sfcb.reg =================================================================== --- cmpi-bindings/trunk/test/python/UpcallAtom.sfcb.reg 2008-10-14 20:49:31 UTC (rev 1095) +++ cmpi-bindings/trunk/test/python/UpcallAtom.sfcb.reg 2008-10-15 05:48:20 UTC (rev 1096) @@ -9,9 +9,3 @@ location: pyCmpiProvider type: indication namespace: root/cimv2 - -#[UpcallAtom_Indication_Handler] -# provider: UpcallAtomProvider -# location: pyCmpiProvider -# type: instance method -# namespace: root/cimv2 Added: cmpi-bindings/trunk/test/python/UpcallAtomIndListener.py =================================================================== --- cmpi-bindings/trunk/test/python/UpcallAtomIndListener.py (rev 0) +++ cmpi-bindings/trunk/test/python/UpcallAtomIndListener.py 2008-10-15 05:48:20 UTC (rev 1096) @@ -0,0 +1,109 @@ +#!/usr/bin/python +# +# Simple indication receiver using Twisted Python. HTTP post requests +# are listened for on port 5988 and port 5989 using SSL. 
+# +# Requires Twisted Python and +# + +import sys +from twisted.internet import reactor +from twisted.web import server, resource +import pywbem +import threading +import os + +from twisted.internet import ssl, reactor +from twisted.python import log + +from time import sleep + +class CIMListener(resource.Resource): + """ CIM Listener + """ + + isLeaf = 1 + + class ServerContextFactory(object): + def __init__(self, cert, key): + self.cert = cert + self.key = key + + def getContext(self): + """Create an SSL context with a dodgy certificate.""" + + from OpenSSL import SSL + ctx = SSL.Context(SSL.SSLv23_METHOD) + ctx.use_certificate_file(self.cert) + ctx.use_privatekey_file(self.key) + return ctx + + def __init__(self, callback, + http_port=5988, https_port=5989, + ssl_key=None, ssl_cert=None): + self.callback = callback + self.http_port = http_port + self.https_port = https_port + self.ssl_key = ssl_key + self.ssl_cert = ssl_cert + + site = server.Site(self) + + if self.http_port and self.http_port > 0: + reactor.listenTCP(self.http_port, site) + if self.https_port and self.https_port > 0: + reactor.listenSSL(self.https_port, site, + self.ServerContextFactory(cert=ssl_cert, key=ssl_key)) + + def start(self): + ''' doesn't work''' + thread = threading.Thread(target=reactor.run) + thread.start() + + def stop(self): + reactor.stop() + + def run(self): + reactor.run() + + def render_POST(self, request): + tt = pywbem.parse_cim(pywbem.xml_to_tupletree(request.content.read())) + insts = [x[1] for x in tt[2][2][0][2][2]] + for inst in insts: + self.callback(inst) + return '' + + +#log.startLogging(sys.stdout) + + +rcv_count=0 + +if __name__ == '__main__': + global rcv_count + rcv_count = 0 + + killval = sys.argv[1] + filename = sys.argv[2] + print + print "killval=",killval + + cl=None + + def update_file(): + global rcv_count + fd = open(filename, 'w') + fd.write('%s'%rcv_count) + fd.close + + def cb(inst): + global rcv_count + rcv_count+=1 + update_file() + print 
inst['IndicationTime'], inst['Description'] + if inst['Description']==killval: + cl.stop() + + update_file() + cl = CIMListener(cb, https_port=None) + cl.run() Property changes on: cmpi-bindings/trunk/test/python/UpcallAtomIndListener.py ___________________________________________________________________ Added: svn:executable + * Modified: cmpi-bindings/trunk/test/python/UpcallAtomProvider.py =================================================================== --- cmpi-bindings/trunk/test/python/UpcallAtomProvider.py 2008-10-14 20:49:31 UTC (rev 1095) +++ cmpi-bindings/trunk/test/python/UpcallAtomProvider.py 2008-10-15 05:48:20 UTC (rev 1096) @@ -24,82 +24,6 @@ logger.log_debug(msg) -# start indication support methods -''' -def _createFilter(ch, query='select * from CIM_ProcessIndication', - ns='root/interop', - querylang='WQL', - src_ns='root/cimv2', - in_name=None): - name = in_name or 'cimfilter%s'%time.time() - filterinst=pywbem.CIMInstance('CIM_IndicationFilter') - filterinst['CreationClassName']='CIM_IndicationFilter' - filterinst['SystemCreationClassName']='CIM_ComputerSystem' - filterinst['SystemName']=getfqdn() - filterinst['Name']=name - filterinst['Query']=query - filterinst['QueryLanguage']=querylang - filterinst['SourceNamespace']=src_ns - cop = pywbem.CIMInstanceName('CIM_IndicationFilter') - cop.keybindings = { 'CreationClassName':'CIM_IndicationFilter', - 'SystemClassName':'CIM_ComputerSystem', - 'SystemName':getfqdn(), - 'Name':name } - cop.namespace=ns - filterinst.path = cop - filtercop = ch.CreateInstance(cop, filterinst) - return filtercop - -def _createDest(ch, destination='http://localhost:5988', - ns='root/interop', - in_name=None): - name = in_name or 'cimlistener%s'%time.time() - destinst=pywbem.CIMInstance('CIM_ListenerDestinationCIMXML') - destinst['CreationClassName']='CIM_ListenerDestinationCIMXML' - destinst['SystemCreationClassName']='CIM_ComputerSystem' - destinst['SystemName']=getfqdn() - print "destname=",name - 
destinst['Name']=name - destinst['Destination']=destination - cop = pywbem.CIMInstanceName('CIM_ListenerDestinationCIMXML') - cop.keybindings = { 'CreationClassName':'CIM_ListenerDestinationCIMXML', - 'SystemClassName':'CIM_ComputerSystem', - 'SystemName':getfqdn(), - 'Name':name } - cop.namespace=ns - destinst.path = cop - destcop = ch.CreateInstance(cop, destinst) - return destcop - -def _createSubscription(ch, ns='root/interop'): - replace_ns = ch.default_namespace - ch.default_namespace=ns - indfilter=_createFilter(ch) - indhandler=_createDest(ch) - subinst=pywbem.CIMInstance('CIM_IndicationSubscription') - subinst['Filter']=indfilter - subinst['Handler']=indhandler - cop = pywbem.CIMInstanceName('CIM_IndicationSubscription') - cop.keybindings = { 'Filter':indfilter, - 'Handler':indhandler } - cop.namespace=ns - subinst.path = cop - subcop = ch.CreateInstance(cop, subinst) - ch.default_namespace=replace_ns - return subcop - - -def _deleteSubscription(ch, subcop): - indfilter = subcop['Filter'] - indhandler= subcop['Handler'] - ch.DeleteInstance(subcop) - ch.DeleteInstance(indfilter) - ch.DeleteInstance(indhandler) -''' -# end indication support methods - - - ################################################################################ _atoms = {'Hydrogen': 1, Modified: cmpi-bindings/trunk/test/python/UpcallAtomTest.py =================================================================== --- cmpi-bindings/trunk/test/python/UpcallAtomTest.py 2008-10-14 20:49:31 UTC (rev 1095) +++ cmpi-bindings/trunk/test/python/UpcallAtomTest.py 2008-10-15 05:48:20 UTC (rev 1096) @@ -7,17 +7,21 @@ import os import time from socket import getfqdn +from subprocess import Popen +import signal +import sys + conn = None #This test requires the usage of elementtree _g_opts = None _g_args = None +_g_cwd = None - # start indication support methods def createFilter( ch, query='select * from CIM_ProcessIndication', ns='root/interop', @@ -119,25 +123,45 @@ else: print('') - ''' def 
test_a_upcalls_all(self): rv,outs = self.conn.InvokeMethod('test_all_upcalls', 'Test_UpcallAtom') self.assertEquals(rv, 'Success!') self.assertFalse(outs) - ''' def test_b_indications(self): + global _g_cwd + dir=_g_cwd + listenername=dir+'/UpcallAtomIndListener.py' + outputname=dir+'/IndCount.txt' + listenpid=Popen([listenername, "6", outputname]).pid numrcv = 0 self.subcop = createSubscription(self.conn) num_to_send = pywbem.Uint16(7) + time.sleep(1) self.conn.InvokeMethod('reset_indication_count', 'Test_UpcallAtom') countsent,outs = self.conn.InvokeMethod('send_indications', 'Test_UpcallAtom', num_to_send=num_to_send) numsent,outs = self.conn.InvokeMethod('get_indication_send_count', 'Test_UpcallAtom') deleteSubscription(self.conn, self.subcop) if (countsent != numsent): self.fail("send_indications NumSent(%d) doesn't match get_indication_send_count NumSent(%d)"%(countsent, numsent)); + time.sleep(1) + fd=open(outputname, 'r') + countstr=fd.read() + fd.close() + numrcv=int(countstr) if (numrcv != numsent): + for i in xrange(10): + time.sleep(1) + fd=open(outputname, 'r') + countstr=fd.read() + numrcv=int(countstr) + print "read IndCount: ",numrcv + fd.close() + if (numrcv == numsent): + break; self.fail("number received(%d) doesn't match number sent(%d)"%(numrcv,numsent)); + os.remove(outputname) + os.kill(listenpid, signal.SIGTERM) def get_unit_test(): @@ -145,6 +169,9 @@ if __name__ == '__main__': + global _g_cwd + _g_cwd=sys.path[0] + print "Current Working Directory: ",_g_cwd parser = optparse.OptionParser() wbem_connection.getWBEMConnParserOptions(parser) parser.add_option('--verbose', '', action='store_true', default=False, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |