Author: phd
Date: 2005-04-18 14:36:32 +0000 (Mon, 18 Apr 2005)
New Revision: 727
Added:
trunk/SQLObject/sqlobject/tests/test_aliases.py
Modified:
trunk/SQLObject/sqlobject/sqlbuilder.py
Log:
Implemented table aliases; now one can join a table with itself.
Modified: trunk/SQLObject/sqlobject/sqlbuilder.py
===================================================================
--- trunk/SQLObject/sqlobject/sqlbuilder.py 2005-04-18 14:23:23 UTC (rev 726)
+++ trunk/SQLObject/sqlobject/sqlbuilder.py 2005-04-18 14:36:32 UTC (rev 727)
@@ -69,6 +69,7 @@
import re, fnmatch
import operator
+import threading
from converters import sqlrepr, registerConverter, TRUE, FALSE
safeSQLRE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_\.]*$')
@@ -307,25 +308,40 @@
## Namespaces
########################################
-class TableSpace:
- def __getattr__(self, attr):
- if attr.startswith('__'):
- raise AttributeError
- return Table(attr)
+class Field(SQLExpression):
+ def __init__(self, tableName, fieldName):
+ self.tableName = tableName
+ self.fieldName = fieldName
+ def __sqlrepr__(self, db):
+ return self.tableName + "." + self.fieldName
+ def tablesUsedImmediate(self):
+ return [self.tableName]
+ def execute(self, executor):
+ return executor.field(self.tableName, self.fieldName)
+class SQLObjectField(Field):
+ def __init__(self, tableName, fieldName, original):
+ self.original = original
+ Field.__init__(self, tableName, fieldName)
+
+registerConverter(SQLObjectField, SQLExprConverter)
+
class Table(SQLExpression):
+ FieldClass = Field
+
def __init__(self, tableName):
self.tableName = tableName
def __getattr__(self, attr):
if attr.startswith('__'):
raise AttributeError
- return Field(self.tableName, attr)
+ return self.FieldClass(self.tableName, attr)
def __sqlrepr__(self, db):
return str(self.tableName)
def execute(self, executor):
raise ValueError, "Tables don't have values"
class SQLObjectTable(Table):
+ FieldClass = SQLObjectField
def __init__(self, soClass):
self.soClass = soClass
@@ -338,37 +354,71 @@
if attr.startswith('__'):
raise AttributeError
if attr == 'id':
- return SQLObjectField(self.tableName, self.soClass.sqlmeta.idName, attr)
+ return self.FieldClass(self.tableName, self.soClass.sqlmeta.idName, attr)
else:
- return SQLObjectField(self.tableName,
+ return self.FieldClass(self.tableName,
self.soClass.sqlmeta._columnDict[attr].dbName,
attr)
-class Field(SQLExpression):
- def __init__(self, tableName, fieldName):
- self.tableName = tableName
- self.fieldName = fieldName
+class TableSpace:
+ TableClass = Table
+
+ def __getattr__(self, attr):
+ if attr.startswith('__'):
+ raise AttributeError
+ return self.TableClass(attr)
+
+class ConstantSpace:
+ def __getattr__(self, attr):
+ if attr.startswith('__'):
+ raise AttributeError
+ return SQLConstant(attr)
+
+
+########################################
+## Table aliases
+########################################
+
+class AliasField(Field):
+ def __init__(self, tableName, fieldName, alias):
+ Field.__init__(self, tableName, fieldName)
+ self.alias = alias
+
def __sqlrepr__(self, db):
- return self.tableName + "." + self.fieldName
+ return self.alias + "." + self.fieldName
+
def tablesUsedImmediate(self):
- return [self.tableName]
- def execute(self, executor):
- return executor.field(self.tableName, self.fieldName)
+ return ["%s AS %s" % (self.tableName, self.alias)]
-class SQLObjectField(Field):
- def __init__(self, tableName, fieldName, original):
- self.original = original
- Field.__init__(self, tableName, fieldName)
+class AliasTable(Table):
+ FieldClass = AliasField
-registerConverter(SQLObjectField, SQLExprConverter)
+ _alias_lock = threading.Lock()
+ _alias_counter = 0
-class ConstantSpace:
+ def __init__(self, tableName, alias=None):
+ Table.__init__(self, tableName)
+ if alias is None:
+ self._alias_lock.acquire()
+ try:
+ AliasTable._alias_counter += 1
+ alias = "%s_alias%d" % (tableName, AliasTable._alias_counter)
+ finally:
+ self._alias_lock.release()
+ self.alias = alias
+
def __getattr__(self, attr):
if attr.startswith('__'):
raise AttributeError
- return SQLConstant(attr)
+ return self.FieldClass(self.tableName, attr, self.alias)
+class Alias:
+ def __init__(self, table, alias=None):
+ if hasattr(table, "sqlmeta"):
+ table = table.sqlmeta.table
+ self.q = AliasTable(table, alias)
+
########################################
## SQL Statements
########################################
@@ -599,8 +649,16 @@
class SQLJoin(SQLExpression):
def __init__(self, table1, table2, op=','):
- if table1 and type(table1) <> str: table1 = table1.sqlmeta.table
- if type(table2) <> str: table2 = table2.sqlmeta.table
+ if table1 and type(table1) <> str:
+ if isinstance(table1, Alias):
+ table1 = "%s AS %s" % (table1.q.tableName, table1.q.alias)
+ else:
+ table1 = table1.sqlmeta.table
+ if type(table2) <> str:
+ if isinstance(table2, Alias):
+ table2 = "%s AS %s" % (table2.q.tableName, table2.q.alias)
+ else:
+ table2 = table2.sqlmeta.table
self.table1 = table1
self.table2 = table2
self.op = op
Added: trunk/SQLObject/sqlobject/tests/test_aliases.py
===================================================================
--- trunk/SQLObject/sqlobject/tests/test_aliases.py 2005-04-18 14:23:23 UTC (rev 726)
+++ trunk/SQLObject/sqlobject/tests/test_aliases.py 2005-04-18 14:36:32 UTC (rev 727)
@@ -0,0 +1,27 @@
+from sqlobject import *
+from sqlobject.sqlbuilder import *
+from sqlobject.tests.dbtest import *
+
+########################################
+## Table aliases and self-joins
+########################################
+
+class TestJoinAlias(SQLObject):
+ name = StringCol()
+ parent = StringCol()
+
+def test_1syntax():
+ setupClass(TestJoinAlias)
+ alias = Alias(TestJoinAlias)
+ select = TestJoinAlias.select(TestJoinAlias.q.parent == alias.q.name)
+ assert str(select) == \
+ "SELECT test_join_alias.id, test_join_alias.name, test_join_alias.parent FROM test_join_alias, test_join_alias AS test_join_alias_alias1 WHERE (test_join_alias.parent = test_join_alias_alias1.name)"
+
+def test_2perform_join():
+ setupClass(TestJoinAlias)
+ TestJoinAlias(name="grandparent", parent=None)
+ TestJoinAlias(name="parent", parent="grandparent")
+ TestJoinAlias(name="child", parent="parent")
+ alias = Alias(TestJoinAlias)
+ select = TestJoinAlias.select(TestJoinAlias.q.parent == alias.q.name)
+ assert select.count() == 2
|