Commit [r530] Maximize Restore History

Tons of changes from major refactoring/cleanup. This is all really broken

right now. In particular, all results are returned as strings.

adustman 2008-03-14

added /trunk/MySQLdb/tests
removed /trunk/MySQLdb/_mysql_exceptions.py
removed /trunk/MySQLdb/dbapi20.py
removed /trunk/MySQLdb/test_MySQLdb_capabilities.py
removed /trunk/MySQLdb/test_capabilities.py
changed /trunk/MySQLdb/MANIFEST.in
changed /trunk/MySQLdb/MySQLdb/__init__.py
changed /trunk/MySQLdb/MySQLdb/connections.py
changed /trunk/MySQLdb/MySQLdb/cursors.py
changed /trunk/MySQLdb/_mysql.c
changed /trunk/MySQLdb/_mysql.h
changed /trunk/MySQLdb/_mysql_results.c
changed /trunk/MySQLdb/metadata.cfg
changed /trunk/MySQLdb/setup.py
copied /trunk/MySQLdb/_mysql_exceptions.py -> /trunk/MySQLdb/MySQLdb/exceptions.py
copied /trunk/MySQLdb/dbapi20.py -> /trunk/MySQLdb/tests/dbapi20.py
copied /trunk/MySQLdb/test_MySQLdb_capabilities.py -> /trunk/MySQLdb/tests/test_MySQLdb_capabilities.py
copied /trunk/MySQLdb/test_MySQLdb_dbapi20.py -> /trunk/MySQLdb/tests/test_MySQLdb_dbapi20.py
copied /trunk/MySQLdb/test_MySQLdb_dbapi20.py -> /trunk/MySQLdb/tests/capabilities.py
/trunk/MySQLdb/dbapi20.py
File was removed.
/trunk/MySQLdb/MANIFEST.in Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/MySQLdb/__init__.py Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/MySQLdb/connections.py Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/MySQLdb/cursors.py Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/_mysql.c Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/_mysql.h Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/_mysql_results.c Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/metadata.cfg Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/setup.py Diff Switch to side-by-side view
Loading...
/trunk/MySQLdb/test_MySQLdb_dbapi20.py to /trunk/MySQLdb/tests/capabilities.py
--- a/trunk/MySQLdb/test_MySQLdb_dbapi20.py
+++ b/trunk/MySQLdb/tests/capabilities.py
@@ -1,205 +1,289 @@
-#!/usr/bin/env python
-import dbapi20
+#!/usr/bin/env python -O
+""" Script to test database capabilities and the DB-API interface
+    for functionality and memory leaks.
+
+    Adapted from a script by M-A Lemburg.
+    
+"""
+from time import time
+import array
 import unittest
-import MySQLdb
-
-class test_MySQLdb(dbapi20.DatabaseAPI20Test):
-    driver = MySQLdb
+
+
+class DatabaseTest(unittest.TestCase):
+
+    db_module = None
     connect_args = ()
-    connect_kw_args = dict(db='test',
-                           read_default_file='~/.my.cnf',
-                           charset='utf8',
-                           sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")
-
-    def test_setoutputsize(self): pass
-    def test_setoutputsize_basic(self): pass
-    def test_nextset(self): pass
-
-    """The tests on fetchone and fetchall and rowcount bogusly
-    test for an exception if the statement cannot return a
-    result set. MySQL always returns a result set; it's just that
-    some things return empty result sets."""
-    
-    def test_fetchall(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            # cursor.fetchall should raise an Error if called
-            # without executing a query that may return rows (such
-            # as a select)
-            self.assertRaises(self.driver.Error, cur.fetchall)
-
-            self.executeDDL1(cur)
-            for sql in self._populate():
-                cur.execute(sql)
-
-            # cursor.fetchall should raise an Error if called
-            # after executing a a statement that cannot return rows
-##             self.assertRaises(self.driver.Error,cur.fetchall)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            rows = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,len(self.samples)))
-            self.assertEqual(len(rows),len(self.samples),
-                'cursor.fetchall did not retrieve all rows'
-                )
-            rows = [r[0] for r in rows]
-            rows.sort()
-            for i in range(0,len(self.samples)):
-                self.assertEqual(rows[i],self.samples[i],
-                'cursor.fetchall retrieved incorrect rows'
-                )
-            rows = cur.fetchall()
-            self.assertEqual(
-                len(rows),0,
-                'cursor.fetchall should return an empty list if called '
-                'after the whole result set has been fetched'
-                )
-            self.failUnless(cur.rowcount in (-1,len(self.samples)))
-
-            self.executeDDL2(cur)
-            cur.execute('select name from %sbarflys' % self.table_prefix)
-            rows = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,0))
-            self.assertEqual(len(rows),0,
-                'cursor.fetchall should return an empty list if '
-                'a select query returns no rows'
-                )
+    connect_kwargs = dict()
+    create_table_extra = ''
+    rows = 10
+    debug = False
+    
+    def setUp(self):
+        import gc
+        db = self.db_module.connect(*self.connect_args, **self.connect_kwargs)
+        self.connection = db
+        self.cursor = db.cursor()
+        self.BLOBText = ''.join([chr(i) for i in range(256)] * 100);
+        self.BLOBUText = u''.join([unichr(i) for i in range(16384)])
+        self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16))
+
+    leak_test = True
+    
+    def tearDown(self):
+        if self.leak_test:
+            import gc
+            del self.cursor
+            orphans = gc.collect()
+            self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans)
             
