[Sqlalchemy-commits] [6067] sqlalchemy/branches/rel_0_6: jython pool tests pass 100% [ticket: 1444]
Brought to you by:
zzzeek
From: <co...@sq...> - 2009-06-19 22:16:25
|
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head><meta http-equiv="content-type" content="text/html; charset=utf-8" /><style type="text/css"><!-- #msg dl { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; } #msg dt { float: left; width: 6em; font-weight: bold; } #msg dt:after { content:':';} #msg dl, #msg dt, #msg ul, #msg li, #header, #footer { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; } #msg dl a { font-weight: bold} #msg dl a:link { color:#fc3; } #msg dl a:active { color:#ff0; } #msg dl a:visited { color:#cc6; } h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; } #msg pre { overflow: auto; background: #ffc; border: 1px #fc0 solid; padding: 6px; } #msg ul, pre { overflow: auto; } #header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; } #patch { width: 100%; } #patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;} #patch .propset h4, #patch .binary h4 {margin:0;} #patch pre {padding:0;line-height:1.2em;margin:0;} #patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;} #patch .propset .diff, #patch .binary .diff {padding:10px 0;} #patch span {display:block;padding:0 10px;} #patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;} #patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;} #patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;} #patch .lines, .info {color:#888;background:#fff;} --></style> <title>[6067] sqlalchemy/branches/rel_0_6: jython pool tests pass 100% [ticket: 1444]</title> </head> <body> <div id="msg"> <dl> <dt>Revision</dt> <dd>6067</dd> <dt>Author</dt> <dd>zzzeek</dd> <dt>Date</dt> <dd>2009-06-19 18:15:23 -0400 (Fri, 19 Jun 2009)</dd> </dl> <h3>Log Message</h3> <pre>jython pool tests pass 100% [ticket:1444]</pre> <h3>Modified Paths</h3> <ul> <li><a href="#sqlalchemybranchesrel_0_6libsqlalchemypoolpy">sqlalchemy/branches/rel_0_6/lib/sqlalchemy/pool.py</a></li> <li><a href="#sqlalchemybranchesrel_0_6libsqlalchemytestnosepluginpy">sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/noseplugin.py</a></li> <li><a href="#sqlalchemybranchesrel_0_6libsqlalchemytesttestingpy">sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/testing.py</a></li> <li><a href="#sqlalchemybranchesrel_0_6libsqlalchemytestutilpy">sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/util.py</a></li> <li><a href="#sqlalchemybranchesrel_0_6testenginetest_poolpy">sqlalchemy/branches/rel_0_6/test/engine/test_pool.py</a></li> </ul> </div> <div id="patch"> <h3>Diff</h3> <a id="sqlalchemybranchesrel_0_6libsqlalchemypoolpy"></a> <div class="modfile"><h4>Modified: sqlalchemy/branches/rel_0_6/lib/sqlalchemy/pool.py (6066 => 6067)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/branches/rel_0_6/lib/sqlalchemy/pool.py 2009-06-18 19:37:16 UTC (rev 6066) +++ sqlalchemy/branches/rel_0_6/lib/sqlalchemy/pool.py 2009-06-19 22:15:23 UTC (rev 6067) </span><span class="lines">@@ -276,8 +276,11 @@ </span><span class="cx"> </span><span class="cx"> </span><span class="cx"> def _finalize_fairy(connection, connection_record, pool, ref=None): </span><del>- if ref is not None and (connection_record.backref is not ref or isinstance(pool, AssertionPool)): </del><ins>+ _refs.discard(connection_record) + + if ref is not None and (connection_record.fairy is not ref or isinstance(pool, AssertionPool)): </ins><span class="cx"> return </span><ins>+ </ins><span class="cx"> if connection is not None: </span><span class="cx"> try: </span><span class="cx"> if pool._reset_on_return: </span><span class="lines">@@ -291,7 +294,7 @@ </span><span class="cx"> if isinstance(e, (SystemExit, KeyboardInterrupt)): </span><span class="cx"> raise </span><span class="cx"> if connection_record is not None: </span><del>- connection_record.backref = None </del><ins>+ connection_record.fairy = None </ins><span class="cx"> if pool._should_log_info: </span><span class="cx"> pool.log("Connection %r being returned to pool" % connection) </span><span class="cx"> if pool._on_checkin: </span><span class="lines">@@ -299,6 +302,8 @@ </span><span class="cx"> l.checkin(connection, connection_record) </span><span class="cx"> pool.return_conn(connection_record) </span><span class="cx"> </span><ins>+_refs = set() + </ins><span class="cx"> class _ConnectionFairy(object): </span><span class="cx"> """Proxies a DB-API connection and provides return-on-dereference support.""" </span><span class="cx"> </span><span class="lines">@@ -310,7 +315,8 @@ </span><span class="cx"> try: </span><span class="cx"> rec = self._connection_record = pool.get() </span><span class="cx"> conn = self.connection = self._connection_record.get_connection() </span><del>- self._connection_record.backref = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref)) </del><ins>+ rec.fairy = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref)) + _refs.add(rec) </ins><span class="cx"> except: </span><span class="cx"> self.connection = None # helps with endless __getattr__ loops later on </span><span class="cx"> self._connection_record = None </span><span class="lines">@@ -409,8 +415,9 @@ </span><span class="cx"> """ </span><span class="cx"> </span><span class="cx"> if self._connection_record is not None: </span><ins>+ _refs.remove(self._connection_record) + self._connection_record.fairy = None </ins><span class="cx"> self._connection_record.connection = None </span><del>- self._connection_record.backref = None </del><span class="cx"> self._pool.do_return_conn(self._connection_record) </span><span class="cx"> self._detached_info = \ </span><span class="cx"> self._connection_record.info.copy() </span><span class="lines">@@ -508,10 +515,8 @@ </span><span class="cx"> del self._conn.current </span><span class="cx"> </span><span class="cx"> def cleanup(self): </span><del>- for conn in list(self._all_conns): - self._all_conns.discard(conn) - if len(self._all_conns) <= self.size: - return </del><ins>+ while len(self._all_conns) > self.size: + self._all_conns.pop() </ins><span class="cx"> </span><span class="cx"> def status(self): </span><span class="cx"> return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns)) </span></span></pre></div> <a id="sqlalchemybranchesrel_0_6libsqlalchemytestnosepluginpy"></a> <div class="modfile"><h4>Modified: sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/noseplugin.py (6066 => 6067)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/noseplugin.py 2009-06-18 19:37:16 UTC (rev 6066) +++ sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/noseplugin.py 2009-06-19 22:15:23 UTC (rev 6067) </span><span class="lines">@@ -149,6 +149,7 @@ </span><span class="cx"> </span><span class="cx"> def afterTest(self, test): </span><span class="cx"> testing.resetwarnings() </span><ins>+ testing.global_cleanup_assertions() </ins><span class="cx"> </span><span class="cx"> #def handleError(self, test, err): </span><span class="cx"> #pass </span></span></pre></div> <a id="sqlalchemybranchesrel_0_6libsqlalchemytesttestingpy"></a> <div class="modfile"><h4>Modified: sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/testing.py (6066 => 6067)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/testing.py 2009-06-18 19:37:16 UTC (rev 6066) +++ sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/testing.py 2009-06-19 22:15:23 UTC (rev 6067) </span><span class="lines">@@ -8,11 +8,11 @@ </span><span class="cx"> import warnings </span><span class="cx"> from cStringIO import StringIO </span><span class="cx"> </span><del>-from sqlalchemy.test import config, assertsql </del><ins>+from sqlalchemy.test import config, assertsql, util as testutil </ins><span class="cx"> from sqlalchemy.util import function_named </span><span class="cx"> from engines import drop_all_tables </span><span class="cx"> </span><del>-from sqlalchemy import exc as sa_exc, util, types as sqltypes, schema </del><ins>+from sqlalchemy import exc as sa_exc, util, types as sqltypes, schema, pool </ins><span class="cx"> </span><span class="cx"> _ops = { '<': operator.lt, </span><span class="cx"> '>': operator.gt, </span><span class="lines">@@ -413,7 +413,20 @@ </span><span class="cx"> if sys.version_info < (2, 4): </span><span class="cx"> warnings.filterwarnings('ignore', category=FutureWarning) </span><span class="cx"> </span><ins>+def global_cleanup_assertions(): + """Check things that have to be finalized at the end of a test suite. + + Hardcoded at the moment, a modular system can be built here + to support things like PG prepared transactions, tables all + dropped, etc. + + """ </ins><span class="cx"> </span><ins>+ testutil.lazy_gc() + assert not pool._refs + + + </ins><span class="cx"> def against(*queries): </span><span class="cx"> """Boolean predicate, compares to testing database configuration. </span><span class="cx"> </span></span></pre></div> <a id="sqlalchemybranchesrel_0_6libsqlalchemytestutilpy"></a> <div class="modfile"><h4>Modified: sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/util.py (6066 => 6067)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/util.py 2009-06-18 19:37:16 UTC (rev 6066) +++ sqlalchemy/branches/rel_0_6/lib/sqlalchemy/test/util.py 2009-06-19 22:15:23 UTC (rev 6067) </span><span class="lines">@@ -1,17 +1,24 @@ </span><span class="cx"> from sqlalchemy.util import jython, function_named </span><span class="cx"> </span><span class="cx"> import gc </span><ins>+import time </ins><span class="cx"> </span><span class="cx"> if jython: </span><span class="cx"> def gc_collect(*args): </span><ins>+ """aggressive gc.collect for tests.""" </ins><span class="cx"> gc.collect() </span><span class="cx"> time.sleep(0.1) </span><span class="cx"> gc.collect() </span><span class="cx"> gc.collect() </span><span class="cx"> return 0 </span><ins>+ + # "lazy" gc, for VM's that don't GC on refcount == 0 + lazy_gc = gc_collect + </ins><span class="cx"> else: </span><ins>+ # assume CPython - straight gc.collect, lazy_gc() is a pass </ins><span class="cx"> gc_collect = gc.collect </span><ins>+ def lazy_gc(): + pass </ins><span class="cx"> </span><span class="cx"> </span><del>- - </del></span></pre></div> <a id="sqlalchemybranchesrel_0_6testenginetest_poolpy"></a> <div class="modfile"><h4>Modified: sqlalchemy/branches/rel_0_6/test/engine/test_pool.py (6066 => 6067)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/branches/rel_0_6/test/engine/test_pool.py 2009-06-18 19:37:16 UTC (rev 6066) +++ sqlalchemy/branches/rel_0_6/test/engine/test_pool.py 2009-06-19 22:15:23 UTC (rev 6067) </span><span class="lines">@@ -2,7 +2,7 @@ </span><span class="cx"> from sqlalchemy import pool, interfaces, create_engine </span><span class="cx"> import sqlalchemy as tsa </span><span class="cx"> from sqlalchemy.test import TestBase, testing </span><del>-from sqlalchemy.test.util import gc_collect </del><ins>+from sqlalchemy.test.util import gc_collect, lazy_gc </ins><span class="cx"> </span><span class="cx"> </span><span class="cx"> mcid = 1 </span><span class="lines">@@ -52,7 +52,6 @@ </span><span class="cx"> connection2 = manager.connect('foo.db') </span><span class="cx"> connection3 = manager.connect('bar.db') </span><span class="cx"> </span><del>- print "connection " + repr(connection) </del><span class="cx"> self.assert_(connection.cursor() is not None) </span><span class="cx"> self.assert_(connection is connection2) </span><span class="cx"> self.assert_(connection2 is not connection3) </span><span class="lines">@@ -71,8 +70,6 @@ </span><span class="cx"> connection = manager.connect('foo.db') </span><span class="cx"> connection2 = manager.connect('foo.db') </span><span class="cx"> </span><del>- print "connection " + repr(connection) - </del><span class="cx"> self.assert_(connection.cursor() is not None) </span><span class="cx"> self.assert_(connection is not connection2) </span><span class="cx"> </span><span class="lines">@@ -104,7 +101,8 @@ </span><span class="cx"> c2.close() </span><span class="cx"> else: </span><span class="cx"> c2 = None </span><del>- </del><ins>+ lazy_gc() + </ins><span class="cx"> if useclose: </span><span class="cx"> c1 = p.connect() </span><span class="cx"> c2 = p.connect() </span><span class="lines">@@ -118,6 +116,8 @@ </span><span class="cx"> </span><span class="cx"> # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced </span><span class="cx"> if isinstance(p, pool.QueuePool): </span><ins>+ lazy_gc() + </ins><span class="cx"> self.assert_(p.checkedout() == 0) </span><span class="cx"> c1 = p.connect() </span><span class="cx"> c2 = p.connect() </span><span class="lines">@@ -127,6 +127,7 @@ </span><span class="cx"> else: </span><span class="cx"> c2 = None </span><span class="cx"> c1 = None </span><ins>+ lazy_gc() </ins><span class="cx"> self.assert_(p.checkedout() == 0) </span><span class="cx"> </span><span class="cx"> def test_properties(self): </span><span class="lines">@@ -255,7 +256,6 @@ </span><span class="cx"> assert_listeners(p, 5, 2, 2, 2, 2) </span><span class="cx"> del p </span><span class="cx"> </span><del>- print "----" </del><span class="cx"> snoop = ListenAll() </span><span class="cx"> p = _pool(listeners=[snoop]) </span><span class="cx"> assert_listeners(p, 1, 1, 1, 1, 1) </span><span class="lines">@@ -276,6 +276,7 @@ </span><span class="cx"> snoop.assert_in(cc, False, False, True, False) </span><span class="cx"> snoop.assert_total(0, 0, 1, 0) </span><span class="cx"> del c, cc </span><ins>+ lazy_gc() </ins><span class="cx"> snoop.assert_total(0, 0, 1, 1) </span><span class="cx"> </span><span class="cx"> p.dispose() </span><span class="lines">@@ -299,11 +300,13 @@ </span><span class="cx"> c.close() </span><span class="cx"> snoop.assert_total(1, 0, 1, 1) </span><span class="cx"> del c </span><ins>+ lazy_gc() </ins><span class="cx"> snoop.assert_total(1, 0, 1, 1) </span><span class="cx"> c = p.connect() </span><span class="cx"> snoop.assert_total(2, 0, 2, 1) </span><span class="cx"> c.close() </span><span class="cx"> del c </span><ins>+ lazy_gc() </ins><span class="cx"> snoop.assert_total(2, 0, 2, 2) </span><span class="cx"> </span><span class="cx"> # detached </span><span class="lines">@@ -394,261 +397,280 @@ </span><span class="cx"> </span><span class="cx"> class QueuePoolTest(PoolTestBase): </span><span class="cx"> </span><del>- def testqueuepool_del(self): - self._do_testqueuepool(useclose=False) </del><ins>+ def testqueuepool_del(self): + self._do_testqueuepool(useclose=False) </ins><span class="cx"> </span><del>- def testqueuepool_close(self): - self._do_testqueuepool(useclose=True) </del><ins>+ def testqueuepool_close(self): + self._do_testqueuepool(useclose=True) </ins><span class="cx"> </span><del>- def _do_testqueuepool(self, useclose=False): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = False) </del><ins>+ def _do_testqueuepool(self, useclose=False): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = False) </ins><span class="cx"> </span><del>- def status(pool): - tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) - print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup - return tup </del><ins>+ def status(pool): + tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) + print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup + return tup </ins><span class="cx"> </span><del>- c1 = p.connect() - self.assert_(status(p) == (3,0,-2,1)) - c2 = p.connect() - self.assert_(status(p) == (3,0,-1,2)) - c3 = p.connect() - self.assert_(status(p) == (3,0,0,3)) - c4 = p.connect() - self.assert_(status(p) == (3,0,1,4)) - c5 = p.connect() - self.assert_(status(p) == (3,0,2,5)) - c6 = p.connect() - self.assert_(status(p) == (3,0,3,6)) - if useclose: - c4.close() - c3.close() - c2.close() - else: - c4 = c3 = c2 = None - self.assert_(status(p) == (3,3,3,3)) - if useclose: - c1.close() - c5.close() - c6.close() - else: - c1 = c5 = c6 = None - self.assert_(status(p) == (3,3,0,0)) - c1 = p.connect() - c2 = p.connect() - self.assert_(status(p) == (3, 1, 0, 2), status(p)) - if useclose: - c2.close() - else: - c2 = None - self.assert_(status(p) == (3, 2, 0, 1)) </del><ins>+ c1 = p.connect() + self.assert_(status(p) == (3,0,-2,1)) + c2 = p.connect() + self.assert_(status(p) == (3,0,-1,2)) + c3 = p.connect() + self.assert_(status(p) == (3,0,0,3)) + c4 = p.connect() + self.assert_(status(p) == (3,0,1,4)) + c5 = p.connect() + self.assert_(status(p) == (3,0,2,5)) + c6 = p.connect() + self.assert_(status(p) == (3,0,3,6)) + if useclose: + c4.close() + c3.close() + c2.close() + else: + c4 = c3 = c2 = None + lazy_gc() + + self.assert_(status(p) == (3,3,3,3)) + if useclose: + c1.close() + c5.close() + c6.close() + else: + c1 = c5 = c6 = None + lazy_gc() + + self.assert_(status(p) == (3,3,0,0)) + + c1 = p.connect() + c2 = p.connect() + self.assert_(status(p) == (3, 1, 0, 2), status(p)) + if useclose: + c2.close() + else: + c2 = None + lazy_gc() + + self.assert_(status(p) == (3, 2, 0, 1)) </ins><span class="cx"> </span><del>- def test_timeout(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2) - c1 = p.connect() - c2 = p.connect() - c3 = p.connect() - now = time.time() - try: - c4 = p.connect() - assert False - except tsa.exc.TimeoutError, e: - assert int(time.time() - now) == 2 - - def test_timeout_race(self): - # test a race condition where the initial connecting threads all race - # to queue.Empty, then block on the mutex. each thread consumes a - # connection as they go in. when the limit is reached, the remaining - # threads go in, and get TimeoutError; even though they never got to - # wait for the timeout on queue.get(). the fix involves checking the - # timeout again within the mutex, and if so, unlocking and throwing - # them back to the start of do_get() - p = pool.QueuePool(creator = lambda: mock_dbapi.connect(delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3) - timeouts = [] - def checkout(): - for x in xrange(1): - now = time.time() - try: - c1 = p.connect() - except tsa.exc.TimeoutError, e: - timeouts.append(int(time.time()) - now) - continue - time.sleep(4) - c1.close() - - threads = [] - for i in xrange(10): - th = threading.Thread(target=checkout) - th.start() - threads.append(th) - for th in threads: - th.join() - - print timeouts - assert len(timeouts) > 0 - for t in timeouts: - assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts) - - def _test_overflow(self, thread_count, max_overflow): - def creator(): - time.sleep(.05) - return mock_dbapi.connect() - - p = pool.QueuePool(creator=creator, - pool_size=3, timeout=2, - max_overflow=max_overflow) - peaks = [] - def whammy(): - for i in range(10): - try: - con = p.connect() - time.sleep(.005) - peaks.append(p.overflow()) - con.close() - del con - except tsa.exc.TimeoutError: - pass - threads = [] - for i in xrange(thread_count): - th = threading.Thread(target=whammy) - th.start() - threads.append(th) - for th in threads: - th.join() - - self.assert_(max(peaks) <= max_overflow) - - def test_no_overflow(self): - self._test_overflow(40, 0) - - def test_max_overflow(self): - self._test_overflow(40, 5) - - def test_mixed_close(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c2 = p.connect() - assert c1 is c2 - c1.close() - c2 = None - assert p.checkedout() == 1 - c1 = None - assert p.checkedout() == 0 - - def test_weakref_kaboom(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c2 = p.connect() - c1.close() - c2 = None - del c1 - del c2 - gc_collect() - assert p.checkedout() == 0 - c3 = p.connect() - assert c3 is not None - - def test_trick_the_counter(self): - """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread - with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an - ambiguous counter. i.e. its not true reference counting.""" - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c2 = p.connect() - assert c1 is c2 - c1.close() - c2 = p.connect() - c2.close() - self.assert_(p.checkedout() != 0) - - c2.close() - self.assert_(p.checkedout() == 0) - - def test_recycle(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3) - - c1 = p.connect() - c_id = id(c1.connection) - c1.close() - c2 = p.connect() - assert id(c2.connection) == c_id - c2.close() - time.sleep(4) - c3= p.connect() - assert id(c3.connection) != c_id - - def test_invalidate(self): - dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - c1 = p.connect() - c_id = c1.connection.id - c1.close(); c1=None - c1 = p.connect() - assert c1.connection.id == c_id - c1.invalidate() - c1 = None - - c1 = p.connect() - assert c1.connection.id != c_id - - def test_recreate(self): - dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - p2 = p.recreate() - assert p2.size() == 1 - assert p2._use_threadlocal is False - assert p2._max_overflow == 0 - - def test_reconnect(self): - """tests reconnect operations at the pool level. SA's engine/dialect includes another - layer of reconnect support for 'database was lost' errors.""" </del><ins>+ c1.close() </ins><span class="cx"> </span><del>- dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - c1 = p.connect() - c_id = c1.connection.id - c1.close(); c1=None </del><ins>+ lazy_gc() + assert not pool._refs + + def test_timeout(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2) + c1 = p.connect() + c2 = p.connect() + c3 = p.connect() + now = time.time() + try: + c4 = p.connect() + assert False + except tsa.exc.TimeoutError, e: + assert int(time.time() - now) == 2 </ins><span class="cx"> </span><del>- c1 = p.connect() - assert c1.connection.id == c_id - dbapi.raise_error = True - c1.invalidate() - c1 = None </del><ins>+ def test_timeout_race(self): + # test a race condition where the initial connecting threads all race + # to queue.Empty, then block on the mutex. each thread consumes a + # connection as they go in. when the limit is reached, the remaining + # threads go in, and get TimeoutError; even though they never got to + # wait for the timeout on queue.get(). the fix involves checking the + # timeout again within the mutex, and if so, unlocking and throwing + # them back to the start of do_get() + p = pool.QueuePool(creator = lambda: mock_dbapi.connect(delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3) + timeouts = [] + def checkout(): + for x in xrange(1): + now = time.time() + try: + c1 = p.connect() + except tsa.exc.TimeoutError, e: + timeouts.append(int(time.time()) - now) + continue + time.sleep(4) + c1.close() </ins><span class="cx"> </span><del>- c1 = p.connect() - assert c1.connection.id != c_id </del><ins>+ threads = [] + for i in xrange(10): + th = threading.Thread(target=checkout) + th.start() + threads.append(th) + for th in threads: + th.join() </ins><span class="cx"> </span><del>- def test_detach(self): - dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) </del><ins>+ print timeouts + assert len(timeouts) > 0 + for t in timeouts: + assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts) </ins><span class="cx"> </span><del>- c1 = p.connect() - c1.detach() - c_id = c1.connection.id </del><ins>+ def _test_overflow(self, thread_count, max_overflow): + def creator(): + time.sleep(.05) + return mock_dbapi.connect() + + p = pool.QueuePool(creator=creator, + pool_size=3, timeout=2, + max_overflow=max_overflow) + peaks = [] + def whammy(): + for i in range(10): + try: + con = p.connect() + time.sleep(.005) + peaks.append(p.overflow()) + con.close() + del con + except tsa.exc.TimeoutError: + pass + threads = [] + for i in xrange(thread_count): + th = threading.Thread(target=whammy) + th.start() + threads.append(th) + for th in threads: + th.join() + + self.assert_(max(peaks) <= max_overflow) + + lazy_gc() + assert not pool._refs + + def test_no_overflow(self): + self._test_overflow(40, 0) + + def test_max_overflow(self): + self._test_overflow(40, 5) + + def test_mixed_close(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c2 = p.connect() + assert c1 is c2 + c1.close() + c2 = None + assert p.checkedout() == 1 + c1 = None + lazy_gc() + assert p.checkedout() == 0 + + lazy_gc() + assert not pool._refs + + def test_weakref_kaboom(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c2 = p.connect() + c1.close() + c2 = None + del c1 + del c2 + gc_collect() + assert p.checkedout() == 0 + c3 = p.connect() + assert c3 is not None + + def test_trick_the_counter(self): + """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread + with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an + ambiguous counter. i.e. its not true reference counting.""" + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c2 = p.connect() + assert c1 is c2 + c1.close() + c2 = p.connect() + c2.close() + self.assert_(p.checkedout() != 0) + + c2.close() + self.assert_(p.checkedout() == 0) + + def test_recycle(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3) + + c1 = p.connect() + c_id = id(c1.connection) + c1.close() + c2 = p.connect() + assert id(c2.connection) == c_id + c2.close() + time.sleep(4) + c3= p.connect() + assert id(c3.connection) != c_id + + def test_invalidate(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + c1 = p.connect() + c_id = c1.connection.id + c1.close(); c1=None + c1 = p.connect() + assert c1.connection.id == c_id + c1.invalidate() + c1 = None + + c1 = p.connect() + assert c1.connection.id != c_id + + def test_recreate(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + p2 = p.recreate() + assert p2.size() == 1 + assert p2._use_threadlocal is False + assert p2._max_overflow == 0 + + def test_reconnect(self): + """tests reconnect operations at the pool level. SA's engine/dialect includes another + layer of reconnect support for 'database was lost' errors.""" + + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + c1 = p.connect() + c_id = c1.connection.id + c1.close(); c1=None + + c1 = p.connect() + assert c1.connection.id == c_id + dbapi.raise_error = True + c1.invalidate() + c1 = None + + c1 = p.connect() + assert c1.connection.id != c_id + + def test_detach(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + + c1 = p.connect() + c1.detach() + c_id = c1.connection.id + + c2 = p.connect() + assert c2.connection.id != c1.connection.id + dbapi.raise_error = True + + c2.invalidate() + c2 = None + + c2 = p.connect() + assert c2.connection.id != c1.connection.id + + con = c1.connection + + assert not con.closed + c1.close() + assert con.closed + + def test_threadfairy(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c1.close() + c2 = p.connect() + assert c2.connection is not None </ins><span class="cx"> </span><del>- c2 = p.connect() - assert c2.connection.id != c1.connection.id - dbapi.raise_error = True - - c2.invalidate() - c2 = None - - c2 = p.connect() - assert c2.connection.id != c1.connection.id - - con = c1.connection - - assert not con.closed - c1.close() - assert con.closed - - def test_threadfairy(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c1.close() - c2 = p.connect() - assert c2.connection is not None - </del><span class="cx"> class SingletonThreadPoolTest(PoolTestBase): </span><span class="cx"> def test_cleanup(self): </span><span class="cx"> """test that the pool's connections are OK after cleanup() has been called.""" </span></span></pre> </div> </div> </body> </html> |