[Sqlalchemy-commits] [1154] sqlalchemy/trunk/test: refactor to engine to have a separate SQLSession object.
Brought to you by:
zzzeek
From: <co...@sq...> - 2006-03-17 02:15:24
|
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head><style type="text/css"><!-- #msg dl { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; } #msg dt { float: left; width: 6em; font-weight: bold; } #msg dt:after { content:':';} #msg dl, #msg dt, #msg ul, #msg li { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; } #msg dl a { font-weight: bold} #msg dl a:link { color:#fc3; } #msg dl a:active { color:#ff0; } #msg dl a:visited { color:#cc6; } h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; } #msg pre { overflow: auto; background: #ffc; border: 1px #fc0 solid; padding: 6px; } #msg ul, pre { overflow: auto; } #patch { width: 100%; } #patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;} #patch .propset h4, #patch .binary h4 {margin:0;} #patch pre {padding:0;line-height:1.2em;margin:0;} #patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;} #patch .propset .diff, #patch .binary .diff {padding:10px 0;} #patch span {display:block;padding:0 10px;} #patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;} #patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;} #patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;} #patch .lines, .info {color:#888;background:#fff;} --></style> <title>[1154] sqlalchemy/trunk/test: refactor to engine to have a separate SQLSession object.</title> </head> <body> <div id="msg"> <dl> <dt>Revision</dt> <dd>1154</dd> <dt>Author</dt> <dd>zzzeek</dd> <dt>Date</dt> <dd>2006-03-16 20:15:09 -0600 (Thu, 16 Mar 2006)</dd> </dl> <h3>Log Message</h3> <pre>refactor to engine to have a separate SQLSession object. allows nested transactions. 
util.ThreadLocal __hasattr__ method/raise_error param meaningless, removed renamed old engines test to reflection</pre> <h3>Modified Paths</h3> <ul> <li><a href="#sqlalchemytrunklibsqlalchemydatabasessqlitepy">sqlalchemy/trunk/lib/sqlalchemy/databases/sqlite.py</a></li> <li><a href="#sqlalchemytrunklibsqlalchemyenginepy">sqlalchemy/trunk/lib/sqlalchemy/engine.py</a></li> <li><a href="#sqlalchemytrunklibsqlalchemyutilpy">sqlalchemy/trunk/lib/sqlalchemy/util.py</a></li> <li><a href="#sqlalchemytrunktestalltestspy">sqlalchemy/trunk/test/alltests.py</a></li> <li><a href="#sqlalchemytrunktesttablespy">sqlalchemy/trunk/test/tables.py</a></li> </ul> <h3>Added Paths</h3> <ul> <li><a href="#sqlalchemytrunktestreflectionpy">sqlalchemy/trunk/test/reflection.py</a></li> </ul> <h3>Removed Paths</h3> <ul> <li><a href="#sqlalchemytrunktestenginespy">sqlalchemy/trunk/test/engines.py</a></li> </ul> </div> <div id="patch"> <h3>Diff</h3> <a id="sqlalchemytrunklibsqlalchemydatabasessqlitepy"></a> <div class="modfile"><h4>Modified: sqlalchemy/trunk/lib/sqlalchemy/databases/sqlite.py (1153 => 1154)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/trunk/lib/sqlalchemy/databases/sqlite.py 2006-03-17 01:01:34 UTC (rev 1153) +++ sqlalchemy/trunk/lib/sqlalchemy/databases/sqlite.py 2006-03-17 02:15:09 UTC (rev 1154) </span><span class="lines">@@ -151,6 +151,9 @@ </span><span class="cx"> </span><span class="cx"> def dbapi(self): </span><span class="cx"> return sqlite </span><ins>+ + def push_session(self): + raise InvalidRequestError("SQLite doesn't support nested sessions") </ins><span class="cx"> </span><span class="cx"> def schemagenerator(self, **params): </span><span class="cx"> return SQLiteSchemaGenerator(self, **params) </span></span></pre></div> <a id="sqlalchemytrunklibsqlalchemyenginepy"></a> <div class="modfile"><h4>Modified: sqlalchemy/trunk/lib/sqlalchemy/engine.py (1153 => 1154)</h4> <pre class="diff"><span> <span class="info">--- 
sqlalchemy/trunk/lib/sqlalchemy/engine.py 2006-03-17 01:01:34 UTC (rev 1153) +++ sqlalchemy/trunk/lib/sqlalchemy/engine.py 2006-03-17 02:15:09 UTC (rev 1154) </span><span class="lines">@@ -171,8 +171,38 @@ </span><span class="cx"> return default.arg </span><span class="cx"> </span><span class="cx"> class SQLSession(object): </span><del>- pass </del><ins>+ """represents a particular connection retrieved from a SQLEngine, and associated transactional state.""" + def __init__(self, engine, parent=None): + self.engine = engine + self.parent = parent + self.__tcount = 0 + def _connection(self): + try: + return self.__connection + except AttributeError: + self.__connection = self.engine._pool.connect() + return self.__connection + connection = property(_connection, doc="the connection represented by this SQLSession. The connection is late-connecting, meaning the call to the connection pool only occurs when it is first called (and the pool will typically only connect the first time it is called as well)") </ins><span class="cx"> </span><ins>+ def begin(self): + """begins" a transaction on this SQLSession's connection. repeated calls to begin() will increment a counter that must be decreased by corresponding commit() statements before an actual commit occurs. this is to provide "nested" behavior of transactions so that different functions in a particular call stack can call begin()/commit() independently of each other without knowledge of an existing transaction.""" + if self.__tcount == 0: + self.engine.do_begin(self.connection) + self.__tcount += 1 + def rollback(self): + """rolls back the transaction on this SQLSession's connection. this can be called regardless of the "begin" counter value, i.e. can be called from anywhere inside a callstack. the "begin" counter is cleared.""" + if self.__tcount > 0: + self.engine.do_rollback(self.connection) + self.__tcount = 0 + def commit(self): + """commits the transaction started by begin(). 
If begin() was called multiple times, a counter will be decreased for each call to commit(), with the actual commit operation occuring when the counter reaches zero. this is to provide "nested" behavior of transactions so that different functions in a particular call stack can call begin()/commit() independently of each other without knowledge of an existing transaction.""" + if self.__tcount == 1: + self.engine.do_commit(self.connection) + elif self.__tcount > 1: + self.__tcount -= 1 + def is_begun(self): + return self.__tcount > 0 + </ins><span class="cx"> class SQLEngine(schema.SchemaEngine): </span><span class="cx"> """ </span><span class="cx"> The central "database" object used by an application. Subclasses of this object is used </span><span class="lines">@@ -192,6 +222,7 @@ </span><span class="cx"> (cargs, cparams) = self.connect_args() </span><span class="cx"> if pool is None: </span><span class="cx"> params['echo'] = echo_pool </span><ins>+ params['use_threadlocal'] = False </ins><span class="cx"> self._pool = sqlalchemy.pool.manage(self.dbapi(), **params).get_pool(*cargs, **cparams) </span><span class="cx"> else: </span><span class="cx"> self._pool = pool </span><span class="lines">@@ -200,7 +231,7 @@ </span><span class="cx"> self.echo_uow = echo_uow </span><span class="cx"> self.convert_unicode = convert_unicode </span><span class="cx"> self.encoding = encoding </span><del>- self.context = util.ThreadLocal(raiseerror=False) </del><ins>+ self.context = util.ThreadLocal() </ins><span class="cx"> self._ischema = None </span><span class="cx"> self._figure_paramstyle() </span><span class="cx"> self.logger = logger or util.Logger(origin='engine') </span><span class="lines">@@ -390,9 +421,28 @@ </span><span class="cx"> """implementations might want to put logic here for turning autocommit on/off, etc.""" </span><span class="cx"> connection.commit() </span><span class="cx"> </span><ins>+ def _session(self): + if not hasattr(self.context, 'session'): + 
self.context.session = SQLSession(self) + return self.context.session + session = property(_session, doc="returns the current thread's SQLSession") + + def push_session(self): + """pushes a new SQLSession onto this engine, temporarily replacing the previous one for the current thread. The previous session can be restored by calling pop_session(). this allows the usage of a new connection and possibly transaction within a particular block, superceding the existing one, including any transactions that are in progress. Returns the new SQLSession object.""" + sess = SQLSession(self, self.context.session) + self.context.session = sess + return sess + def pop_session(self): + """restores the current thread's SQLSession to that before the last push_session. Returns the restored SQLSession object. Raises an exception if there is no SQLSession pushed onto the stack.""" + sess = self.context.session.parent + if sess is None: + raise InvalidRequestError("No SQLSession is pushed onto the stack.") + self.context.session = sess + return sess + </ins><span class="cx"> def connection(self): </span><span class="cx"> """returns a managed DBAPI connection from this SQLEngine's connection pool.""" </span><del>- return self._pool.connect() </del><ins>+ return self.session.connection </ins><span class="cx"> </span><span class="cx"> def unique_connection(self): </span><span class="cx"> """returns a DBAPI connection from this SQLEngine's connection pool that is distinct from the current thread's connection.""" </span><span class="lines">@@ -434,40 +484,15 @@ </span><span class="cx"> self.commit() </span><span class="cx"> </span><span class="cx"> def begin(self): </span><del>- """"begins" a transaction on a pooled connection, and stores the connection in a thread-local - context. repeated calls to begin() within the same thread will increment a counter that must be - decreased by corresponding commit() statements before an actual commit occurs. 
this is to provide - "nested" behavior of transactions so that different functions can all call begin()/commit() and still - call each other.""" - if getattr(self.context, 'transaction', None) is None: - conn = self.connection() - self.do_begin(conn) - self.context.transaction = conn - self.context.tcount = 1 - else: - self.context.tcount += 1 </del><ins>+ """"begins a transaction on the current thread's SQLSession.""" + self.session.begin() </ins><span class="cx"> </span><span class="cx"> def rollback(self): </span><del>- """rolls back the current thread-local transaction started by begin(). the "begin" counter - is cleared and the transaction ended.""" - if self.context.transaction is not None: - self.do_rollback(self.context.transaction) - self.context.transaction = None - self.context.tcount = None </del><ins>+ """rolls back the transaction on the current thread's SQLSession.""" + self.session.rollback() </ins><span class="cx"> </span><span class="cx"> def commit(self): </span><del>- """commits the current thread-local transaction started by begin(). If begin() was called multiple - times, a counter will be decreased for each call to commit(), with the actual commit operation occuring - when the counter reaches zero. 
this is to provide - "nested" behavior of transactions so that different functions can all call begin()/commit() and still - call each other.""" - if self.context.transaction is not None: - count = self.context.tcount - 1 - self.context.tcount = count - if count == 0: - self.do_commit(self.context.transaction) - self.context.transaction = None - self.context.tcount = None </del><ins>+ self.session.commit() </ins><span class="cx"> </span><span class="cx"> def _process_defaults(self, proxy, compiled, parameters, **kwargs): </span><span class="cx"> """INSERT and UPDATE statements, when compiled, may have additional columns added to their </span><span class="lines">@@ -642,7 +667,7 @@ </span><span class="cx"> self._executemany(cursor, statement, parameters) </span><span class="cx"> else: </span><span class="cx"> self._execute(cursor, statement, parameters) </span><del>- if self.context.transaction is None: </del><ins>+ if not self.session.is_begun(): </ins><span class="cx"> self.do_commit(connection) </span><span class="cx"> except: </span><span class="cx"> self.do_rollback(connection) </span></span></pre></div> <a id="sqlalchemytrunklibsqlalchemyutilpy"></a> <div class="modfile"><h4>Modified: sqlalchemy/trunk/lib/sqlalchemy/util.py (1153 => 1154)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/trunk/lib/sqlalchemy/util.py 2006-03-17 01:01:34 UTC (rev 1153) +++ sqlalchemy/trunk/lib/sqlalchemy/util.py 2006-03-17 02:15:09 UTC (rev 1154) </span><span class="lines">@@ -214,11 +214,8 @@ </span><span class="cx"> </span><span class="cx"> class ThreadLocal(object): </span><span class="cx"> """an object in which attribute access occurs only within the context of the current thread""" </span><del>- def __init__(self, raiseerror = True): </del><ins>+ def __init__(self): </ins><span class="cx"> self.__dict__['_tdict'] = {} </span><del>- self.__dict__['_raiseerror'] = raiseerror - def __hasattr__(self, key): - return self._tdict.has_key("%d_%s" % 
(thread.get_ident(), key)) </del><span class="cx"> def __delattr__(self, key): </span><span class="cx"> try: </span><span class="cx"> del self._tdict["%d_%s" % (thread.get_ident(), key)] </span><span class="lines">@@ -228,10 +225,7 @@ </span><span class="cx"> try: </span><span class="cx"> return self._tdict["%d_%s" % (thread.get_ident(), key)] </span><span class="cx"> except KeyError: </span><del>- if self._raiseerror: - raise AttributeError(key) - else: - return None </del><ins>+ raise AttributeError(key) </ins><span class="cx"> def __setattr__(self, key, value): </span><span class="cx"> self._tdict["%d_%s" % (thread.get_ident(), key)] = value </span><span class="cx"> </span></span></pre></div> <a id="sqlalchemytrunktestalltestspy"></a> <div class="modfile"><h4>Modified: sqlalchemy/trunk/test/alltests.py (1153 => 1154)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/trunk/test/alltests.py 2006-03-17 01:01:34 UTC (rev 1153) +++ sqlalchemy/trunk/test/alltests.py 2006-03-17 02:15:09 UTC (rev 1154) </span><span class="lines">@@ -12,11 +12,12 @@ </span><span class="cx"> 'attributes', </span><span class="cx"> 'dependency', </span><span class="cx"> </span><del>- # connectivity </del><ins>+ # connectivity, execution </ins><span class="cx"> 'pool', </span><ins>+ 'engine', </ins><span class="cx"> </span><span class="cx"> # schema/tables </span><del>- 'engines', </del><ins>+ 'reflection', </ins><span class="cx"> 'testtypes', </span><span class="cx"> 'indexes', </span><span class="cx"> </span></span></pre></div> <a id="sqlalchemytrunktestenginespy"></a> <div class="delfile"><h4>Deleted: sqlalchemy/trunk/test/engines.py (1153 => 1154)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/trunk/test/engines.py 2006-03-17 01:01:34 UTC (rev 1153) +++ sqlalchemy/trunk/test/engines.py 2006-03-17 02:15:09 UTC (rev 1154) </span><span class="lines">@@ -1,171 +0,0 @@ </span><del>- -import sqlalchemy.ansisql as ansisql -import sqlalchemy.databases.postgres as 
postgres -import sqlalchemy.databases.oracle as oracle -import sqlalchemy.databases.sqlite as sqllite - -from sqlalchemy import * - -from testbase import PersistTest -import testbase -import unittest, re - -class EngineTest(PersistTest): - def testbasic(self): - # really trip it up with a circular reference - - use_function_defaults = testbase.db.engine.__module__.endswith('postgres') or testbase.db.engine.__module__.endswith('oracle') - - use_string_defaults = use_function_defaults or testbase.db.engine.__module__.endswith('sqlite') - - if use_function_defaults: - defval = func.current_date() - deftype = Date - else: - defval = "3" - deftype = Integer - - if use_string_defaults: - deftype2 = String - defval2 = "im a default" - else: - deftype2 = Integer - defval2 = "15" - - users = Table('engine_users', testbase.db, - Column('user_id', INT, primary_key = True), - Column('user_name', VARCHAR(20), nullable = False), - Column('test1', CHAR(5), nullable = False), - Column('test2', FLOAT(5), nullable = False), - Column('test3', TEXT), - Column('test4', DECIMAL, nullable = False), - Column('test5', TIMESTAMP), - Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')), - Column('test6', DateTime, nullable = False), - Column('test7', String), - Column('test8', Binary), - Column('test_passivedefault', deftype, PassiveDefault(defval)), - Column('test_passivedefault2', Integer, PassiveDefault("5")), - Column('test_passivedefault3', deftype2, PassiveDefault(defval2)), - Column('test9', Binary(100)), - mysql_engine='InnoDB' - ) - - addresses = Table('engine_email_addresses', testbase.db, - Column('address_id', Integer, primary_key = True), - Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), - Column('email_address', String(20)), - mysql_engine='InnoDB' - ) - - -# users.c.parent_user_id.set_foreign_key(ForeignKey(users.c.user_id)) - - users.create() - addresses.create() - - # clear out table registry - users.deregister() - addresses.deregister() - - 
try: - users = Table('engine_users', testbase.db, autoload = True) - addresses = Table('engine_email_addresses', testbase.db, autoload = True) - finally: - addresses.drop() - users.drop() - - users.create() - addresses.create() - try: - # create a join from the two tables, this insures that - # theres a foreign key set up - # previously, we couldnt get foreign keys out of mysql. seems like - # we can now as long as we use InnoDB -# if testbase.db.engine.__module__.endswith('mysql'): - # addresses.c.remote_user_id.append_item(ForeignKey('engine_users.user_id')) - print users - print addresses - j = join(users, addresses) - print str(j.onclause) - self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause)) - finally: - addresses.drop() - users.drop() - - def testmultipk(self): - table = Table( - 'engine_multi', testbase.db, - Column('multi_id', Integer, primary_key=True), - Column('multi_rev', Integer, primary_key=True), - Column('name', String(50), nullable=False), - Column('value', String(100)) - ) - table.create() - # clear out table registry - table.deregister() - - try: - table = Table('engine_multi', testbase.db, autoload=True) - finally: - table.drop() - - print repr( - [table.c['multi_id'].primary_key, - table.c['multi_rev'].primary_key - ] - ) - table.create() - table.insert().execute({'multi_id':1,'multi_rev':1,'name':'row1', 'value':'value1'}) - table.insert().execute({'multi_id':2,'multi_rev':18,'name':'row2', 'value':'value2'}) - table.insert().execute({'multi_id':3,'multi_rev':3,'name':'row3', 'value':'value3'}) - table.select().execute().fetchall() - table.drop() - - def testtoengine(self): - db = ansisql.engine() - - table = Table('mytable', db, - Column('myid', Integer, key = 'id'), - Column('name', String, key = 'name', nullable=False), - Column('description', String, key = 'description'), - ) - - print repr(table) - - pgdb = postgres.engine({}) - - pgtable = table.toengine(pgdb) - - print repr(pgtable) - assert 
pgtable.c.id.nullable - assert not pgtable.c.name.nullable - assert pgtable.c.description.nullable - - def testoverride(self): - table = Table( - 'override_test', testbase.db, - Column('col1', Integer, primary_key=True), - Column('col2', String(20)), - Column('col3', Numeric) - ) - table.create() - # clear out table registry - table.deregister() - - try: - table = Table( - 'override_test', testbase.db, - Column('col2', Unicode()), - Column('col4', String(30)), autoload=True) - - print repr(table) - self.assert_(isinstance(table.c.col1.type, Integer)) - self.assert_(isinstance(table.c.col2.type, Unicode)) - self.assert_(isinstance(table.c.col4.type, String)) - finally: - table.drop() - -if __name__ == "__main__": - testbase.main() - </del></span></pre></div> <a id="sqlalchemytrunktestreflectionpyfromrev1140sqlalchemytrunktestenginespy"></a> <div class="copfile"><h4>Copied: sqlalchemy/trunk/test/reflection.py (from rev 1140, sqlalchemy/trunk/test/engines.py) (1140 => 1154)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/trunk/test/engines.py 2006-03-13 17:39:06 UTC (rev 1140) +++ sqlalchemy/trunk/test/reflection.py 2006-03-17 02:15:09 UTC (rev 1154) </span><span class="lines">@@ -0,0 +1,171 @@ </span><ins>+ +import sqlalchemy.ansisql as ansisql +import sqlalchemy.databases.postgres as postgres +import sqlalchemy.databases.oracle as oracle +import sqlalchemy.databases.sqlite as sqllite + +from sqlalchemy import * + +from testbase import PersistTest +import testbase +import unittest, re + +class ReflectionTest(PersistTest): + def testbasic(self): + # really trip it up with a circular reference + + use_function_defaults = testbase.db.engine.__module__.endswith('postgres') or testbase.db.engine.__module__.endswith('oracle') + + use_string_defaults = use_function_defaults or testbase.db.engine.__module__.endswith('sqlite') + + if use_function_defaults: + defval = func.current_date() + deftype = Date + else: + defval = "3" + deftype = Integer + + if 
use_string_defaults: + deftype2 = String + defval2 = "im a default" + else: + deftype2 = Integer + defval2 = "15" + + users = Table('engine_users', testbase.db, + Column('user_id', INT, primary_key = True), + Column('user_name', VARCHAR(20), nullable = False), + Column('test1', CHAR(5), nullable = False), + Column('test2', FLOAT(5), nullable = False), + Column('test3', TEXT), + Column('test4', DECIMAL, nullable = False), + Column('test5', TIMESTAMP), + Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')), + Column('test6', DateTime, nullable = False), + Column('test7', String), + Column('test8', Binary), + Column('test_passivedefault', deftype, PassiveDefault(defval)), + Column('test_passivedefault2', Integer, PassiveDefault("5")), + Column('test_passivedefault3', deftype2, PassiveDefault(defval2)), + Column('test9', Binary(100)), + mysql_engine='InnoDB' + ) + + addresses = Table('engine_email_addresses', testbase.db, + Column('address_id', Integer, primary_key = True), + Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), + Column('email_address', String(20)), + mysql_engine='InnoDB' + ) + + +# users.c.parent_user_id.set_foreign_key(ForeignKey(users.c.user_id)) + + users.create() + addresses.create() + + # clear out table registry + users.deregister() + addresses.deregister() + + try: + users = Table('engine_users', testbase.db, autoload = True) + addresses = Table('engine_email_addresses', testbase.db, autoload = True) + finally: + addresses.drop() + users.drop() + + users.create() + addresses.create() + try: + # create a join from the two tables, this insures that + # theres a foreign key set up + # previously, we couldnt get foreign keys out of mysql. 
seems like + # we can now as long as we use InnoDB +# if testbase.db.engine.__module__.endswith('mysql'): + # addresses.c.remote_user_id.append_item(ForeignKey('engine_users.user_id')) + print users + print addresses + j = join(users, addresses) + print str(j.onclause) + self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause)) + finally: + addresses.drop() + users.drop() + + def testmultipk(self): + table = Table( + 'engine_multi', testbase.db, + Column('multi_id', Integer, primary_key=True), + Column('multi_rev', Integer, primary_key=True), + Column('name', String(50), nullable=False), + Column('value', String(100)) + ) + table.create() + # clear out table registry + table.deregister() + + try: + table = Table('engine_multi', testbase.db, autoload=True) + finally: + table.drop() + + print repr( + [table.c['multi_id'].primary_key, + table.c['multi_rev'].primary_key + ] + ) + table.create() + table.insert().execute({'multi_id':1,'multi_rev':1,'name':'row1', 'value':'value1'}) + table.insert().execute({'multi_id':2,'multi_rev':18,'name':'row2', 'value':'value2'}) + table.insert().execute({'multi_id':3,'multi_rev':3,'name':'row3', 'value':'value3'}) + table.select().execute().fetchall() + table.drop() + + def testtoengine(self): + db = ansisql.engine() + + table = Table('mytable', db, + Column('myid', Integer, key = 'id'), + Column('name', String, key = 'name', nullable=False), + Column('description', String, key = 'description'), + ) + + print repr(table) + + pgdb = postgres.engine({}) + + pgtable = table.toengine(pgdb) + + print repr(pgtable) + assert pgtable.c.id.nullable + assert not pgtable.c.name.nullable + assert pgtable.c.description.nullable + + def testoverride(self): + table = Table( + 'override_test', testbase.db, + Column('col1', Integer, primary_key=True), + Column('col2', String(20)), + Column('col3', Numeric) + ) + table.create() + # clear out table registry + table.deregister() + + try: + table = Table( + 'override_test', 
testbase.db, + Column('col2', Unicode()), + Column('col4', String(30)), autoload=True) + + print repr(table) + self.assert_(isinstance(table.c.col1.type, Integer)) + self.assert_(isinstance(table.c.col2.type, Unicode)) + self.assert_(isinstance(table.c.col4.type, String)) + finally: + table.drop() + +if __name__ == "__main__": + testbase.main() + </ins></span></pre></div> <a id="sqlalchemytrunktesttablespy"></a> <div class="modfile"><h4>Modified: sqlalchemy/trunk/test/tables.py (1153 => 1154)</h4> <pre class="diff"><span> <span class="info">--- sqlalchemy/trunk/test/tables.py 2006-03-17 01:01:34 UTC (rev 1153) +++ sqlalchemy/trunk/test/tables.py 2006-03-17 02:15:09 UTC (rev 1154) </span><span class="lines">@@ -14,7 +14,7 @@ </span><span class="cx"> users = Table('users', db, </span><span class="cx"> Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True), </span><span class="cx"> Column('user_name', String(40)), </span><del>- </del><ins>+ mysql_engine='innodb' </ins><span class="cx"> ) </span><span class="cx"> </span><span class="cx"> addresses = Table('email_addresses', db, </span></span></pre> </div> </div> </body> </html> |