From: <fwi...@us...> - 2009-08-10 17:23:32
|
Revision: 6647 http://jython.svn.sourceforge.net/jython/?rev=6647&view=rev Author: fwierzbicki Date: 2009-08-10 17:23:22 +0000 (Mon, 10 Aug 2009) Log Message: ----------- Large reindents, mainly 8 space -> 4 space indents. Modified Paths: -------------- trunk/jython/Lib/test/zxjdbc/dbextstest.py trunk/jython/Lib/test/zxjdbc/jndi.py trunk/jython/Lib/test/zxjdbc/runner.py trunk/jython/Lib/test/zxjdbc/sptest.py trunk/jython/Lib/test/zxjdbc/zxtest.py Modified: trunk/jython/Lib/test/zxjdbc/dbextstest.py =================================================================== --- trunk/jython/Lib/test/zxjdbc/dbextstest.py 2009-08-10 17:22:46 UTC (rev 6646) +++ trunk/jython/Lib/test/zxjdbc/dbextstest.py 2009-08-10 17:23:22 UTC (rev 6647) @@ -10,257 +10,257 @@ class dbextsTestCase(runner.SQLTestCase): - def setUp(self): - template = """ - [default] - name=dbexts_test + def setUp(self): + template = """ + [default] + name=dbexts_test - [jdbc] - name=dbexts_test - url=%s - user=%s - pwd=%s - driver=%s - """ + [jdbc] + name=dbexts_test + url=%s + user=%s + pwd=%s + driver=%s + """ - args = {} - for arg in self.factory.arguments: - args[arg[0]] = arg[1] + args = {} + for arg in self.factory.arguments: + args[arg[0]] = arg[1] - template = template % (args["url"], args["usr"], args["pwd"], args["driver"]) - if hasattr(self, "datahandler"): - template += "\tdatahandler=%s" % (self.datahandler.__name__) - template = os.linesep.join(template.split()) + template = template % (args["url"], args["usr"], args["pwd"], args["driver"]) + if hasattr(self, "datahandler"): + template += "\tdatahandler=%s" % (self.datahandler.__name__) + template = os.linesep.join(template.split()) - try: - fp = open(tempfile.mktemp(), "w") - fp.write(template) - fp.close() - self.db = dbexts.dbexts(cfg=fp.name) - self.db.verbose = 0 - for table in ("one", "two"): - try: self.db.raw("drop table %s" % (table)) - except: pass - self.db.raw("create table one (a int, b int, c varchar(32))") - self.db.raw("create table two (a int, b int, c varchar(32))") - finally: - try: - os.remove(fp.name) - except: - pass + try: + fp = open(tempfile.mktemp(), "w") + fp.write(template) + fp.close() + self.db = dbexts.dbexts(cfg=fp.name) + self.db.verbose = 0 + for table in ("one", "two"): + try: self.db.raw("drop table %s" % (table)) + except: pass + self.db.raw("create table one (a int, b int, c varchar(32))") + self.db.raw("create table two (a int, b int, c varchar(32))") + finally: + try: + os.remove(fp.name) + except: + pass - def tearDown(self): - self.db.raw("drop table one") - self.db.raw("drop table two") - self.db.close() + def tearDown(self): + self.db.raw("drop table one") + self.db.raw("drop table two") + self.db.close() - def testChoose(self): - """testing choose()""" - r = dbexts.choose(1, 4, 5) - assert r == 4, "choose failed, expected 4, got %d" % r + def testChoose(self): + """testing choose()""" + r = dbexts.choose(1, 4, 5) + assert r == 4, "choose failed, expected 4, got %d" % r - def _insertInto(self, table, num): - for i in range(0, num): - self.db.raw("insert into %s (a, b, c) values (?, ?, ?)" % (table), [(i, random()*100+i, "%s" % (random()*100+i))]) + def _insertInto(self, table, num): + for i in range(0, num): + self.db.raw("insert into %s (a, b, c) values (?, ?, ?)" % (table), [(i, random()*100+i, "%s" % (random()*100+i))]) - def testSqlFailure(self): - """testing isql with sql exception""" - try: - self.db.isql("select * from __garbage__") - self.fail("expected SQL exception") - except: - pass + def testSqlFailure(self): + """testing isql with sql exception""" + try: + self.db.isql("select * from __garbage__") + self.fail("expected SQL exception") + except: + pass - def testSqlFailureWithBeginCommit(self): - """testing failure with begin/commit""" - failed = 0 - c = self.db.begin() - try: - try: - c.execute("select * from __garbage__") - except: - failed = 1 - finally: - self.db.commit(c) + def testSqlFailureWithBeginCommit(self): + """testing failure with begin/commit""" + failed = 0 + c = self.db.begin() + try: + try: + c.execute("select * from __garbage__") + except: + failed = 1 + finally: + self.db.commit(c) - if not failed: - self.fail("expected SQL exception") + if not failed: + self.fail("expected SQL exception") - def testSqlWithBeginCommit(self): - """testing begin/commit""" - self._insertInto("two", 30) - c = self.db.begin() - c.execute("select * from two") - f = c.fetchall() - c.close() - self.db.commit() - assert len(f) == 30, "expected [30], got [%d]" % (len(f)) + def testSqlWithBeginCommit(self): + """testing begin/commit""" + self._insertInto("two", 30) + c = self.db.begin() + c.execute("select * from two") + f = c.fetchall() + c.close() + self.db.commit() + assert len(f) == 30, "expected [30], got [%d]" % (len(f)) - def testQueryMultipleReturnSets(self): - """testing multiple return sets""" - self._insertInto("two", 30) - h, r = self.db.raw("select * from two where a = ?", [(0,), (3,)]) - assert len(r) == 2, "expected [2], got [%d]" % (len(r)) + def testQueryMultipleReturnSets(self): + """testing multiple return sets""" + self._insertInto("two", 30) + h, r = self.db.raw("select * from two where a = ?", [(0,), (3,)]) + assert len(r) == 2, "expected [2], got [%d]" % (len(r)) - def testUpdateCount(self): - """testing update count""" + def testUpdateCount(self): + """testing update count""" - self._insertInto("one", 45) - self.db.raw("delete from one where a > ?", [(12,)]) - self.assertEquals(32, self.db.updatecount) + self._insertInto("one", 45) + self.db.raw("delete from one where a > ?", [(12,)]) + self.assertEquals(32, self.db.updatecount) - def testQueryWithMaxRows(self): - """testing query with max rows""" + def testQueryWithMaxRows(self): + """testing query with max rows""" - self._insertInto("one", 45) - self.db.raw("select * from one", maxrows=3) - self.assertEquals(3, len(self.db.results)) - self.db.raw("select * from one where a > ?", [(12,)], maxrows=3) - self.assertEquals(3, len(self.db.results)) + self._insertInto("one", 45) + self.db.raw("select * from one", maxrows=3) + self.assertEquals(3, len(self.db.results)) + self.db.raw("select * from one where a > ?", [(12,)], maxrows=3) + self.assertEquals(3, len(self.db.results)) - def testBulkcopy(self): - """testing bcp""" + def testBulkcopy(self): + """testing bcp""" - self._insertInto("two", 3) + self._insertInto("two", 3) - bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) - assert len(bcp.columns) == 1, "one column should be specified, [%d] found" % (len(bcp.columns)) + bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) + assert len(bcp.columns) == 1, "one column should be specified, [%d] found" % (len(bcp.columns)) - bcp = self.db.bulkcopy("dbexts_test", "two", include=['a', 'b', 'c'], exclude=['a']) - assert len(bcp.columns) == 2, "expected two columns, found [%d]" % (len(bcp.columns)) - a = filter(lambda x, c=bcp.columns: x in c, ['b', 'c']) - assert a, "expecting ['b', 'c'], found %s" % (str(a)) + bcp = self.db.bulkcopy("dbexts_test", "two", include=['a', 'b', 'c'], exclude=['a']) + assert len(bcp.columns) == 2, "expected two columns, found [%d]" % (len(bcp.columns)) + a = filter(lambda x, c=bcp.columns: x in c, ['b', 'c']) + assert a, "expecting ['b', 'c'], found %s" % (str(a)) - class _executor: - def __init__(self, table, cols): - self.cols = cols - if cols: - self.sql = "insert into %s (%s) values (%s)" % (table, ",".join(self.cols), ",".join(("?",) * len(self.cols))) - else: - self.sql = "insert into %s values (%%s)" % (table) - def execute(self, db, rows, bindings): - assert len(rows) > 0, "must have at least one row" - if self.cols: - sql = self.sql - else: - sql = self.sql % (",".join(("?",) * len(rows[0]))) + class _executor: + def __init__(self, table, cols): + self.cols = cols + if cols: + self.sql = "insert into %s (%s) values (%s)" % (table, ",".join(self.cols), ",".join(("?",) * len(self.cols))) + else: + self.sql = "insert into %s values (%%s)" % (table) + def execute(self, db, rows, bindings): + assert len(rows) > 0, "must have at least one row" + if self.cols: + sql = self.sql + else: + sql = self.sql % (",".join(("?",) * len(rows[0]))) - bcp = self.db.bulkcopy("dbexts_test", "two", include=['a'], executor=_executor) - done = bcp.transfer(self.db) - assert done == 3, "expecting three rows to be handled but not inserted, found [%d]" % (done) + bcp = self.db.bulkcopy("dbexts_test", "two", include=['a'], executor=_executor) + done = bcp.transfer(self.db) + assert done == 3, "expecting three rows to be handled but not inserted, found [%d]" % (done) - bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) - done = bcp.transfer(self.db) - assert done == 3, "expecting three rows to be inserted, found [%d]" % (done) + bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) + done = bcp.transfer(self.db) + assert done == 3, "expecting three rows to be inserted, found [%d]" % (done) - bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) - bcp.rowxfer([200]) - bcp.rowxfer([201]) - bcp.rowxfer([202]) - bcp.rowxfer([203]) - done = bcp.batch() - assert done == 4, "expecting four rows to be inserted, found [%d]" % (done) - bcp.rowxfer([300]) - bcp.rowxfer([401]) - bcp.rowxfer([502]) - bcp.rowxfer([603]) - done = bcp.batch() - assert done == 4, "expecting four rows to be inserted, found [%d]" % (done) - bcp.rowxfer([205]) - bcp.rowxfer([210]) - done = bcp.done() - assert done == 2, "expecting two rows to be inserted, found [%d]" % (done) - assert bcp.total == 10, "expecting 10 rows to be inserted, found [%d]" % (bcp.total) + bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) + bcp.rowxfer([200]) + bcp.rowxfer([201]) + bcp.rowxfer([202]) + bcp.rowxfer([203]) + done = bcp.batch() + assert done == 4, "expecting four rows to be inserted, found [%d]" % (done) + bcp.rowxfer([300]) + bcp.rowxfer([401]) + bcp.rowxfer([502]) + bcp.rowxfer([603]) + done = bcp.batch() + assert done == 4, "expecting four rows to be inserted, found [%d]" % (done) + bcp.rowxfer([205]) + bcp.rowxfer([210]) + done = bcp.done() + assert done == 2, "expecting two rows to be inserted, found [%d]" % (done) + assert bcp.total == 10, "expecting 10 rows to be inserted, found [%d]" % (bcp.total) - bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) - done = bcp.transfer(self.db) - assert done == 16, "expecting sixteen rows to be inserted, found [%d]" % (done) + bcp = self.db.bulkcopy("dbexts_test", "two", include=['a']) + done = bcp.transfer(self.db) + assert done == 16, "expecting sixteen rows to be inserted, found [%d]" % (done) - def testTable(self): - """testing dbexts.table(tabname)""" - self.db.table("one") - assert self.db.results is not None, "results were None" - self.assertEquals(3, len(self.db.results)) - self.db.table() - found = 0 - for a in self.db.results: - if a[2].lower() in ("one", "two"): found += 1 - self.assertEquals(2, found) + def testTable(self): + """testing dbexts.table(tabname)""" + self.db.table("one") + assert self.db.results is not None, "results were None" + self.assertEquals(3, len(self.db.results)) + self.db.table() + found = 0 + for a in self.db.results: + if a[2].lower() in ("one", "two"): found += 1 + self.assertEquals(2, found) - def testOut(self): - """testing dbexts.out""" - self.db.verbose = 1 - fp = open(tempfile.mktemp(), "w") - try: - self.db.out = fp - self.db.raw("insert into one (a, b) values (?, ?)", [(1, 2), (3, 4)]) - self.db.isql("select * from one") - self.db.verbose = 0 - fp.close() - fp = open(fp.name, "r") - data = fp.read() - assert len(data), "expected file to contain output" - finally: - fp.close() - os.remove(fp.name) + def testOut(self): + """testing dbexts.out""" + self.db.verbose = 1 + fp = open(tempfile.mktemp(), "w") + try: + self.db.out = fp + self.db.raw("insert into one (a, b) values (?, ?)", [(1, 2), (3, 4)]) + self.db.isql("select * from one") + self.db.verbose = 0 + fp.close() + fp = open(fp.name, "r") + data = fp.read() + assert len(data), "expected file to contain output" + finally: + fp.close() + os.remove(fp.name) - def testResultSetWrapper(self): - """testing result set wrapper""" - from dbexts import ResultSet - self._insertInto("two", 30) - h, r = self.db.raw("select * from two where a in (?, ?, ?, ?) order by a", [(12,15,17,8)]) - assert len(r) == 4, "expected [4], got [%d]" % (len(r)) - rs = ResultSet(map(lambda x: x[0], h), r) - assert len(rs[0]) == 3, "expected [3], got [%d]" % (len(rs[0])) - assert rs[0]['a'] == 8, "expected [8], got [%s]" % (rs[0]['a']) - assert rs[0]['A'] == 8, "expected [8], got [%s]" % (rs[0]['A']) - assert len(rs[0]['b':]) == 2, "expected [2], got [%s]" % (len(rs[0]['b':])) - assert len(rs[0]['a':'b']) == 1, "expected [1], got [%s]" % (len(rs[0]['a':'b'])) + def testResultSetWrapper(self): + """testing result set wrapper""" + from dbexts import ResultSet + self._insertInto("two", 30) + h, r = self.db.raw("select * from two where a in (?, ?, ?, ?) order by a", [(12,15,17,8)]) + assert len(r) == 4, "expected [4], got [%d]" % (len(r)) + rs = ResultSet(map(lambda x: x[0], h), r) + assert len(rs[0]) == 3, "expected [3], got [%d]" % (len(rs[0])) + assert rs[0]['a'] == 8, "expected [8], got [%s]" % (rs[0]['a']) + assert rs[0]['A'] == 8, "expected [8], got [%s]" % (rs[0]['A']) + assert len(rs[0]['b':]) == 2, "expected [2], got [%s]" % (len(rs[0]['b':])) + assert len(rs[0]['a':'b']) == 1, "expected [1], got [%s]" % (len(rs[0]['a':'b'])) - def testMultipleResultSetConcatentation(self): - """testing multiple result sets with some resulting in None""" - self._insertInto("two", 30) - # first is non None - h, r = self.db.raw("select * from two where a = ?", [(12,),(8001,),(15,),(17,),(8,),(9001,)]) - assert len(r) == 4, "expected [4], got [%d]" % (len(r)) - # first is None - h, r = self.db.raw("select * from two where a = ?", [(1200,),(8001,),(15,),(17,),(8,),(9001,)]) - assert len(r) == 3, "expected [3], got [%d]" % (len(r)) + def testMultipleResultSetConcatentation(self): + """testing multiple result sets with some resulting in None""" + self._insertInto("two", 30) + # first is non None + h, r = self.db.raw("select * from two where a = ?", [(12,),(8001,),(15,),(17,),(8,),(9001,)]) + assert len(r) == 4, "expected [4], got [%d]" % (len(r)) + # first is None + h, r = self.db.raw("select * from two where a = ?", [(1200,),(8001,),(15,),(17,),(8,),(9001,)]) + assert len(r) == 3, "expected [3], got [%d]" % (len(r)) - def testBulkcopyWithDynamicColumns(self): - """testing bcp with dynamic column names""" + def testBulkcopyWithDynamicColumns(self): + """testing bcp with dynamic column names""" - self.testBulkcopy() + self.testBulkcopy() - bcp = self.db.bulkcopy("dbexts_test", "two", exclude=['a']) - assert len(bcp.columns) == 2, "expected two columns, found [%d]" % (len(bcp.columns)) - a = filter(lambda x, c=bcp.columns: x in c, ['b', 'c']) - assert a == ['b', 'c'], "expecting ['b', 'c'], found %s" % (str(a)) + bcp = self.db.bulkcopy("dbexts_test", "two", exclude=['a']) + assert len(bcp.columns) == 2, "expected two columns, found [%d]" % (len(bcp.columns)) + a = filter(lambda x, c=bcp.columns: x in c, ['b', 'c']) + assert a == ['b', 'c'], "expecting ['b', 'c'], found %s" % (str(a)) - bcp = self.db.bulkcopy("dbexts_test", "two") - done = bcp.transfer(self.db) - assert done == 32, "expecting thirty two rows to be inserted, found [%d]" % (done) + bcp = self.db.bulkcopy("dbexts_test", "two") + done = bcp.transfer(self.db) + assert done == 32, "expecting thirty two rows to be inserted, found [%d]" % (done) - def testAutocommit(self): - """testing the autocommit functionality""" - for u in (0, 1): - self.db.autocommit = u - try: - self.db.isql("select * from one") - except Exception, e: - fail("failed autocommit query with u=[%d], v=[%d]" % (u, v)) - for v in (0, 1): - self.db.db.autocommit = v - try: - self.db.isql("select * from one") - except Exception, e: - self.fail("failed autocommit query with u=[%d], v=[%d]" % (u, v)) + def testAutocommit(self): + """testing the autocommit functionality""" + for u in (0, 1): + self.db.autocommit = u + try: + self.db.isql("select * from one") + except Exception, e: + fail("failed autocommit query with u=[%d], v=[%d]" % (u, v)) + for v in (0, 1): + self.db.db.autocommit = v + try: + self.db.isql("select * from one") + except Exception, e: + self.fail("failed autocommit query with u=[%d], v=[%d]" % (u, v)) - def testPrepare(self): - """testing the handling of a prepared statement""" - self._insertInto("one", 10) - p = self.db.prepare("select * from one") - self.db.isql(p) - self.db.isql(p) - p.close() - assert p.closed + def testPrepare(self): + """testing the handling of a prepared statement""" + self._insertInto("one", 10) + p = self.db.prepare("select * from one") + self.db.isql(p) + self.db.isql(p) + p.close() + assert p.closed Modified: trunk/jython/Lib/test/zxjdbc/jndi.py =================================================================== --- trunk/jython/Lib/test/zxjdbc/jndi.py 2009-08-10 17:22:46 UTC (rev 6646) +++ trunk/jython/Lib/test/zxjdbc/jndi.py 2009-08-10 17:23:22 UTC (rev 6647) @@ -22,12 +22,12 @@ ctx = InitialContext(env) try: - try: - ctx.bind("/jdbc/mysqldb", ds) - except NameAlreadyBoundException, e: - ctx.unbind("/jdbc/mysqldb") - ctx.bind("/jdbc/mysqldb", ds) + try: + ctx.bind("/jdbc/mysqldb", ds) + except NameAlreadyBoundException, e: + ctx.unbind("/jdbc/mysqldb") + ctx.bind("/jdbc/mysqldb", ds) finally: - ctx.close() + ctx.close() print "bound [%s] at /jdbc/mysqldb" % (ds) Modified: trunk/jython/Lib/test/zxjdbc/runner.py =================================================================== --- trunk/jython/Lib/test/zxjdbc/runner.py 2009-08-10 17:22:46 UTC (rev 6646) +++ trunk/jython/Lib/test/zxjdbc/runner.py 2009-08-10 17:23:22 UTC (rev 6647) @@ -18,187 +18,187 @@ import xmllib, __builtin__, re def __imp__(module, attr=None): - if attr: - j = __import__(module, globals(), locals()) - return getattr(j, attr) - else: - last = module.split(".")[-1] - return __import__(module, globals(), locals(), last) + if attr: + j = __import__(module, globals(), locals()) + return getattr(j, attr) + else: + last = module.split(".")[-1] + return __import__(module, globals(), locals(), last) class Factory: - def __init__(self, classname, method): - self.classname = classname - self.method = method - self.arguments = [] - self.keywords = {} + def __init__(self, classname, method): + self.classname = classname + self.method = method + self.arguments = [] + self.keywords = {} class Testcase: - def __init__(self, frm, impt): - self.frm = frm - self.impt = impt - self.ignore = [] + def __init__(self, frm, impt): + self.frm = frm + self.impt = impt + self.ignore = [] class Test: - def __init__(self, name, os): - self.name = name - self.os = os - self.factory = None - self.tests = [] + def __init__(self, name, os): + self.name = name + self.os = os + self.factory = None + self.tests = [] class Vendor: - def __init__(self, name, datahandler=None): - self.name = name - self.scroll = None - self.datahandler = datahandler - self.tests = [] - self.tables = {} + def __init__(self, name, datahandler=None): + self.name = name + self.scroll = None + self.datahandler = datahandler + self.tests = [] + self.tables = {} class ConfigParser(xmllib.XMLParser): - """ - A simple XML parser for the config file. - """ - def __init__(self, **kw): - apply(xmllib.XMLParser.__init__, (self,), kw) - self.vendors = [] - self.table_stack = [] - self.re_var = re.compile(r"\${(.*?)}") + """ + A simple XML parser for the config file. + """ + def __init__(self, **kw): + apply(xmllib.XMLParser.__init__, (self,), kw) + self.vendors = [] + self.table_stack = [] + self.re_var = re.compile(r"\${(.*?)}") - def vendor(self): - assert len(self.vendors) > 0, "no vendors" - return self.vendors[-1] + def vendor(self): + assert len(self.vendors) > 0, "no vendors" + return self.vendors[-1] - def test(self): - v = self.vendor() - assert len(v.tests) > 0, "no tests" - return v.tests[-1] + def test(self): + v = self.vendor() + assert len(v.tests) > 0, "no tests" + return v.tests[-1] - def factory(self): - c = self.test() - assert c.factory, "no factory" - return c.factory + def factory(self): + c = self.test() + assert c.factory, "no factory" + return c.factory - def testcase(self): - s = self.test() - assert len(s.tests) > 0, "no testcases" - return s.tests[-1] + def testcase(self): + s = self.test() + assert len(s.tests) > 0, "no testcases" + return s.tests[-1] - def value(self, value): - def repl(sub): - from java.lang import System - return System.getProperty(sub.group(1), sub.group(1)) - value = self.re_var.sub(repl, value) - return value + def value(self, value): + def repl(sub): + from java.lang import System + return System.getProperty(sub.group(1), sub.group(1)) + value = self.re_var.sub(repl, value) + return value - def start_vendor(self, attrs): - if attrs.has_key('datahandler'): - v = Vendor(attrs['name'], attrs['datahandler']) - else: - v = Vendor(attrs['name']) - if attrs.has_key('scroll'): - v.scroll = attrs['scroll'] - self.vendors.append(v) + def start_vendor(self, attrs): + if attrs.has_key('datahandler'): + v = Vendor(attrs['name'], attrs['datahandler']) + else: + v = Vendor(attrs['name']) + if attrs.has_key('scroll'): + v.scroll = attrs['scroll'] + self.vendors.append(v) - def start_test(self, attrs): - v = self.vendor() - c = Test(attrs['name'], attrs['os']) - v.tests.append(c) + def start_test(self, attrs): + v = self.vendor() + c = Test(attrs['name'], attrs['os']) + v.tests.append(c) - def start_factory(self, attrs): - c = self.test() - f = Factory(attrs['class'], attrs['method']) - c.factory = f + def start_factory(self, attrs): + c = self.test() + f = Factory(attrs['class'], attrs['method']) + c.factory = f - def start_argument(self, attrs): - f = self.factory() - if attrs.has_key('type'): - f.arguments.append((attrs['name'], getattr(__builtin__, attrs['type'])(self.value(attrs['value'])))) - else: - f.arguments.append((attrs['name'], self.value(attrs['value']))) + def start_argument(self, attrs): + f = self.factory() + if attrs.has_key('type'): + f.arguments.append((attrs['name'], getattr(__builtin__, attrs['type'])(self.value(attrs['value'])))) + else: + f.arguments.append((attrs['name'], self.value(attrs['value']))) - def start_keyword(self, attrs): - f = self.factory() - if attrs.has_key('type'): - f.keywords[attrs['name']] = getattr(__builtin__, attrs['type'])(self.value(attrs['value'])) - else: - f.keywords[attrs['name']] = self.value(attrs['value']) + def start_keyword(self, attrs): + f = self.factory() + if attrs.has_key('type'): + f.keywords[attrs['name']] = getattr(__builtin__, attrs['type'])(self.value(attrs['value'])) + else: + f.keywords[attrs['name']] = self.value(attrs['value']) - def start_ignore(self, attrs): - t = self.testcase() - t.ignore.append(attrs['name']) + def start_ignore(self, attrs): + t = self.testcase() + t.ignore.append(attrs['name']) - def start_testcase(self, attrs): - c = self.test() - c.tests.append(Testcase(attrs['from'], attrs['import'])) + def start_testcase(self, attrs): + c = self.test() + c.tests.append(Testcase(attrs['from'], attrs['import'])) - def start_table(self, attrs): - self.table_stack.append((attrs['ref'], attrs['name'])) + def start_table(self, attrs): + self.table_stack.append((attrs['ref'], attrs['name'])) - def end_table(self): - del self.table_stack[-1] + def end_table(self): + del self.table_stack[-1] - def handle_data(self, data): - if len(self.table_stack): - ref, tabname = self.table_stack[-1] - self.vendor().tables[ref] = (tabname, data.strip()) + def handle_data(self, data): + if len(self.table_stack): + ref, tabname = self.table_stack[-1] + self.vendor().tables[ref] = (tabname, data.strip()) class SQLTestCase(unittest.TestCase): - """ - Base testing class. It contains the list of table and factory information - to run any tests. - """ - def __init__(self, name, vendor, factory): - unittest.TestCase.__init__(self, name) - self.vendor = vendor - self.factory = factory - if self.vendor.datahandler: - self.datahandler = __imp__(self.vendor.datahandler) + """ + Base testing class. It contains the list of table and factory information + to run any tests. + """ + def __init__(self, name, vendor, factory): + unittest.TestCase.__init__(self, name) + self.vendor = vendor + self.factory = factory + if self.vendor.datahandler: + self.datahandler = __imp__(self.vendor.datahandler) - def table(self, name): - return self.vendor.tables[name] + def table(self, name): + return self.vendor.tables[name] - def has_table(self, name): - return self.vendor.tables.has_key(name) + def has_table(self, name): + return self.vendor.tables.has_key(name) def make_suite(vendor, testcase, factory, mask=None): - clz = __imp__(testcase.frm, testcase.impt) - caseNames = filter(lambda x, i=testcase.ignore: x not in i, unittest.getTestCaseNames(clz, "test")) - if mask is not None: - caseNames = filter(lambda x, mask=mask: x == mask, caseNames) - tests = [clz(caseName, vendor, factory) for caseName in caseNames] - return unittest.TestSuite(tests) + clz = __imp__(testcase.frm, testcase.impt) + caseNames = filter(lambda x, i=testcase.ignore: x not in i, unittest.getTestCaseNames(clz, "test")) + if mask is not None: + caseNames = filter(lambda x, mask=mask: x == mask, caseNames) + tests = [clz(caseName, vendor, factory) for caseName in caseNames] + return unittest.TestSuite(tests) def test(vendors, include=None, mask=None): - for vendor in vendors: - if not include or vendor.name in include: - print - print "testing [%s]" % (vendor.name) - for test in vendor.tests: - if not test.os or test.os == os.name: - for testcase in test.tests: - suite = make_suite(vendor, testcase, test.factory, mask) - unittest.TextTestRunner().run(suite) - else: - print - print "skipping [%s]" % (vendor.name) + for vendor in vendors: + if not include or vendor.name in include: + print + print "testing [%s]" % (vendor.name) + for test in vendor.tests: + if not test.os or test.os == os.name: + for testcase in test.tests: + suite = make_suite(vendor, testcase, test.factory, mask) + unittest.TextTestRunner().run(suite) + else: + print + print "skipping [%s]" % (vendor.name) if __name__ == '__main__': - import sys, getopt + import sys, getopt - try: - opts, args = getopt.getopt(sys.argv[1:], "t:", []) - except getopt.error, msg: - print "%s -t [testmask] <vendor>[,<vendor>]" - sys.exit(0) + try: + opts, args = getopt.getopt(sys.argv[1:], "t:", []) + except getopt.error, msg: + print "%s -t [testmask] <vendor>[,<vendor>]" + sys.exit(0) - mask = None - for a in opts: - opt, arg = a - if opt == '-t': - mask = arg + mask = None + for a in opts: + opt, arg = a + if opt == '-t': + mask = arg - configParser = ConfigParser() - fp = open(args[0], "r") - configParser.feed(fp.read()) - fp.close() - test(configParser.vendors, args[1:], mask=mask) - sys.exit(0) + configParser = ConfigParser() + fp = open(args[0], "r") + configParser.feed(fp.read()) + fp.close() + test(configParser.vendors, args[1:], mask=mask) + sys.exit(0) Modified: trunk/jython/Lib/test/zxjdbc/sptest.py =================================================================== --- trunk/jython/Lib/test/zxjdbc/sptest.py 2009-08-10 17:22:46 UTC (rev 6646) +++ trunk/jython/Lib/test/zxjdbc/sptest.py 2009-08-10 17:23:22 UTC (rev 6647) @@ -9,211 +9,211 @@ class OracleSPTest(zxCoreTestCase): - def setUp(self): - zxCoreTestCase.setUp(self) + def setUp(self): + zxCoreTestCase.setUp(self) - c = self.cursor() + c = self.cursor() - try: - try: - c.execute("drop table sptest") - except: - self.db.rollback() - try: - c.execute("create table sptest (x varchar2(20))") - c.execute("create or replace procedure procnone is begin insert into sptest values ('testing'); end;") - c.execute("create or replace procedure procin (y in varchar2) is begin insert into sptest values (y); end;") - c.execute("create or replace procedure procout (y out varchar2) is begin y := 'tested'; end;") - c.execute("create or replace procedure procinout (y out varchar2, z in varchar2) is begin insert into sptest values (z); y := 'tested'; end;") - c.execute("create or replace function funcnone return varchar2 is begin return 'tested'; end;") - c.execute("create or replace function funcin (y varchar2) return varchar2 is begin return y || y; end;") - c.execute("create or replace function funcout (y out varchar2) return varchar2 is begin y := 'tested'; return 'returned'; end;") - self.db.commit() - except: - self.db.rollback() - self.fail("procedure creation failed") + try: + try: + c.execute("drop table sptest") + except: + self.db.rollback() + try: + c.execute("create table sptest (x varchar2(20))") + c.execute("create or replace procedure procnone is begin insert into sptest values ('testing'); end;") + c.execute("create or replace procedure procin (y in varchar2) is begin insert into sptest values (y); end;") + c.execute("create or replace procedure procout (y out varchar2) is begin y := 'tested'; end;") + c.execute("create or replace procedure procinout (y out varchar2, z in varchar2) is begin insert into sptest values (z); y := 'tested'; end;") + c.execute("create or replace function funcnone return varchar2 is begin return 'tested'; end;") + c.execute("create or replace function funcin (y varchar2) return varchar2 is begin return y || y; end;") + c.execute("create or replace function funcout (y out varchar2) return varchar2 is begin y := 'tested'; return 'returned'; end;") + self.db.commit() + except: + self.db.rollback() + self.fail("procedure creation failed") - self.proc_errors("PROC") - self.proc_errors("FUNC") + self.proc_errors("PROC") + self.proc_errors("FUNC") - finally: - c.close() + finally: + c.close() - def tearDown(self): - zxCoreTestCase.tearDown(self) + def tearDown(self): + zxCoreTestCase.tearDown(self) - def proc_errors(self, name): - c = self.cursor() - try: - c.execute("select * from user_errors where name like '%s%%'" % (name.upper())) - errors = c.fetchall() - try: - assert not errors, "found errors" - except AssertionError, e: - print "printing errors:" - for a in errors: - print a - raise e - finally: - c.close() + def proc_errors(self, name): + c = self.cursor() + try: + c.execute("select * from user_errors where name like '%s%%'" % (name.upper())) + errors = c.fetchall() + try: + assert not errors, "found errors" + except AssertionError, e: + print "printing errors:" + for a in errors: + print a + raise e + finally: + c.close() - def testCursor(self): - c = self.cursor() - try: + def testCursor(self): + c = self.cursor() + try: - c.execute("insert into sptest values ('a')") - c.execute("insert into sptest values ('b')") - c.execute("insert into sptest values ('c')") - c.execute("insert into sptest values ('d')") - c.execute("insert into sptest values ('e')") + c.execute("insert into sptest values ('a')") + c.execute("insert into sptest values ('b')") + c.execute("insert into sptest values ('c')") + c.execute("insert into sptest values ('d')") + c.execute("insert into sptest values ('e')") - c.execute(""" - CREATE OR REPLACE PACKAGE types - AS - TYPE ref_cursor IS REF CURSOR; - END; - """) + c.execute(""" + CREATE OR REPLACE PACKAGE types + AS + TYPE ref_cursor IS REF CURSOR; + END; + """) - c.execute(""" - CREATE OR REPLACE FUNCTION funccur(v_x IN VARCHAR) - RETURN types.ref_cursor - AS - funccur_cursor types.ref_cursor; - BEGIN - OPEN funccur_cursor FOR - SELECT x FROM sptest WHERE x < v_x; - RETURN funccur_cursor; - END; - """) + c.execute(""" + CREATE OR REPLACE FUNCTION funccur(v_x IN VARCHAR) + RETURN types.ref_cursor + AS + funccur_cursor types.ref_cursor; + BEGIN + OPEN funccur_cursor FOR + SELECT x FROM sptest WHERE x < v_x; + RETURN funccur_cursor; + END; + """) - self.proc_errors("funccur") + self.proc_errors("funccur") - c.callproc("funccur", ("z",)) - data = c.fetchall() - self.assertEquals(5, len(data)) - c.callproc("funccur", ("c",)) - data = c.fetchall() - self.assertEquals(2, len(data)) + c.callproc("funccur", ("z",)) + data = c.fetchall() + self.assertEquals(5, len(data)) + c.callproc("funccur", ("c",)) + data = c.fetchall() + self.assertEquals(2, len(data)) - finally: - c.close() + finally: + c.close() - def testProcin(self): - c = self.cursor() - try: - params = ["testProcin"] - c.callproc("procin", params) - self.assertEquals([], c.fetchall()) - c.execute("select * from sptest") - self.assertEquals(1, len(c.fetchall())) - finally: - c.close() + def testProcin(self): + c = self.cursor() + try: + params = ["testProcin"] + c.callproc("procin", params) + self.assertEquals([], c.fetchall()) + c.execute("select * from sptest") + self.assertEquals(1, len(c.fetchall())) + finally: + c.close() - def testProcinout(self): - c = self.cursor() - try: - params = [None, "testing"] - c.callproc("procinout", params) - data = c.fetchone() - assert data is None, "data was not None" - c.execute("select * from sptest") - data = c.fetchone() - self.assertEquals("testing", data[0]) - self.assertEquals("tested", params[0]) - finally: - c.close() + def testProcinout(self): + c = self.cursor() + try: + params = [None, "testing"] + c.callproc("procinout", params) + data = c.fetchone() + assert data is None, "data was not None" + c.execute("select * from sptest") + data = c.fetchone() + self.assertEquals("testing", data[0]) + self.assertEquals("tested", params[0]) + finally: + c.close() - def testFuncnone(self): - c = self.cursor() - try: - c.callproc("funcnone") - data = c.fetchone() - assert data is not None, "data was None" - self.assertEquals(1, len(data)) - self.assertEquals("tested", data[0]) - finally: - c.close() + def testFuncnone(self): + c = self.cursor() + try: + c.callproc("funcnone") + data = c.fetchone() + assert data is not None, "data was None" + self.assertEquals(1, len(data)) + self.assertEquals("tested", data[0]) + finally: + c.close() - def testFuncin(self): - c = self.cursor() - try: - params = ["testing"] - c.callproc("funcin", params) - self.assertEquals(1, c.rowcount) - data = c.fetchone() - assert data is not None, "data was None" - self.assertEquals(1, len(data)) - self.assertEquals("testingtesting", data[0]) - finally: - c.close() + def testFuncin(self): + c = self.cursor() + try: + params = ["testing"] + c.callproc("funcin", params) + self.assertEquals(1, c.rowcount) + data = c.fetchone() + assert data is not None, "data was None" + self.assertEquals(1, len(data)) + self.assertEquals("testingtesting", data[0]) + finally: + c.close() - def testCallingWithKws(self): - c = self.cursor() - try: - params = ["testing"] - c.callproc("funcin", params=params) - self.assertEquals(1, c.rowcount) - data = c.fetchone() - assert data is not None, "data was None" - self.assertEquals(1, len(data)) - self.assertEquals("testingtesting", data[0]) - finally: - c.close() + def testCallingWithKws(self): + c = self.cursor() + try: + params = ["testing"] + c.callproc("funcin", params=params) + self.assertEquals(1, c.rowcount) + data = c.fetchone() + assert data is not None, "data was None" + self.assertEquals(1, len(data)) + self.assertEquals("testingtesting", data[0]) + finally: + c.close() - def testFuncout(self): - c = self.cursor() - try: - params = [None] - c.callproc("funcout", params) - data = c.fetchone() - assert data is not None, "data was None" - self.assertEquals(1, len(data)) - self.assertEquals("returned", data[0]) - self.assertEquals("tested", params[0].strip()) - finally: - c.close() + def testFuncout(self): + c = self.cursor() + try: + params = [None] + c.callproc("funcout", params) + data = c.fetchone() + assert data is not None, "data was None" + self.assertEquals(1, len(data)) + self.assertEquals("returned", data[0]) + self.assertEquals("tested", params[0].strip()) + finally: + c.close() - def testMultipleFetch(self): - """testing the second fetch call to a callproc() is None""" - c = self.cursor() - try: - c.callproc("funcnone") - data = c.fetchone() - assert data is not None, "data was None" - data = c.fetchone() - assert data is None, "data was not None" - finally: - c.close() + def testMultipleFetch(self): + """testing the second fetch call to a callproc() is None""" + c = self.cursor() + try: + c.callproc("funcnone") + data = c.fetchone() + assert data is not None, "data was None" + data = c.fetchone() + assert data is None, "data was not None" + finally: + c.close() class SQLServerSPTest(zxCoreTestCase): - def testProcWithResultSet(self): - c = self.cursor() + def testProcWithResultSet(self): + c = self.cursor() + try: + for a in (("table", "sptest"), ("procedure", "sp_proctest")): try: - for a in (("table", "sptest"), ("procedure", "sp_proctest")): - try: - c.execute("drop %s %s" % (a)) - except: - pass + c.execute("drop %s %s" % (a)) + except: + pass - c.execute("create table sptest (a int, b varchar(32))") - c.execute("insert into sptest values (1, 'hello')") - c.execute("insert into sptest values (2, 'there')") - c.execute("insert into sptest values (3, 'goodbye')") + c.execute("create table sptest (a int, b varchar(32))") + c.execute("insert into sptest values (1, 'hello')") + c.execute("insert into sptest values (2, 'there')") + c.execute("insert into sptest values (3, 'goodbye')") - c.execute(""" create procedure sp_proctest (@A int) as select a, b from sptest where a <= @A """) - self.db.commit() + c.execute(""" create procedure sp_proctest (@A int) as select a, b from sptest where a <= @A """) + self.db.commit() - c.callproc("sp_proctest", (2,)) - data = c.fetchall() - self.assertEquals(2, len(data)) - self.assertEquals(2, len(c.description)) - assert c.nextset() is not None, "expected an additional result set" - data = c.fetchall() - self.assertEquals(1, len(data)) - self.assertEquals(1, len(c.description)) - finally: - c.close() + c.callproc("sp_proctest", (2,)) + data = c.fetchall() + self.assertEquals(2, len(data)) + self.assertEquals(2, len(c.description)) + assert c.nextset() is not None, "expected an additional result set" + data = c.fetchall() + self.assertEquals(1, len(data)) + self.assertEquals(1, len(c.description)) + finally: + c.close() # def testSalesByCategory(self): # c = self.cursor() Modified: trunk/jython/Lib/test/zxjdbc/zxtest.py =================================================================== --- trunk/jython/Lib/test/zxjdbc/zxtest.py 2009-08-10 17:22:46 UTC (rev 6646) +++ trunk/jython/Lib/test/zxjdbc/zxtest.py 2009-08-10 17:23:22 UTC (rev 6647) @@ -12,1231 +12,1231 @@ class zxCoreTestCase(runner.SQLTestCase): - def setUp(self): - runner.SQLTestCase.setUp(self) - self.db = self.connect() - self.db.autocommit = 0 + def setUp(self): + runner.SQLTestCase.setUp(self) + self.db = self.connect() + self.db.autocommit = 0 - def tearDown(self): - self.db.close() - runner.SQLTestCase.tearDown(self) + def tearDown(self): + self.db.close() + runner.SQLTestCase.tearDown(self) - def connect(self): - factory = runner.__imp__(self.factory.classname) - args = map(lambda x: x[1], self.factory.arguments) - connect = getattr(factory, self.factory.method) - return apply(connect, args, self.factory.keywords) + def connect(self): + factory = runner.__imp__(self.factory.classname) + args = map(lambda x: x[1], self.factory.arguments) + connect = getattr(factory, self.factory.method) + return apply(connect, args, self.factory.keywords) - def cursor(self, *args, **kws): - c = apply(self.db.cursor, args, kws) - if hasattr(self, "datahandler"): - c.datahandler = self.datahandler(c.datahandler) - return c + def cursor(self, *args, **kws): + c = apply(self.db.cursor, args, kws) + if hasattr(self, "datahandler"): + c.datahandler = self.datahandler(c.datahandler) + return c class zxJDBCTestCase(zxCoreTestCase): - def setUp(self): - zxCoreTestCase.setUp(self) + def setUp(self): + zxCoreTestCase.setUp(self) - c = self.cursor() + c = self.cursor() - try: - c.execute("drop table zxtesting") - self.db.commit() - except: - self.db.rollback() + try: + c.execute("drop table zxtesting") + self.db.commit() + except: + self.db.rollback() - try: - c.execute("create table zxtesting (id int not null, name varchar(32), state varchar(32), primary key (id))") - self.db.commit() - c.execute("insert into zxtesting (id, name, state) values (1, 'test0', 'il')") - c.execute("insert into zxtesting (id, name, state) values (2, 'test1', 'wi')") - c.execute("insert into zxtesting (id, name, state) values (3, 'test2', 'tx')") - c.execute("insert into zxtesting (id, name, state) values (4, 'test3', 'co')") - c.execute("insert into zxtesting (id, name, state) values (5, 'test4', 'il')") - c.execute("insert into zxtesting (id, name, state) values (6, 'test5', 'ca')") - c.execute("insert into zxtesting (id, name, state) values (7, 'test6', 'wi')") - self.db.commit() - finally: - c.close() + try: + c.execute("create table zxtesting (id int not null, name varchar(32), state varchar(32), primary key (id))") + self.db.commit() + c.execute("insert into zxtesting (id, name, state) values (1, 'test0', 'il')") + c.execute("insert into zxtesting (id, name, state) values (2, 'test1', 'wi')") + c.execute("insert into zxtesting (id, name, state) values (3, 'test2', 'tx')") + c.execute("insert into zxtesting (id, name, state) values (4, 'test3', 'co')") + c.execute("insert into zxtesting (id, name, state) values (5, 'test4', 'il')") + c.execute("insert into zxtesting (id, name, state) values (6, 'test5', 'ca')") + c.execute("insert into zxtesting (id, name, state) values (7, 'test6', 'wi')") + self.db.commit() + finally: + c.close() - def tearDown(self): + def tearDown(self): - c = self.cursor() - try: - try: - c.execute("drop table zxtesting") - except: - self.db.rollback() - finally: - c.close() + c = self.cursor() + try: + try: + c.execute("drop table zxtesting") + except: + self.db.rollback() + finally: + c.close() - zxCoreTestCase.tearDown(self) + zxCoreTestCase.tearDown(self) class zxAPITestCase(zxJDBCTestCase): - def testConnection(self): - """testing connection""" - assert self.db, "invalid connection" + def testConnection(self): + """testing connection""" + assert self.db, "invalid connection" - def testAutocommit(self): - """testing autocommit functionality""" - if self.db.__connection__.getMetaData().supportsTransactions(): - self.db.autocommit = 1 - self.assertEquals(1, self.db.__connection__.getAutoCommit()) - self.db.autocommit = 0 - self.assertEquals(0, self.db.__connection__.getAutoCommit()) + def testAutocommit(self): + """testing autocommit functionality""" + if self.db.__connection__.getMetaData().supportsTransactions(): + self.db.autocommit = 1 + self.assertEquals(1, self.db.__connection__.getAutoCommit()) + self.db.autocommit = 0 + self.assertEquals(0, self.db.__connection__.getAutoCommit()) - def testSimpleQuery(self): - """testing simple queries with cursor.execute(), no parameters""" - c = self.cursor() - try: - c.execute("select count(*) from zxtesting") - f = c.fetchall() - assert len(f) == 1, "expecting one row" - c.execute("select * from zxtesting") - data = c.fetchone() - assert len(f) == 1, "expecting one row" - assert data[0] == 1, "expected [1] rows, got [%d]" % (data[0]) - finally: - c.close() + def testSimpleQuery(self): + """testing simple queries with cursor.execute(), no parameters""" + c = self.cursor() + try: + c.execute("select count(*) from zxtesting") + f = c.fetchall() + assert len(f) == 1, "expecting one row" + c.execute("select * from zxtesting") + data = c.fetchone() + assert len(f) == 1, "expecting one row" + assert data[0] == 1, "expected [1] rows, got [%d]" % (data[0]) + finally: + c.close() - def testNoneQuery(self): - """testing that executing None doesn't fail""" - c = self.cursor() - ... [truncated message content] |