+            del self.connection
+            orphans = gc.collect()
+            self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans)
+            
+    def table_exists(self, name):
+        try:
+            self.cursor.execute('select * from %s where 1=0' % name)
+        except:
+            return False
+        else:
+            return True
+
+    def quote_identifier(self, ident):
+        return '"%s"' % ident
+    
+    def new_table_name(self):
+        i = id(self.cursor)
+        while True:
+            name = self.quote_identifier('tb%08x' % i)
+            if not self.table_exists(name):
+                return name
+            i = i + 1
+
+    def create_table(self, columndefs):
+
+        """ Create a table using a list of column definitions given in
+            columndefs.
+        
+            generator must be a function taking arguments (row_number,
+            col_number) returning a suitable data object for insertion
+            into the table.
+
+        """
+        self.table = self.new_table_name()
+        self.cursor.execute('CREATE TABLE %s (%s) %s' % 
+                            (self.table,
+                             ',\n'.join(columndefs),
+                             self.create_table_extra))
+
+    def check_data_integrity(self, columndefs, generator):
+        # insert
+        self.create_table(columndefs)
+        insert_statement = ('INSERT INTO %s VALUES (%s)' % 
+                            (self.table,
+                             ','.join(['%s'] * len(columndefs))))
+        data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+                 for i in range(self.rows) ]
+        if self.debug:
+            print data
+        self.cursor.executemany(insert_statement, data)
+        self.connection.commit()
+        # verify
+        self.cursor.execute('select * from %s' % self.table)
+        l = self.cursor.fetchall()
+        if self.debug:
+            print l
+        self.assertEquals(len(l), self.rows)
+        try:
+            for i in range(self.rows):
+                for j in range(len(columndefs)):
+                    self.assertEquals(l[i][j], generator(i,j))
         finally:
