From: <jda...@us...> - 2008-08-14 15:08:08
|
Revision: 209 http://pyscard.svn.sourceforge.net/pyscard/?rev=209&view=rev Author: jdaussel Date: 2008-08-14 15:08:16 +0000 (Thu, 14 Aug 2008) Log Message: ----------- Added testcase_readermonitorstress Modified Paths: -------------- trunk/pyscard/src/smartcard/test/framework/testsuite_framework.py Added Paths: ----------- trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py Added: trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py =================================================================== --- trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py (rev 0) +++ trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py 2008-08-14 15:08:16 UTC (rev 209) @@ -0,0 +1,177 @@ +#! /usr/bin/env python +"""Unit tests for smartcard.ReaderMonitoring + +This test case can be executed individually, or with all other test cases +thru testsuite_framework.py. + +__author__ = "http://www.gemalto.com" + +Copyright 2001-2008 gemalto +Author: Jean-Daniel Aussel, mailto:jea...@ge... + +This file is part of pyscard. + +pyscard is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +pyscard is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with pyscard; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +""" + + +import random +import threading +import time +import unittest + +from smartcard.ReaderMonitoring import ReaderMonitor, ReaderObserver + +period=.1 + +# stats on virtual reader insertion/removal +insertedreaderstats={} +removedreaderstats={} + +# the virtual list of readers +mutexvreaders=threading.RLock() +virtualreaders=[] +def getReaders(): + '''Return virtual list of inserted readers. + Replacement of smartcard.system.readers for testing purpose''' + try: + mutexvreaders.acquire() + currentreaders = virtualreaders + finally: + mutexvreaders.release() + readerEvent.set() + return currentreaders + +# an event to signal test threads to end +exitEvent = threading.Event() + +# an event to ensure only one insertion/removal between getReaders() calls +readerEvent = threading.Event() +readerEvent.clear() + +# test running time in seconds +RUNNING_TIME=15 + +# the count of registered observers +OBS_COUNT=100 + +class readerInsertionThread( threading.Thread ): + '''Simulate reader insertion every 2 to 4 periods.''' + def __init__( self ): + threading.Thread.__init__( self ) + def run( self ): + while not exitEvent.isSet(): + time.sleep( random.uniform( 2*period, 4*period ) ) + readerEvent.wait() + newreader=random.choice( 'abcdefghijklmnopqrstuvwxyz' ) + try: + mutexvreaders.acquire() + if newreader not in virtualreaders: + virtualreaders.append( newreader ) + if insertedreaderstats.has_key(newreader): insertedreaderstats[newreader]+=1 + else: insertedreaderstats[newreader]=1 + finally: + readerEvent.clear() + mutexvreaders.release() + + +class readerRemovalThread( threading.Thread ): + '''Simulate reader removal every 5 to 6 periods.''' + def __init__(self ): + threading.Thread.__init__(self) + def run(self): + while not exitEvent.isSet(): + time.sleep( random.uniform( 5*period, 6*period ) ) + readerEvent.wait() + try: + mutexvreaders.acquire() + if virtualreaders: + oldreader=random.choice( virtualreaders ) + virtualreaders.remove(oldreader) + if removedreaderstats.has_key(oldreader): removedreaderstats[oldreader]+=1 + else: removedreaderstats[oldreader]=1 + finally: + readerEvent.clear() + mutexvreaders.release() + + +class countobserver( ReaderObserver ): + '''A simple reader observer that counts added/removed readers.''' + def __init__( self, obsindex ): + self.obsindex=obsindex + self.insertedreaderstats={} + self.removedreaderstats={} + self.countnotified=0 + def update( self, observable, (addedreaders, removedreaders) ): + self.countnotified+=1 + for newreader in addedreaders: + if self.insertedreaderstats.has_key(newreader): self.insertedreaderstats[newreader]+=1 + else: self.insertedreaderstats[newreader]=1 + for oldreader in removedreaders: + if self.removedreaderstats.has_key(oldreader): self.removedreaderstats[oldreader]+=1 + else: self.removedreaderstats[oldreader]=1 + + +class testcase_readermonitorstress( unittest.TestCase ): + '''Test smartcard framework reader monitoring''' + def testcase_readermonitorthread(self): + + # create thread that simulates reader insertion + insertionthread = readerInsertionThread() + + # create thread that simulates reader removal + removalthread = readerRemovalThread() + + readermonitor = ReaderMonitor( readerProc=getReaders, period=period ) + observers=[] + for i in range( 0, OBS_COUNT ): + observer = countobserver( i ) + readermonitor.addObserver( observer ) + observers.append( observer ) + + # signal threads to start + insertionthread.start() + removalthread.start() + + # let reader insertion/removal threads run for a while + # then signal threads to end + time.sleep( RUNNING_TIME ) + exitEvent.set() + + # wait until all threads ended + removalthread.join() + insertionthread.join() + time.sleep( 2*period ) + + for observer in observers: + #print observer.obsindex, observer.insertedreaderstats==insertedreaderstats, observer.countnotified + #print observer.obsindex, observer.removedreaderstats==removedreaderstats, observer.countnotified + #print observer.insertedreaderstats + #print insertedreaderstats + #print observer.removedreaderstats + #print removedreaderstats + self.assertEquals( observer.insertedreaderstats, insertedreaderstats ) + self.assertEquals( observer.removedreaderstats, removedreaderstats ) + + + +def suite(): + suite1 = unittest.makeSuite(testcase_readermonitorthread) + return unittest.TestSuite((suite1)) + + +if __name__ == '__main__': + unittest.main() + Modified: trunk/pyscard/src/smartcard/test/framework/testsuite_framework.py =================================================================== --- trunk/pyscard/src/smartcard/test/framework/testsuite_framework.py 2008-08-14 12:47:34 UTC (rev 208) +++ trunk/pyscard/src/smartcard/test/framework/testsuite_framework.py 2008-08-14 15:08:16 UTC (rev 209) @@ -45,6 +45,7 @@ 'testcase_readers', 'testcase_readergroups', 'testcase_readermonitor', + 'testcase_readermonitorstress', 'testcase_ulist', 'testcase_utils', ) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |