Revision: 7104
http://jython.svn.sourceforge.net/jython/?rev=7104&view=rev
Author: zyasoft
Date: 2010-08-25 01:40:46 +0000 (Wed, 25 Aug 2010)
Log Message:
-----------
Now test that defaultdict's __missing__/default_factory is computed
atomically.
Modified Paths:
--------------
trunk/jython/Lib/test/test_defaultdict_jy.py
Modified: trunk/jython/Lib/test/test_defaultdict_jy.py
===================================================================
--- trunk/jython/Lib/test/test_defaultdict_jy.py 2010-08-25 01:38:26 UTC (rev 7103)
+++ trunk/jython/Lib/test/test_defaultdict_jy.py 2010-08-25 01:40:46 UTC (rev 7104)
@@ -3,10 +3,15 @@
Made for Jython.
"""
import pickle
+import time
+import threading
import unittest
from collections import defaultdict
from test import test_support
+from random import randint
+from java.util.concurrent.atomic import AtomicInteger
+
class PickleTestCase(unittest.TestCase):
def test_pickle(self):
@@ -15,8 +20,63 @@
self.assertEqual(pickle.loads(pickle.dumps(d, proto)), d)
+# TODO push into test_support or some other common module - run_threads
+# is originally from test_list_jy.py
+
+class ThreadSafetyTestCase(unittest.TestCase):
+
+ def run_threads(self, f, num=10):
+ threads = []
+ for i in xrange(num):
+ t = threading.Thread(target=f)
+ t.start()
+ threads.append(t)
+ timeout = 10. # be especially generous
+ for t in threads:
+ t.join(timeout)
+ timeout = 0.
+ for t in threads:
+ self.assertFalse(t.isAlive())
+
+ def test_inc_dec(self):
+
+ class Counter(object):
+ def __init__(self):
+ self.atomic = AtomicInteger()
+ # waiting is important here to ensure that
+ # defaultdict factories can step on each other
+ time.sleep(0.001)
+
+ def decrementAndGet(self):
+ return self.atomic.decrementAndGet()
+
+ def incrementAndGet(self):
+ return self.atomic.incrementAndGet()
+
+ def get(self):
+ return self.atomic.get()
+
+ def __repr__(self):
+ return "Counter<%s>" % (self.atomic.get())
+
+ counters = defaultdict(Counter)
+ size = 17
+
+ def tester():
+ for i in xrange(1000):
+ j = (i + randint(0, size)) % size
+ counters[j].decrementAndGet()
+ time.sleep(0.0001)
+ counters[j].incrementAndGet()
+
+ self.run_threads(tester, 20)
+
+ for i in xrange(size):
+ self.assertEqual(counters[i].get(), 0, counters)
+
+
def test_main():
- test_support.run_unittest(PickleTestCase)
+ test_support.run_unittest(PickleTestCase, ThreadSafetyTestCase)
if __name__ == '__main__':
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|