[Assorted-commits] SF.net SVN: assorted:[1444] mailing-list-filter/trunk/src/mlf.py
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-06-03 01:22:24
|
Revision: 1444 http://assorted.svn.sourceforge.net/assorted/?rev=1444&view=rev Author: yangzhang Date: 2009-06-03 00:17:16 +0000 (Wed, 03 Jun 2009) Log Message: ----------- added faster incremental processing Modified Paths: -------------- mailing-list-filter/trunk/src/mlf.py Modified: mailing-list-filter/trunk/src/mlf.py =================================================================== --- mailing-list-filter/trunk/src/mlf.py 2009-06-02 10:02:44 UTC (rev 1443) +++ mailing-list-filter/trunk/src/mlf.py 2009-06-03 00:17:16 UTC (rev 1444) @@ -95,14 +95,14 @@ for row in chunk: yield row -def fetch_new_mail(cfg): +def fetch_new_mail(cfg, imap): if cfg.refresh: - try: os.remove(dbpath(cfg, 'mail')) + try: os.remove(dbpath(cfg, 'fetched')) except: pass - with closing(opendb(cfg, 'mail', 'w')) as mid2msg: + with closing(opendb(cfg, 'fetched', 'w')) as mid2msg: - minuid = mid2msg.get('maxuid', 1) + minuid = mid2msg.get('maxuid', 0) + 1 maxuid = getmaxuid(imap) # Every second item is just a closing paren. @@ -127,40 +127,39 @@ msg.uid = m.group('uid') msg.flags = m.group('flags').split() -# # Prepare a container for references to other msgs, and initialize the -# # thread ID. + # Prepare a container for references to other msgs, and initialize the + # thread ID. msg.refs = set() msg.tid = None # Add these to the map. if msg['Message-ID'] in mid2msg: - warning( 'duplicate message IDs:', - msg['Message-ID'], msg['Subject'] ) + log.warning( 'dups', 'duplicate message IDs:', + msg['Message-ID'], msg['Subject'] ) mid2msg[ msg['Message-ID'] ] = msg # Periodically sync to disk. if len(mid2msg.cache) > 1000: mid2msg.sync() mid2msg['maxuid'] = maxuid + # XXX + mid2msg['procuid'] = mid2msg['maxuid'] # # Function for analyzing messages. # -def iterpairs(mid2msg, midsrc): - for mid in midsrc: - yield mid, mid2msg[mid] - -def itermsgs(mid2msg, midsrc = None): - pairs = ( mid2msg.iteritems() - if midsrc is None else - iterpairs(mid2msg, midsrc) ) - for i, (mid, msg) in enumerate(pairs): - if mid != 'maxuid': yield msg - # Periodically sync to disk. - if mid2msg.cache is not None and len(mid2msg.cache) > 10000: - debug( 'syncing; now at i', i, 'mid', mid ) - mid2msg.sync() +def itermsgs(mid2msg, minuid, midsrc = None): + if midsrc is None: midsrc = mid2msg + special_keys = ['maxuid', 'procuid'] + for i, (mid, msg) in enumerate(midsrc.iteritems()): + if mid not in special_keys and msg.uid >= minuid: + if midsrc is mid2msg: yield msg + else: yield mid2msg[mid] + # Periodically sync to disk. + if mid2msg.cache is not None and len(mid2msg.cache) > 10000: + debug( 'syncing; now at i', i, 'mid', mid ) + mid2msg.sync() debug( 'syncing; finished after i', i, 'mid', mid ) mid2msg.sync() @@ -171,44 +170,43 @@ try: refmsg = mid2msg[refmid] except KeyError: - warning('no message with mid', refmid) + log.warning('badref', 'no message with mid', refmid) else: if refmsg.tid is None: thread_dfs(refmsg, tid, mid2msg) else: assert refmsg.tid == tid def mark_relevant_threads(cfg): -# shutil.copy(dbpath(cfg, 'fetched'), dbpath(cfg, 'bidir')) -# -# with closing(opendb(cfg, 'fetched')) as midsrc: -# with closing(opendb(cfg, 'bidir', 'w')) as mid2msg: -# info( 'maxuid', midsrc['maxuid'] ) -# info( 'constructing bidirectional ref graph' ) -# for msg in itermsgs(mid2msg, midsrc): -# debug('processing', msg['Message-ID']) -# # XXX -# if not hasattr(msg, 'refs'): msg.refs = set() -# if not hasattr(msg, 'tid'): msg.tid = None -# # XXX -# irt = msg.get_all('In-Reply-To', []) -# refs = msg.get_all('References', []) -# newrefs = ' '.join( irt + refs ).replace('><', '> <').split() -# msg.refs.update( newrefs ) -# -# # Connect nodes in graph bidirectionally. Ignore references to MIDs -# # that don't exist. -# for ref in newrefs: -# debug('adding', ref, '<->', msg['Message-ID']) -# try: mid2msg[ref].refs.add( msg['Message-ID'] ) -# except KeyError: warning( 'no message with mid', ref ) + shutil.copy(dbpath(cfg, 'fetched'), dbpath(cfg, 'bidir')) + with closing(opendb(cfg, 'fetched')) as midsrc: + with closing(opendb(cfg, 'bidir', 'w')) as mid2msg: + procuid = mid2msg.get('procuid', 0) + minuid = procuid + 1 + info( 'maxuid', midsrc['maxuid'], 'procuid', procuid ) + del procuid + info( 'constructing bidirectional ref graph' ) + for msg in itermsgs(mid2msg, minuid, midsrc): + debug('processing', msg['Message-ID']) + irt = msg.get_all('In-Reply-To', []) + refs = msg.get_all('References', []) + newrefs = ' '.join( irt + refs ).replace('><', '> <').split() + msg.refs.update( newrefs ) + + # Connect nodes in graph bidirectionally. Ignore references to MIDs + # that don't exist. + for ref in newrefs: + debug('adding', ref, '<->', msg['Message-ID']) + try: mid2msg[ref].refs.add( msg['Message-ID'] ) + except KeyError: log.warning( 'badref', 'no message with mid', ref ) + shutil.copy(dbpath(cfg, 'bidir'), dbpath(cfg, 'threaded')) with closing(opendb(cfg, 'bidir')) as midsrc: with closing(opendb(cfg, 'threaded', 'w')) as mid2msg: info( 'looking for relevant msgs (grouping them into threads)' ) tids = itertools.count() - for msg in itermsgs(mid2msg, midsrc): + for msg in itermsgs(mid2msg, minuid, midsrc): debug('threading', msg['Message-ID']) if ( msg.tid is None and ( cfg.sender in msg.get('From', '') or @@ -224,7 +222,7 @@ info( 'starring/unstarring relevant/irrelevant threads' ) - for msg in itermsgs(mid2msg): + for msg in itermsgs(mid2msg, 0): if msg.tid is not None: # is a relevant msgs if r'\Flagged' not in msg.flags: # not already flagged mark_unseen = not cfg.no_mark_unseen and r'\Seen' in msg.flags @@ -244,6 +242,8 @@ imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') if mark_seen: imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') + mid2msg['procuid'] = mid2msg['maxuid'] + # # Main function. # @@ -304,9 +304,9 @@ if not cfg.no_fetch: with login_imap(cfg) as imap: - fetch_new_mail(cfg) + fetch_new_mail(cfg, imap) - #mark_relevant_threads(cfg) + mark_relevant_threads(cfg) with login_imap(cfg) as imap: with closing(opendb(cfg, 'threaded')) as mid2msg: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |