From: <zy...@us...> - 2010-08-25 01:40:52
|
Revision: 7104 http://jython.svn.sourceforge.net/jython/?rev=7104&view=rev Author: zyasoft Date: 2010-08-25 01:40:46 +0000 (Wed, 25 Aug 2010) Log Message: ----------- Now test that defaultdict's __missing__/default_factory is computed atomically. Modified Paths: -------------- trunk/jython/Lib/test/test_defaultdict_jy.py Modified: trunk/jython/Lib/test/test_defaultdict_jy.py =================================================================== --- trunk/jython/Lib/test/test_defaultdict_jy.py 2010-08-25 01:38:26 UTC (rev 7103) +++ trunk/jython/Lib/test/test_defaultdict_jy.py 2010-08-25 01:40:46 UTC (rev 7104) @@ -3,10 +3,15 @@ Made for Jython. """ import pickle +import time +import threading import unittest from collections import defaultdict from test import test_support +from random import randint +from java.util.concurrent.atomic import AtomicInteger + class PickleTestCase(unittest.TestCase): def test_pickle(self): @@ -15,8 +20,63 @@ self.assertEqual(pickle.loads(pickle.dumps(d, proto)), d) +# TODO push into test_support or some other common module - run_threads +# is originally from test_list_jy.py + +class ThreadSafetyTestCase(unittest.TestCase): + + def run_threads(self, f, num=10): + threads = [] + for i in xrange(num): + t = threading.Thread(target=f) + t.start() + threads.append(t) + timeout = 10. # be especially generous + for t in threads: + t.join(timeout) + timeout = 0. + for t in threads: + self.assertFalse(t.isAlive()) + + def test_inc_dec(self): + + class Counter(object): + def __init__(self): + self.atomic = AtomicInteger() + # waiting is important here to ensure that + # defaultdict factories can step on each other + time.sleep(0.001) + + def decrementAndGet(self): + return self.atomic.decrementAndGet() + + def incrementAndGet(self): + return self.atomic.incrementAndGet() + + def get(self): + return self.atomic.get() + + def __repr__(self): + return "Counter<%s>" % (self.atomic.get()) + + counters = defaultdict(Counter) + size = 17 + + def tester(): + for i in xrange(1000): + j = (i + randint(0, size)) % size + counters[j].decrementAndGet() + time.sleep(0.0001) + counters[j].incrementAndGet() + + self.run_threads(tester, 20) + + for i in xrange(size): + self.assertEqual(counters[i].get(), 0, counters) + + def test_main(): - test_support.run_unittest(PickleTestCase) + test_support.run_unittest(PickleTestCase, ThreadSafetyTestCase) if __name__ == '__main__': This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |