|
From: <lu...@us...> - 2010-06-09 15:42:11
|
Revision: 475
http://pyscard.svn.sourceforge.net/pyscard/?rev=475&view=rev
Author: ludov
Date: 2010-06-09 15:42:05 +0000 (Wed, 09 Jun 2010)
Log Message:
-----------
make pep8 happy
Modified Paths:
--------------
trunk/pyscard/src/smartcard/test/framework/testcase_readermonitor.py
trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py
Modified: trunk/pyscard/src/smartcard/test/framework/testcase_readermonitor.py
===================================================================
--- trunk/pyscard/src/smartcard/test/framework/testcase_readermonitor.py 2010-06-09 15:37:57 UTC (rev 474)
+++ trunk/pyscard/src/smartcard/test/framework/testcase_readermonitor.py 2010-06-09 15:42:05 UTC (rev 475)
@@ -49,50 +49,53 @@
# a simple reader observer that prints added/removed readers
-class printobserver( ReaderObserver ):
- def __init__( self, obsindex, testcase ):
- self.obsindex=obsindex
+class printobserver(ReaderObserver):
+
+ def __init__(self, obsindex, testcase):
+ self.obsindex = obsindex
self.testcase = testcase
- def update( self, observable, (addedreaders, removedreaders) ):
- foundreaders={}
- self.testcase.assertEquals( removedreaders, [] )
+ def update(self, observable, (addedreaders, removedreaders)):
+ foundreaders = {}
+ self.testcase.assertEquals(removedreaders, [])
for reader in addedreaders:
- foundreaders[str(reader)]=1
- if {}!=foundreaders:
+ foundreaders[str(reader)] = 1
+ if {} != foundreaders:
for reader in expectedReaders:
- self.testcase.assert_( foundreaders.has_key( reader ) )
+ self.testcase.assert_(foundreaders.has_key(reader))
-class testthread( threading.Thread ):
- def __init__(self, obsindex, testcase ):
+
+class testthread(threading.Thread):
+
+ def __init__(self, obsindex, testcase):
threading.Thread.__init__(self)
self.obsindex = obsindex
self.testcase = testcase
self.readermonitor = ReaderMonitor()
- self.observer=None
+ self.observer = None
def run(self):
# create and register observer
- self.observer = printobserver( self.obsindex, self.testcase )
- self.readermonitor.addObserver( self.observer )
+ self.observer = printobserver(self.obsindex, self.testcase)
+ self.readermonitor.addObserver(self.observer)
time.sleep(1)
self.readermonitor.deleteObserver(self.observer)
-
class testcase_readermonitor(unittest.TestCase):
"""Test smartcard framework reader monitoring methods"""
def testcase_readermonitorthread(self):
threads = []
- for i in range( 0, 4 ):
- t = testthread( i, self )
- threads.append( t )
+ for i in range(0, 4):
+ t = testthread(i, self)
+ threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
+
def suite():
suite1 = unittest.makeSuite(testcase_readermonitorthread)
return unittest.TestSuite((suite1))
Modified: trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py
===================================================================
--- trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py 2010-06-09 15:37:57 UTC (rev 474)
+++ trunk/pyscard/src/smartcard/test/framework/testcase_readermonitorstress.py 2010-06-09 15:42:05 UTC (rev 475)
@@ -34,15 +34,17 @@
from smartcard.ReaderMonitoring import ReaderMonitor, ReaderObserver
-period=.1
+period = .1
# stats on virtual reader insertion/removal
-insertedreaderstats={}
-removedreaderstats={}
+insertedreaderstats = {}
+removedreaderstats = {}
# the virtual list of readers
-mutexvreaders=threading.RLock()
-virtualreaders=[]
+mutexvreaders = threading.RLock()
+virtualreaders = []
+
+
def getReaders():
'''Return virtual list of inserted readers.
Replacement of smartcard.system.readers for testing purpose'''
@@ -62,78 +64,86 @@
readerEvent.clear()
# test running time in seconds
-RUNNING_TIME=15
+RUNNING_TIME = 15
# the count of registered observers
-OBS_COUNT=100
+OBS_COUNT = 100
-class readerInsertionThread( threading.Thread ):
+
+class readerInsertionThread(threading.Thread):
'''Simulate reader insertion every 2 to 4 periods.'''
- def __init__( self ):
- threading.Thread.__init__( self )
- def run( self ):
+
+ def __init__(self):
+ threading.Thread.__init__(self)
+
+ def run(self):
while not exitEvent.isSet():
- time.sleep( random.uniform( 2*period, 4*period ) )
+ time.sleep(random.uniform(2 * period, 4 * period))
readerEvent.wait()
- newreader=random.choice( 'abcdefghijklmnopqrstuvwxyz' )
+ newreader = random.choice('abcdefghijklmnopqrstuvwxyz')
try:
mutexvreaders.acquire()
if newreader not in virtualreaders:
- virtualreaders.append( newreader )
+ virtualreaders.append(newreader)
if insertedreaderstats.has_key(newreader):
- insertedreaderstats[newreader]+=1
+ insertedreaderstats[newreader] += 1
else:
- insertedreaderstats[newreader]=1
+ insertedreaderstats[newreader] = 1
finally:
readerEvent.clear()
mutexvreaders.release()
-class readerRemovalThread( threading.Thread ):
+class readerRemovalThread(threading.Thread):
'''Simulate reader removal every 5 to 6 periods.'''
- def __init__(self ):
+
+ def __init__(self):
threading.Thread.__init__(self)
+
def run(self):
while not exitEvent.isSet():
- time.sleep( random.uniform( 5*period, 6*period ) )
+ time.sleep(random.uniform(5 * period, 6 * period))
readerEvent.wait()
try:
mutexvreaders.acquire()
if virtualreaders:
- oldreader=random.choice( virtualreaders )
+ oldreader = random.choice(virtualreaders)
virtualreaders.remove(oldreader)
if removedreaderstats.has_key(oldreader):
- removedreaderstats[oldreader]+=1
+ removedreaderstats[oldreader] += 1
else:
- removedreaderstats[oldreader]=1
+ removedreaderstats[oldreader] = 1
finally:
readerEvent.clear()
mutexvreaders.release()
-class countobserver( ReaderObserver ):
+class countobserver(ReaderObserver):
'''A simple reader observer that counts added/removed readers.'''
- def __init__( self, obsindex ):
- self.obsindex=obsindex
- self.insertedreaderstats={}
- self.removedreaderstats={}
- self.countnotified=0
- def update( self, observable, (addedreaders, removedreaders) ):
- self.countnotified+=1
+
+ def __init__(self, obsindex):
+ self.obsindex = obsindex
+ self.insertedreaderstats = {}
+ self.removedreaderstats = {}
+ self.countnotified = 0
+
+ def update(self, observable, (addedreaders, removedreaders)):
+ self.countnotified += 1
for newreader in addedreaders:
if self.insertedreaderstats.has_key(newreader):
- self.insertedreaderstats[newreader]+=1
+ self.insertedreaderstats[newreader] += 1
else:
- self.insertedreaderstats[newreader]=1
+ self.insertedreaderstats[newreader] = 1
for oldreader in removedreaders:
if self.removedreaderstats.has_key(oldreader):
- self.removedreaderstats[oldreader]+=1
+ self.removedreaderstats[oldreader] += 1
else:
- self.removedreaderstats[oldreader]=1
+ self.removedreaderstats[oldreader] = 1
-class testcase_readermonitorstress( unittest.TestCase ):
+class testcase_readermonitorstress(unittest.TestCase):
'''Test smartcard framework reader monitoring'''
+
def testcase_readermonitorthread(self):
# create thread that simulates reader insertion
@@ -142,12 +152,12 @@
# create thread that simulates reader removal
removalthread = readerRemovalThread()
- readermonitor = ReaderMonitor( readerProc=getReaders, period=period )
- observers=[]
- for i in range( 0, OBS_COUNT ):
- observer = countobserver( i )
- readermonitor.addObserver( observer )
- observers.append( observer )
+ readermonitor = ReaderMonitor(readerProc=getReaders, period=period)
+ observers = []
+ for i in range(0, OBS_COUNT):
+ observer = countobserver(i)
+ readermonitor.addObserver(observer)
+ observers.append(observer)
# signal threads to start
insertionthread.start()
@@ -155,13 +165,13 @@
# let reader insertion/removal threads run for a while
# then signal threads to end
- time.sleep( RUNNING_TIME )
+ time.sleep(RUNNING_TIME)
exitEvent.set()
# wait until all threads ended
removalthread.join()
insertionthread.join()
- time.sleep( 2*period )
+ time.sleep(2 * period)
for observer in observers:
#print observer.obsindex, observer.insertedreaderstats==insertedreaderstats, observer.countnotified
@@ -170,11 +180,10 @@
#print insertedreaderstats
#print observer.removedreaderstats
#print removedreaderstats
- self.assertEquals( observer.insertedreaderstats, insertedreaderstats )
- self.assertEquals( observer.removedreaderstats, removedreaderstats )
+ self.assertEquals(observer.insertedreaderstats, insertedreaderstats)
+ self.assertEquals(observer.removedreaderstats, removedreaderstats)
-
def suite():
suite1 = unittest.makeSuite(testcase_readermonitorthread)
return unittest.TestSuite((suite1))
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|