-            con.close()
-                
-    def test_fetchone(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-
-            # cursor.fetchone should raise an Error if called before
-            # executing a select-type query
-            self.assertRaises(self.driver.Error,cur.fetchone)
-
-            # cursor.fetchone should raise an Error if called after
-            # executing a query that cannnot return rows
-            self.executeDDL1(cur)
-##             self.assertRaises(self.driver.Error,cur.fetchone)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            self.assertEqual(cur.fetchone(),None,
-                'cursor.fetchone should return None if a query retrieves '
-                'no rows'
-                )
-            self.failUnless(cur.rowcount in (-1,0))
-
-            # cursor.fetchone should raise an Error if called after
-            # executing a query that cannnot return rows
-            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
-                self.table_prefix
-                ))
-##             self.assertRaises(self.driver.Error,cur.fetchone)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            r = cur.fetchone()
-            self.assertEqual(len(r),1,
-                'cursor.fetchone should have retrieved a single row'
-                )
-            self.assertEqual(r[0],'Victoria Bitter',
-                'cursor.fetchone retrieved incorrect data'
-                )
-##             self.assertEqual(cur.fetchone(),None,
-##                 'cursor.fetchone should return None if no more rows available'
-##                 )
-            self.failUnless(cur.rowcount in (-1,1))
-        finally:
-            con.close()
-
-    # Same complaint as for fetchall and fetchone
-    def test_rowcount(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self.executeDDL1(cur)
-##             self.assertEqual(cur.rowcount,-1,
-##                 'cursor.rowcount should be -1 after executing no-result '
-##                 'statements'
-##                 )
-            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
-                self.table_prefix
-                ))
-##             self.failUnless(cur.rowcount in (-1,1),
-##                 'cursor.rowcount should == number or rows inserted, or '
-##                 'set to -1 after executing an insert statement'
-##                 )
-            cur.execute("select name from %sbooze" % self.table_prefix)
-            self.failUnless(cur.rowcount in (-1,1),
-                'cursor.rowcount should == number of rows returned, or '
-                'set to -1 after executing a select statement'
-                )
-            self.executeDDL2(cur)
-##             self.assertEqual(cur.rowcount,-1,
-##                 'cursor.rowcount not being reset to -1 after executing '
-##                 'no-result statements'
-##                 )
-        finally:
-            con.close()
-
-    def test_callproc(self):
-        pass # performed in test_MySQL_capabilities
-
-    def help_nextset_setUp(self,cur):
-        ''' Should create a procedure called deleteme
-            that returns two result sets, first the 
-	    number of rows in booze then "name from booze"
-        '''
-        sql="""
-           create procedure deleteme()
-           begin
-               select count(*) from %(tp)sbooze;
-               select name from %(tp)sbooze;
-           end
-        """ % dict(tp=self.table_prefix)
-        cur.execute(sql)
-
-    def help_nextset_tearDown(self,cur):
-        'If cleaning up is needed after nextSetTest'
-        cur.execute("drop procedure deleteme")
-
-    def test_nextset(self):
-        from warnings import warn
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            if not hasattr(cur,'nextset'):
-                return
-
-            try:
-                self.executeDDL1(cur)
-                sql=self._populate()
-                for sql in self._populate():
-                    cur.execute(sql)
-
-                self.help_nextset_setUp(cur)
-
-                cur.callproc('deleteme')
-                numberofrows=cur.fetchone()
-                assert numberofrows[0]== len(self.samples)
-                assert cur.nextset()
-                names=cur.fetchall()
-                assert len(names) == len(self.samples)
-                s=cur.nextset()
-                if s:
-                    empty = cur.fetchall()
-                    self.assertEquals(len(empty), 0,
-                                      "non-empty result set after other result sets")
-                    #warn("Incompatibility: MySQL returns an empty result set for the CALL itself",
-                    #     Warning)
-                #assert s == None,'No more return sets, should return None'
-            finally:
-                self.help_nextset_tearDown(cur)
-
-        finally:
-            con.close()
-
-    
-if __name__ == '__main__':
-    unittest.main()
-    print '''"Huh-huh, he said 'unit'." -- Butthead'''
+            if not self.debug:
+                self.cursor.execute('drop table %s' % (self.table))
+
+    def test_transactions(self):
+        columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
+        def generator(row, col):
+            if col == 0: return row
+            else: return ('%i' % (row%10))*255
+        self.create_table(columndefs)
+        insert_statement = ('INSERT INTO %s VALUES (%s)' % 
+                            (self.table,
+                             ','.join(['%s'] * len(columndefs))))
+        data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+                 for i in range(self.rows) ]
+        self.cursor.executemany(insert_statement, data)
+        # verify
+        self.connection.commit()
+        self.cursor.execute('select * from %s' % self.table)
+        l = self.cursor.fetchall()
+        self.assertEquals(len(l), self.rows)
+        for i in range(self.rows):
+            for j in range(len(columndefs)):
+                self.assertEquals(l[i][j], generator(i,j))
+        delete_statement = 'delete from %s where col1=%%s' % self.table
+        self.cursor.execute(delete_statement, (0,))
+        self.cursor.execute('select col1 from %s where col1=%s' % \
+                            (self.table, 0))
+        l = self.cursor.fetchall()
+        self.failIf(l, "DELETE didn't work")
+        self.connection.rollback()
+        self.cursor.execute('select col1 from %s where col1=%s' % \
+                            (self.table, 0))
+        l = self.cursor.fetchall()
+        self.failUnless(len(l) == 1, "ROLLBACK didn't work")
+        self.cursor.execute('drop table %s' % (self.table))
+
+    def test_truncation(self):
+        columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
+        def generator(row, col):
+            if col == 0: return row
+            else: return ('%i' % (row%10))*((255-self.rows/2)+row)
+        self.create_table(columndefs)
+        insert_statement = ('INSERT INTO %s VALUES (%s)' % 
+                            (self.table,
+                             ','.join(['%s'] * len(columndefs))))
+
+        try:
+            self.cursor.execute(insert_statement, (0, '0'*256))
+        except Warning:
+            if self.debug: print self.cursor.messages
+        except self.connection.DataError:
+            pass
+        else:
+            self.fail("Over-long column did not generate warnings/exception with single insert")
+
+        self.connection.rollback()
+        
+        try:
+            for i in range(self.rows):
+                data = []
+                for j in range(len(columndefs)):
+                    data.append(generator(i,j))
+                self.cursor.execute(insert_statement,tuple(data))
+        except Warning:
+            if self.debug: print self.cursor.messages
+        except self.connection.DataError:
+            pass
+        else:
+            self.fail("Over-long columns did not generate warnings/exception with execute()")
+
+        self.connection.rollback()
+        
+        try:
+            data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+                     for i in range(self.rows) ]
+            self.cursor.executemany(insert_statement, data)
+        except Warning:
+            if self.debug: print self.cursor.messages
+        except self.connection.DataError:
+            pass
+        else:
+            self.fail("Over-long columns did not generate warnings/exception with executemany()")
+
+        self.connection.rollback()
+        self.cursor.execute('drop table %s' % (self.table))
+
+    def test_CHAR(self):
+        # Character data
+        def generator(row,col):
+            return ('%i' % ((row+col) % 10)) * 255
+        self.check_data_integrity(
+            ('col1 char(255)','col2 char(255)'),
+            generator)
+
+    def test_INT(self):
+        # Number data
+        def generator(row,col):
+            return row*row
+        self.check_data_integrity(
+            ('col1 INT',),
+            generator)
+
+    def test_DECIMAL(self):
+        # DECIMAL
+        def generator(row,col):
+            from decimal import Decimal
+            return Decimal("%d.%02d" % (row, col))
+        self.check_data_integrity(
+            ('col1 DECIMAL(5,2)',),
+            generator)
+
+    def test_DATE(self):
+        ticks = time()
+        def generator(row,col):
+            return self.db_module.DateFromTicks(ticks+row*86400-col*1313)
+        self.check_data_integrity(
+                 ('col1 DATE',),
+                 generator)
+
+    def test_TIME(self):
+        ticks = time()
+        def generator(row,col):
+            return self.db_module.TimeFromTicks(ticks+row*86400-col*1313)
+        self.check_data_integrity(
+                 ('col1 TIME',),
+                 generator)
+
+    def test_DATETIME(self):
+        ticks = time()
+        def generator(row,col):
+            return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
+        self.check_data_integrity(
+                 ('col1 DATETIME',),
+                 generator)
+
+    def test_TIMESTAMP(self):
+        ticks = time()
+        def generator(row,col):
+            return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
+        self.check_data_integrity(
+                 ('col1 TIMESTAMP',),
+                 generator)
+
+    def test_fractional_TIMESTAMP(self):
+        ticks = time()
+        def generator(row,col):
+            return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0)
+        self.check_data_integrity(
+                 ('col1 TIMESTAMP',),
+                 generator)
+
+    def test_LONG(self):
+        def generator(row,col):
+            if col == 0:
+                return row
+            else:
+                return self.BLOBUText # 'BLOB Text ' * 1024
+        self.check_data_integrity(
+                 ('col1 INT','col2 LONG'),
+                 generator)
+
+    def test_TEXT(self):
+        def generator(row,col):
+            return self.BLOBUText # 'BLOB Text ' * 1024
+        self.check_data_integrity(
+                 ('col2 TEXT',),
+                 generator)
+
+    def test_LONG_BYTE(self):
+        def generator(row,col):
+            if col == 0:
+                return row
+            else:
+                return self.BLOBBinary # 'BLOB\000Binary ' * 1024
+        self.check_data_integrity(
+                 ('col1 INT','col2 LONG BYTE'),
+                 generator)
+
+    def test_BLOB(self):
+        def generator(row,col):
+            if col == 0:
+                return row
+            else:
+                return self.BLOBBinary # 'BLOB\000Binary ' * 1024
+        self.check_data_integrity(
+                 ('col1 INT','col2 BLOB'),
+                 generator)
+