[Sqlalchemy-tickets] Issue #2985: simplify pool recycle logic (zzzeek/sqlalchemy)
Brought to you by:
zzzeek
|
From: Mike B. <iss...@bi...> - 2014-03-05 05:22:18
|
New issue 2985: simplify pool recycle logic https://bitbucket.org/zzzeek/sqlalchemy/issue/2985/simplify-pool-recycle-logic Mike Bayer: using a simple invalidation time we can do away with all the "pool replacement" logic. the current logic is subject to a pretty obvious race condition, where as many connections all hit a disconnect wall, all of the Connection objects hosting them will simultaneously call upon self.engine.dispose(). this means we could have N pools generated and immediately chucked within a disconnect cycle. the patch below removes all of that and replaces with a simple timeout which incurs no overhead and no race conditions. the only difference is that the "bad" connections hang around until they are invalidated on checkout. ``` #!diff diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 888a15f..20b5227 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1084,9 +1084,7 @@ class Connection(Connectable): del self._is_disconnect dbapi_conn_wrapper = self.connection self.invalidate(e) - if not hasattr(dbapi_conn_wrapper, '_pool') or \ - dbapi_conn_wrapper._pool is self.engine.pool: - self.engine.dispose() + self.engine.pool._invalidate(dbapi_conn_wrapper) if self.should_close_with_result: self.close() @@ -1496,7 +1494,7 @@ class Engine(Connectable, log.Identified): the engine are not affected. """ - self.pool = self.pool._replace() + self.pool.dispose() def _execute_default(self, default): with self.contextual_connect() as conn: diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index 473b665..4a07e78 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -528,7 +528,6 @@ class LazyLoader(AbstractRelationshipLoader): def _emit_lazyload(self, strategy_options, session, state, ident_key, passive): q = session.query(self.mapper)._adapt_all_clauses() - if self.parent_property.secondary is not None: q = q.select_from(self.mapper, self.parent_property.secondary) diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index af9b8fc..f78825e 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -210,6 +210,7 @@ class Pool(log.Identified): self._threadconns = threading.local() self._creator = creator self._recycle = recycle + self._invalidate_time = 0 self._use_threadlocal = use_threadlocal if reset_on_return in ('rollback', True, reset_rollback): self._reset_on_return = reset_rollback @@ -276,6 +277,22 @@ class Pool(log.Identified): return _ConnectionRecord(self) + def _invalidate(self, connection): + """Mark all connections established within the generation + of the given connection as invalidated. + + If this pool's last invalidate time is before when the given + connection was created, update the timestamp til now. Otherwise, + no action is performed. + + Connections with a start time prior to this pool's invalidation + time will be recycled upon next checkout. + """ + rec = getattr(connection, "_connection_record", None) + if not rec or self._invalidate_time < rec.starttime: + self._invalidate_time = time.time() + + def recreate(self): """Return a new :class:`.Pool`, of the same class as this one and configured with identical creation arguments. @@ -301,17 +318,6 @@ class Pool(log.Identified): raise NotImplementedError() - def _replace(self): - """Dispose + recreate this pool. - - Subclasses may employ special logic to - move threads waiting on this pool to the - new one. - - """ - self.dispose() - return self.recreate() - def connect(self): """Return a DBAPI connection from the pool. @@ -483,6 +489,7 @@ class _ConnectionRecord(object): self.connection = None def get_connection(self): + recycle = False if self.connection is None: self.connection = self.__connect() self.info.clear() @@ -493,6 +500,15 @@ class _ConnectionRecord(object): self.__pool.logger.info( "Connection %r exceeded timeout; recycling", self.connection) + recycle = True + elif self.__pool._invalidate_time > self.starttime: + self.__pool.logger.info( + "Connection %r invalidated due to pool invalidation; recycling", + self.connection + ) + recycle = True + + if recycle: self.__close() self.connection = self.__connect() self.info.clear() @@ -911,8 +927,6 @@ class QueuePool(Pool): try: wait = use_overflow and self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) - except sqla_queue.SAAbort as aborted: - return aborted.context._do_get() except sqla_queue.Empty: if use_overflow and self._overflow >= self._max_overflow: if not wait: @@ -974,12 +988,6 @@ class QueuePool(Pool): self._overflow = 0 - self.size() self.logger.info("Pool disposed. %s", self.status()) - def _replace(self): - self.dispose() - np = self.recreate() - self._pool.abort(np) - return np - def status(self): return "Pool size: %d Connections in pool: %d "\ "Current Overflow: %d Current Checked out "\ diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index fc6f3dc..cde19b3 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -1069,7 +1069,8 @@ class QueuePoolTest(PoolTestBase): # inside the queue, before we invalidate the other # two conns time.sleep(.2) - p2 = p._replace() + p._invalidate(c2) + c2.invalidate() for t in threads: t.join(join_timeout) @@ -1079,19 +1080,18 @@ class QueuePoolTest(PoolTestBase): @testing.requires.threading_with_mock def test_notify_waiters(self): dbapi = MockDBAPI() + canary = [] - def creator1(): + def creator(): canary.append(1) return dbapi.connect() - def creator2(): - canary.append(2) - return dbapi.connect() - p1 = pool.QueuePool(creator=creator1, + p1 = pool.QueuePool(creator=creator, pool_size=1, timeout=None, max_overflow=0) - p2 = pool.NullPool(creator=creator2) + #p2 = pool.NullPool(creator=creator2) def waiter(p): conn = p.connect() + canary.append(2) time.sleep(.5) conn.close() @@ -1104,12 +1104,14 @@ class QueuePoolTest(PoolTestBase): threads.append(t) time.sleep(.5) eq_(canary, [1]) - p1._pool.abort(p2) + + c1.invalidate() + p1._invalidate(c1) for t in threads: t.join(join_timeout) - eq_(canary, [1, 2, 2, 2, 2, 2]) + eq_(canary, [1, 1, 2, 2, 2, 2, 2]) def test_dispose_closes_pooled(self): dbapi = MockDBAPI() diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index ba336a1..a3ad9c5 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -146,16 +146,20 @@ class MockReconnectTest(fixtures.TestBase): # close shouldnt break conn.close() - is_not_(self.db.pool, db_pool) - - # ensure all connections closed (pool was recycled) + # ensure one connection closed... eq_( [c.close.mock_calls for c in self.dbapi.connections], - [[call()], [call()]] + [[call()], []] ) conn = self.db.connect() + + eq_( + [c.close.mock_calls for c in self.dbapi.connections], + [[call()], [call()], []] + ) + conn.execute(select([1])) conn.close() @@ -534,8 +538,6 @@ class RealReconnectTest(fixtures.TestBase): # invalidate() also doesn't screw up assert_raises(exc.DBAPIError, engine.connect) - # pool was recreated - assert engine.pool is not p1 def test_null_pool(self): engine = \ ``` |