From: Kevin C. <ke...@us...> - 2006-02-09 09:57:41
|
Update of /cvsroot/mailmanager/MailManager/tests/classes In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv23248/tests/classes Modified Files: Tag: RELENG_2_1 testStructures.py Log Message: Copied in the up to date version from HEAD Index: testStructures.py =================================================================== RCS file: /cvsroot/mailmanager/MailManager/tests/classes/testStructures.py,v retrieving revision 1.12.2.4 retrieving revision 1.12.2.5 diff -u -d -r1.12.2.4 -r1.12.2.5 --- testStructures.py 23 Jan 2006 18:12:59 -0000 1.12.2.4 +++ testStructures.py 9 Feb 2006 09:57:32 -0000 1.12.2.5 @@ -1,4 +1,4 @@ -# (c) Copyright Logicalware Ltd 2002-2006 +# (c) Copyright Logicalware Ltd 2002-2005 # Logicalware MailManager comes with ABSOLUTELY NO WARRANTY. Further details # including conditions of redistribution are contained in LICENSE.txt # http://www.logicalware.org/ @@ -68,6 +68,7 @@ from pprint import pformat import zLOG import logging +import sys ############################################################################## @@ -75,14 +76,14 @@ # Log Handling # -debug_log=1 +debug_log=0 if debug_log: logger = logging.getLogger() hdlr = logging.FileHandler('/tmp/testsuite.log') formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(formatter) - logger.addHandler(hdlr) + logger.addHandler(hdlr) logger.setLevel(logging.DEBUG) @@ -105,7 +106,7 @@ zLOG.LOG('ZPsycopgDA:', zLOG.INFO, 'query: %s %s' % (pformat(query_string), pformat(query_data))) if self.debugState == 'ok': - return Products.ZPsycopgDA.db.DB.query(self, + return Products.ZPsycopgDA.db.DB.query(self, query_string, max_rows, query_data) elif self.debugState == 'DB Error': raise psycopg.ProgrammingError('Test SQL Exception') @@ -117,7 +118,7 @@ def __init__(self, id, title, connection_string, zdatetime, check=None, tilevel=2, encoding='UTF-8', ustrings=0): - Products.ZPsycopgDA.DA.Connection.__init__(self, id, title, + Products.ZPsycopgDA.DA.Connection.__init__(self, id, title, connection_string, zdatetime, check, tilevel, encoding, ustrings) # # Add on the counter hook @@ -146,6 +147,7 @@ class DebugMailHost(BlackholeMailHost): def __init__(self, *args, **kw): + kw['savemsgs'] = True BlackholeMailHost.__init__(self, *args, **kw) self.debugState = 'ok' @@ -157,9 +159,13 @@ if self.debugState == 'ok': return BlackholeMailHost._send(self, mfrom, mto, messageText) else: - raise smtplib.SMTPRecipientsRefused( - { 'mto' : (450,'tempfail') } - ) + # FIXME: This doesn't match the smtplib class. It needs looked + # at to improve the error responses to end users where this + # exception is caught. + faildict = {} + for addr in mto: + faildict[addr] = (450,'tempfail') + raise smtplib.SMTPRecipientsRefused(faildict) ### POP3/IMAP ############################################################### @@ -198,8 +204,9 @@ return poplib.quit(self) class debug_poplib(poplib.POP3): + def user(self, user): - raise poplib.error_proto('something') + raise poplib.error_proto('Test error generated in debug_poplib') return poplib.user(self, user) def pass_(self, pswd): @@ -220,6 +227,77 @@ def quit(self): return poplib.quit(self) + +class mock_poplib_factory: + """ Mock object to emulate a mail server covering all of the + functionality we use. + + This object has some hook points where you can add in your own + callbacks to test failures at certain points of processing. It is + primarily designed for the getMail test. + """ + + debug = True + + def __init__(self): + self.messages = [] + + def addMessage(self, message): + self.messages.append(message) + + def __call__(self, hostname): + print "Generating a connection to host %s" % hostname + conn = mock_poplib(hostname) + for message in self.messages: + conn.addMessage(message) + return conn + +class mock_poplib: + + """ Notes on this class: + + Message ids are indexed from 1, not 0 + None is used as a flag to show a message has been deleted + + """ + + debug = True + + def addMessage(self, message): + self.messages.append(message) + + def __init__(self, remotehost): + if self.debug: print "Mock pop3lib connecting to %s" % remotehost + self.messages = [] + self.quitcalled = False + + def user(self, user): + pass + + def pass_(self, pswd): + pass + + def list(self, which=None): + return ('+OK POP3 clients that break here, they violate STD53.', + ['%i %i' % (id+1, len(self.messages[id])) + for id in range(len(self.messages))]) + + def retr(self, which): + print which + print self.messages + message = self.messages[which-1] + return ('+OK %i octets follow.' % len(message), + message.split('\n'), + len(message)) + + def dele(self, which): + self.messages[which-1] = None # Flag for deletion + + def quit(self): + self.quitcalled = True + + + if pop3_ssl: class debug_poplib_ssl(poplib.POP3_SSL): pass @@ -287,87 +365,87 @@ schema = 'testschema' postgres_mapping = { - 'sql_delimiter' : ';', - 'sql_boolean' : 'BOOL', - 'sql_binary' : 'BYTEA', - 'sql_interval' : 'BIGINT', - 'sql_database' : 'postgres', - 'sql_deferrable' : '', - 'sql_datetimestamp' : 'TIMESTAMP', - 'sql_initdefer' : '', - 'sql_indexlimit' : '', - 'sql_serialpkey' : 'SERIAL PRIMARY KEY', - 'sql_tabletype' : '', - 'sql_true' : '\'t\'', - 'sql_false' : '\'f\'', - 'sql_constraints' : True, - 'sql_tabletype' : '', - 'sql_true' : '\'t\'', - 'sql_false' : '\'f\'', - 'sql_varchar' : 'VARCHAR', - 'sql_smalltext' : 'TEXT', - 'sql_largetext' : 'TEXT', - 'sql_integer' : 'INT', - 'sql_textsearch' : True, - 'sql_charset' : '', - 'sql_truevar' : 't', - 'sql_falsevar' : 'f', - 'sql_constraints' : True, - 'suboptimise' : True + 'sql_delimiter' : ';', + 'sql_boolean' : 'BOOL', + 'sql_binary' : 'BYTEA', + 'sql_interval' : 'BIGINT', + 'sql_database' : 'postgres', + 'sql_deferrable' : '', + 'sql_datetimestamp' : 'TIMESTAMP', + 'sql_initdefer' : '', + 'sql_indexlimit' : '', + 'sql_serialpkey' : 'SERIAL PRIMARY KEY', + 'sql_tabletype' : '', + 'sql_true' : '\'t\'', + 'sql_false' : '\'f\'', + 'sql_constraints' : True, + 'sql_tabletype' : '', + 'sql_true' : '\'t\'', + 'sql_false' : '\'f\'', + 'sql_varchar' : 'VARCHAR', + 'sql_smalltext' : 'TEXT', + 'sql_largetext' : 'TEXT', + 'sql_integer' : 'INT', + 'sql_textsearch' : True, + 'sql_charset' : '', + 'sql_truevar' : 't', + 'sql_falsevar' : 'f', + 'sql_constraints' : True, + 'suboptimise' : True } mysql_mapping = { - 'sql_delimiter' : '\0', - 'sql_boolean' : 'BOOL', - 'sql_binary' : 'BLOB', - 'sql_interval' : 'BIGINT', - 'sql_database' : 'mysql', - 'sql_deferrable' : '', - 'sql_datetimestamp' : 'DATETIME', - 'sql_initdefer' : '', - 'sql_indexlimit' : '(255)', - 'sql_serialpkey' : 'INT PRIMARY KEY AUTO_INCREMENT', - 'sql_tabletype' : 'TYPE=InnoDB', - 'sql_true' : 1, - 'sql_false' : 0, - 'sql_constraints' : False, - 'sql_charset' : 'DEFAULT CHARSET=UTF8', - 'sql_truevar' : 1, - 'sql_falsevar' : 0, - 'sql_varchar' : 'VARCHAR', - 'sql_smalltext' : 'TEXT', - 'sql_largetext' : 'TEXT', - 'sql_integer' : 'INT', - 'sql_textsearch' : True, - 'sql_charset' : 'DEFAULT CHARSET=UTF8', - 'mysql_max_allowed_packet' : 32 * 1024 * 1024, - 'suboptimise' : False + 'sql_delimiter' : '\0', + 'sql_boolean' : 'BOOL', + 'sql_binary' : 'BLOB', + 'sql_interval' : 'BIGINT', + 'sql_database' : 'mysql', + 'sql_deferrable' : '', + 'sql_datetimestamp' : 'DATETIME', + 'sql_initdefer' : '', + 'sql_indexlimit' : '(255)', + 'sql_serialpkey' : 'INT PRIMARY KEY AUTO_INCREMENT', + 'sql_tabletype' : 'TYPE=InnoDB', + 'sql_true' : 1, + 'sql_false' : 0, + 'sql_constraints' : False, + 'sql_charset' : 'DEFAULT CHARSET=UTF8', + 'sql_truevar' : 1, + 'sql_falsevar' : 0, + 'sql_varchar' : 'VARCHAR', + 'sql_smalltext' : 'TEXT', + 'sql_largetext' : 'TEXT', + 'sql_integer' : 'INT', + 'sql_textsearch' : True, + 'sql_charset' : 'DEFAULT CHARSET=UTF8', + 'mysql_max_allowed_packet' : 32 * 1024 * 1024, + 'suboptimise' : False } oracle_mapping = { - 'sql_delimiter' : '/', - 'sql_boolean' : "CHAR(1)", - 'sql_binary' : 'BLOB', - 'sql_interval' : 'NUMBER', - 'sql_database' : 'oracle', - 'sql_deferrable' : '', - 'sql_datetimestamp' : 'TIMESTAMP', - 'sql_initdefer' : '', - 'sql_indexlimit' : '', - 'sql_serialpkey' : 'INTEGER PRIMARY KEY', - 'sql_tabletype' : '', - 'sql_charset' : '', - 'sql_true' : '\'t\'', - 'sql_false' : '\'f\'', - 'sql_truevar' : 't', - 'sql_falsevar' : 'f', - 'sql_varchar' : 'VARCHAR2', - 'sql_smalltext' : 'VARCHAR(2048)', - 'sql_largetext' : 'CLOB', - 'sql_integer' : 'INTEGER', - 'sql_textsearch' : True, - 'sql_charset' : '', - 'suboptimise' : False + 'sql_delimiter' : '/', + 'sql_boolean' : "CHAR(1)", + 'sql_binary' : 'BLOB', + 'sql_interval' : 'NUMBER', + 'sql_database' : 'oracle', + 'sql_deferrable' : '', + 'sql_datetimestamp' : 'TIMESTAMP', + 'sql_initdefer' : '', + 'sql_indexlimit' : '', + 'sql_serialpkey' : 'INTEGER PRIMARY KEY', + 'sql_tabletype' : '', + 'sql_charset' : '', + 'sql_true' : '\'t\'', + 'sql_false' : '\'f\'', + 'sql_truevar' : 't', + 'sql_falsevar' : 'f', + 'sql_varchar' : 'VARCHAR2', + 'sql_smalltext' : 'VARCHAR(2048)', + 'sql_largetext' : 'CLOB', + 'sql_integer' : 'INTEGER', + 'sql_textsearch' : True, + 'sql_charset' : '', + 'suboptimise' : False } @@ -404,7 +482,7 @@ return sqldata class ZSQLFileMethod(ZSQLMethod): - zsqlroot = os.path.join(Globals.INSTANCE_HOME, 'Products', 'MailManager', 'sql') + zsqlroot = os.path.join(Globals.INSTANCE_HOME, 'Products', 'MailManager', 'sql', 'v2_1') def sql_quote__(self, v): "Taken from Shared.DC.ZRDB.Connection" @@ -424,9 +502,9 @@ class mailManagerTestCase(ZopeTestCase.ZopeTestCase): """ This is the basic structure for a test case running against MailManager - + This class provides the ability to create a MailManager object, running - against a backend database, and subsequently remove that object. + against a backend database, and subsequently remove that object. Configuration is loaded from /etc/mailmanager/testsuite.conf self.config - The configuration, presented as a dict @@ -434,11 +512,11 @@ self.setupMailManager(dbplatform=?) - Create a new clean MM instance self.removeMailManager(dbplatform=?) - Remove the MM instance and DB data - + Various members of this class are hookable for pluggable test support """ - # Hookable object creation + # Hookable object creation # Replace these in subclasses in order to test using different objects _MailManagerAdd = staticmethod(manage_addMailManager) @@ -453,12 +531,11 @@ try: _ZMySQLAdd = staticmethod(manage_addZMySQLConnection) - #_debugZMySQLAdd = staticmethod(manage_addDebugZMySQLConnection) except NameError: pass # Turn this on to enable heavy zope logging - _logtrace = True + _logtrace = False def afterSetUp(self): @@ -475,7 +552,7 @@ print "Config loaded from env setting - %s" % filename except: print "Unable to load env configuration file setting!" - raise + raise else: try: self.config = ConfigRegistry(os.path.join(Globals.package_home(globals()),'..', 'testsuite.conf')) @@ -486,25 +563,25 @@ print "Config loaded from global /etc/mailmanager/testsuite.conf" except: print "Unable to load any configuration file!" - raise - + raise + self.config['databases']['supported_databases'] = \ [x.strip() for x in self.config['databases']['supported_databases'].split(',')] - # Open ZODB connection - self.app = ZopeTestCase.app() - # Set up sessioning objects ZopeTestCase.utils.setupCoreSessions(self.app) # Login as the admin user - from AccessControl.SecurityManagement import newSecurityManager, getSecurityManager - user = self.app.acl_users.userFolderAddUser('admin', 'letmein', ('Manager', 'Tickets', 'Settings'), ()) - user = self.app.acl_users.getUser('admin').__of__(self.app.acl_users) - newSecurityManager(None, user) + self.setCurrentUser('admin', ('Manager', 'Tickets', 'Reports', 'Settings')) self.request = self.makerequest() + def setCurrentUser(self, username, roles, password='letmein'): + from AccessControl.SecurityManagement import newSecurityManager, getSecurityManager + user = self.app.acl_users.userFolderAddUser(username, password, roles, ()) + user = self.app.acl_users.getUser(username).__of__(self.app.acl_users) + newSecurityManager(None, user) + def makerequest(self): # Put SESSION object into REQUEST request = self.app.REQUEST @@ -516,11 +593,11 @@ return request def setupMailManager(self, dbplatform, debugmode=False): + self.removeMailManager() self.wipeDB(dbplatform=dbplatform) self.openDB(dbplatform=dbplatform) - root = self.app if dbplatform == 'mysql': @@ -546,9 +623,9 @@ # Create the zpsycopg connection print "Adding Database Connection" if debugmode: - self._debugZPsycopgAdd(root, 'mailmanager_db', 'MailManager Database Connection', connstring, zdatetime=True, tilevel=0) + self._debugZPsycopgAdd(root, 'mailmanager_db', 'MailManager Database Connection', connstring, zdatetime=True, tilevel=2) else: - self._ZPsycopgAdd(root, 'mailmanager_db', 'MailManager Database Connection', connstring, zdatetime=True, tilevel=0) + self._ZPsycopgAdd(root, 'mailmanager_db', 'MailManager Database Connection', connstring, zdatetime=True, tilevel=2) # Create a mailhost object @@ -560,11 +637,11 @@ print "Adding MailManager" self._debugMailManagerAdd(root, id='mail', title='MailManager', admin_username='manager', admin_email='de...@lo...', - password='letmein', confirm_password='letmein', - schema=self.config['database_%s' % dbplatform]['schema'], + password='letmein', confirm_password='letmein', + schema=self.config['database_%s' % dbplatform]['schema'], dbplatform=dbplatform, create_tables=True) root.mail.debug = True - + # Fix up Zope logging logger = logging.getLogger('event') handler = logging.StreamHandler() @@ -578,8 +655,8 @@ print "Adding MailManager" self._MailManagerAdd(root, id='mail', title='MailManager', admin_username='manager', admin_email='de...@lo...', - password='letmein', confirm_password='letmein', - schema=self.config['database_%s' % dbplatform]['schema'], + password='letmein', confirm_password='letmein', + schema=self.config['database_%s' % dbplatform]['schema'], dbplatform=dbplatform, create_tables=True) self.mmobj = root.mail @@ -604,7 +681,7 @@ if pop3_ssl: self.mmobj.pop3lib_ssl = debug_poplib_ssl self.mmobj.imap4lib = debug_imaplib - self.mmobj.imap4lib_ssql = debug_imaplib_ssl + self.mmobj.imap4lib_ssl = debug_imaplib_ssl #self.mmobj.saveDBState() #get_transaction().commit() @@ -671,10 +748,10 @@ mm_password = self.config['database_%s' % dbplatform]['pass'], su_user = self.config['database_%s' % dbplatform]['adminuser'], su_pass = self.config['database_%s' % dbplatform]['adminpass'], + psql = self.config['database_%s' % dbplatform]['psql'], + createlang = self.config['database_%s' % dbplatform]['createlang'], tsearch = True, - tsearch_file = self.config['database_%s' % dbplatform]['tsearch'], - psql = self.config['paths']['psql'], - createlang = self.config['paths']['createlang'] + tsearch_file = self.config['database_%s' % dbplatform]['tsearch'] ) else: setupdb.createMySQLDatabase( |