[SQL-CVS] r727 - in trunk/SQLObject/sqlobject: . tests
SQLObject is a Python ORM.
Brought to you by:
ianbicking,
phd
From: <sub...@co...> - 2005-04-18 14:36:50
|
Author: phd Date: 2005-04-18 14:36:32 +0000 (Mon, 18 Apr 2005) New Revision: 727 Added: trunk/SQLObject/sqlobject/tests/test_aliases.py Modified: trunk/SQLObject/sqlobject/sqlbuilder.py Log: Implemented table aliases; now one can join a table with itself. Modified: trunk/SQLObject/sqlobject/sqlbuilder.py =================================================================== --- trunk/SQLObject/sqlobject/sqlbuilder.py 2005-04-18 14:23:23 UTC (rev 726) +++ trunk/SQLObject/sqlobject/sqlbuilder.py 2005-04-18 14:36:32 UTC (rev 727) @@ -69,6 +69,7 @@ import re, fnmatch import operator +import threading from converters import sqlrepr, registerConverter, TRUE, FALSE safeSQLRE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_\.]*$') @@ -307,25 +308,40 @@ ## Namespaces ######################################## -class TableSpace: - def __getattr__(self, attr): - if attr.startswith('__'): - raise AttributeError - return Table(attr) +class Field(SQLExpression): + def __init__(self, tableName, fieldName): + self.tableName = tableName + self.fieldName = fieldName + def __sqlrepr__(self, db): + return self.tableName + "." + self.fieldName + def tablesUsedImmediate(self): + return [self.tableName] + def execute(self, executor): + return executor.field(self.tableName, self.fieldName) +class SQLObjectField(Field): + def __init__(self, tableName, fieldName, original): + self.original = original + Field.__init__(self, tableName, fieldName) + +registerConverter(SQLObjectField, SQLExprConverter) + class Table(SQLExpression): + FieldClass = Field + def __init__(self, tableName): self.tableName = tableName def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError - return Field(self.tableName, attr) + return self.FieldClass(self.tableName, attr) def __sqlrepr__(self, db): return str(self.tableName) def execute(self, executor): raise ValueError, "Tables don't have values" class SQLObjectTable(Table): + FieldClass = SQLObjectField def __init__(self, soClass): self.soClass = soClass @@ -338,37 +354,71 @@ if attr.startswith('__'): raise AttributeError if attr == 'id': - return SQLObjectField(self.tableName, self.soClass.sqlmeta.idName, attr) + return self.FieldClass(self.tableName, self.soClass.sqlmeta.idName, attr) else: - return SQLObjectField(self.tableName, + return self.FieldClass(self.tableName, self.soClass.sqlmeta._columnDict[attr].dbName, attr) -class Field(SQLExpression): - def __init__(self, tableName, fieldName): - self.tableName = tableName - self.fieldName = fieldName +class TableSpace: + TableClass = Table + + def __getattr__(self, attr): + if attr.startswith('__'): + raise AttributeError + return self.TableClass(attr) + +class ConstantSpace: + def __getattr__(self, attr): + if attr.startswith('__'): + raise AttributeError + return SQLConstant(attr) + + +######################################## +## Table aliases +######################################## + +class AliasField(Field): + def __init__(self, tableName, fieldName, alias): + Field.__init__(self, tableName, fieldName) + self.alias = alias + def __sqlrepr__(self, db): - return self.tableName + "." + self.fieldName + return self.alias + "." + self.fieldName + def tablesUsedImmediate(self): - return [self.tableName] - def execute(self, executor): - return executor.field(self.tableName, self.fieldName) + return ["%s AS %s" % (self.tableName, self.alias)] -class SQLObjectField(Field): - def __init__(self, tableName, fieldName, original): - self.original = original - Field.__init__(self, tableName, fieldName) +class AliasTable(Table): + FieldClass = AliasField -registerConverter(SQLObjectField, SQLExprConverter) + _alias_lock = threading.Lock() + _alias_counter = 0 -class ConstantSpace: + def __init__(self, tableName, alias=None): + Table.__init__(self, tableName) + if alias is None: + self._alias_lock.acquire() + try: + AliasTable._alias_counter += 1 + alias = "%s_alias%d" % (tableName, AliasTable._alias_counter) + finally: + self._alias_lock.release() + self.alias = alias + def __getattr__(self, attr): if attr.startswith('__'): raise AttributeError - return SQLConstant(attr) + return self.FieldClass(self.tableName, attr, self.alias) +class Alias: + def __init__(self, table, alias=None): + if hasattr(table, "sqlmeta"): + table = table.sqlmeta.table + self.q = AliasTable(table, alias) + ######################################## ## SQL Statements ######################################## @@ -599,8 +649,16 @@ class SQLJoin(SQLExpression): def __init__(self, table1, table2, op=','): - if table1 and type(table1) <> str: table1 = table1.sqlmeta.table - if type(table2) <> str: table2 = table2.sqlmeta.table + if table1 and type(table1) <> str: + if isinstance(table1, Alias): + table1 = "%s AS %s" % (table1.q.tableName, table1.q.alias) + else: + table1 = table1.sqlmeta.table + if type(table2) <> str: + if isinstance(table2, Alias): + table2 = "%s AS %s" % (table2.q.tableName, table2.q.alias) + else: + table2 = table2.sqlmeta.table self.table1 = table1 self.table2 = table2 self.op = op Added: trunk/SQLObject/sqlobject/tests/test_aliases.py =================================================================== --- trunk/SQLObject/sqlobject/tests/test_aliases.py 2005-04-18 14:23:23 UTC (rev 726) +++ trunk/SQLObject/sqlobject/tests/test_aliases.py 2005-04-18 14:36:32 UTC (rev 727) @@ -0,0 +1,27 @@ +from sqlobject import * +from sqlobject.sqlbuilder import * +from sqlobject.tests.dbtest import * + +######################################## +## Table aliases and self-joins +######################################## + +class TestJoinAlias(SQLObject): + name = StringCol() + parent = StringCol() + +def test_1syntax(): + setupClass(TestJoinAlias) + alias = Alias(TestJoinAlias) + select = TestJoinAlias.select(TestJoinAlias.q.parent == alias.q.name) + assert str(select) == \ + "SELECT test_join_alias.id, test_join_alias.name, test_join_alias.parent FROM test_join_alias, test_join_alias AS test_join_alias_alias1 WHERE (test_join_alias.parent = test_join_alias_alias1.name)" + +def test_2perform_join(): + setupClass(TestJoinAlias) + TestJoinAlias(name="grandparent", parent=None) + TestJoinAlias(name="parent", parent="grandparent") + TestJoinAlias(name="child", parent="parent") + alias = Alias(TestJoinAlias) + select = TestJoinAlias.select(TestJoinAlias.q.parent == alias.q.name) + assert select.count() == 2 |