[Assorted-commits] SF.net SVN: assorted:[1443] mailing-list-filter/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-06-02 10:02:54
|
Revision: 1443 http://assorted.svn.sourceforge.net/assorted/?rev=1443&view=rev Author: yangzhang Date: 2009-06-02 10:02:44 +0000 (Tue, 02 Jun 2009) Log Message: ----------- rewrote MLF to be disk-based to handle large volumes of mail Modified Paths: -------------- mailing-list-filter/trunk/README mailing-list-filter/trunk/src/mlf.py Modified: mailing-list-filter/trunk/README =================================================================== --- mailing-list-filter/trunk/README 2009-06-01 21:11:41 UTC (rev 1442) +++ mailing-list-filter/trunk/README 2009-06-02 10:02:44 UTC (rev 1443) @@ -39,13 +39,13 @@ Requirements: - [argparse](http://argparse.python-hosting.com/) 0.8.0 -- [Python Commons](http://assorted.sf.net/python-commons/) 0.4 +- [Python Commons](http://assorted.sf.net/python-commons/) 0.6 - [path](http://www.jorendorff.com/articles/python/path/) 2.2 Install the program using the standard `setup.py` program. -Future Work Ideas ------------------ +Todo +---- - Currently, we assume that the server specification points to a mailbox containing all messages (both sent and received), and a message is determined Modified: mailing-list-filter/trunk/src/mlf.py =================================================================== --- mailing-list-filter/trunk/src/mlf.py 2009-06-01 21:11:41 UTC (rev 1442) +++ mailing-list-filter/trunk/src/mlf.py 2009-06-02 10:02:44 UTC (rev 1443) @@ -12,8 +12,9 @@ from path import path from functools import partial from commons.log import * -from contextlib import closing -import getpass, logging, shelve, email, re, os, imaplib, itertools, argparse, collections +from contextlib import closing, contextmanager +import getpass, logging, shelve, email, re, os, imaplib, itertools +import argparse, collections, subprocess, shutil from commons import log, startup, seqs, networking, files, sqlhash info = partial(log.info, 'main') @@ -23,21 +24,40 @@ die = partial(log.die, 'main') exception = partial(log.exception, 'main') -def opendb(dbpath): - return sqlhash.Shelf(sqlhash.SQLhash(dbpath, flags = 'w'), - protocol = 2, writeback = True) +# +# Functions for getting local file paths and opening them. +# -def thread_dfs(msg, tid, mid2msg, tid2msgs): - assert msg.tid is None - msg.tid = tid - tid2msgs[tid].append(msg) - for refmid in msg.refs: - refmsg = mid2msg[refmid] - if refmsg.tid is None: - thread_dfs(refmsg, tid, mid2msg, tid2msgs) - else: - assert refmsg.tid == tid +def dbpath(cfg, suffix): + return ( cfg.cachedir / + files.cleanse_filename(cfg.sender) + '-' + suffix ) +def opendb(cfg, suffix, flags = 'r'): + return sqlhash.Shelf(sqlhash.SQLhash(dbpath(cfg, suffix), + flags = flags), + protocol = 2, + writeback = flags == 'w', + cache = flags == 'w') + +# +# Functions for working with the server. +# + +@contextmanager +def login_imap(cfg): + info('connecting and logging in') + with networking.logout(imaplib.IMAP4_SSL(cfg.host, cfg.port)) as imap: + imap.login(cfg.user, cfg.passwd) + # Close is only valid in the authenticated state. + with closing(imap) as imap: + info('selecting mailbox') + imap.select(cfg.mailbox) + yield imap + +# +# Functions for fetching messages from the server. +# + def getmaxuid(imap): info( 'finding max UID' ) # We use UIDs rather than the default of sequence numbers because UIDs are @@ -49,7 +69,7 @@ del uids return maxuid -def getmail(imap, minuid, maxuid): +def fetch_range(imap, minuid, maxuid): info( 'fetching messages', minuid, 'to', maxuid ) # The syntax/fields of the FETCH command is documented in RFC 2060. Also, # this article contains a brief overview: @@ -75,6 +95,159 @@ for row in chunk: yield row +def fetch_new_mail(cfg): + if cfg.refresh: + try: os.remove(dbpath(cfg, 'mail')) + except: pass + + with closing(opendb(cfg, 'mail', 'w')) as mid2msg: + + minuid = mid2msg.get('maxuid', 1) + maxuid = getmaxuid(imap) + + # Every second item is just a closing paren. + # Example data: + # [('13300 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {67}', + # 'Message-ID: <mai...@py...>\r\n\r\n'), + # ')', + # ('13301 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {59}', + # 'Message-Id: <200...@hv...>\r\n\r\n'), + # ')', + # ('13302 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {92}', + # 'Message-ID: <C43EAFC0.2E3AE%ni...@ya...>\r\nIn-Reply-To: <481...@gm...>\r\n\r\n')] + pat = re.compile(r"(?P<seqno>\d+) \(UID (?P<uid>\d+) FLAGS \((?P<flags>[^)]+)\)") + for i, ((envelope, data), paren) in enumerate(seqs.grouper(2, fetch_range(imap, minuid, maxuid))): + # Parse the body. + msg = email.message_from_string(data) + + # Parse the envelope. + m = pat.match(envelope) + if m is None: raise Exception('envelope: %r' % envelope) + msg.seqno = m.group('seqno') + msg.uid = m.group('uid') + msg.flags = m.group('flags').split() + +# # Prepare a container for references to other msgs, and initialize the +# # thread ID. + msg.refs = set() + msg.tid = None + + # Add these to the map. + if msg['Message-ID'] in mid2msg: + warning( 'duplicate message IDs:', + msg['Message-ID'], msg['Subject'] ) + mid2msg[ msg['Message-ID'] ] = msg + + # Periodically sync to disk. + if len(mid2msg.cache) > 1000: mid2msg.sync() + + mid2msg['maxuid'] = maxuid + +# +# Function for analyzing messages. +# + +def iterpairs(mid2msg, midsrc): + for mid in midsrc: + yield mid, mid2msg[mid] + +def itermsgs(mid2msg, midsrc = None): + pairs = ( mid2msg.iteritems() + if midsrc is None else + iterpairs(mid2msg, midsrc) ) + for i, (mid, msg) in enumerate(pairs): + if mid != 'maxuid': yield msg + # Periodically sync to disk. + if mid2msg.cache is not None and len(mid2msg.cache) > 10000: + debug( 'syncing; now at i', i, 'mid', mid ) + mid2msg.sync() + debug( 'syncing; finished after i', i, 'mid', mid ) + mid2msg.sync() + +def thread_dfs(msg, tid, mid2msg): + assert msg.tid is None + msg.tid = tid + for refmid in msg.refs: + try: + refmsg = mid2msg[refmid] + except KeyError: + warning('no message with mid', refmid) + else: + if refmsg.tid is None: thread_dfs(refmsg, tid, mid2msg) + else: assert refmsg.tid == tid + +def mark_relevant_threads(cfg): + +# shutil.copy(dbpath(cfg, 'fetched'), dbpath(cfg, 'bidir')) +# +# with closing(opendb(cfg, 'fetched')) as midsrc: +# with closing(opendb(cfg, 'bidir', 'w')) as mid2msg: +# info( 'maxuid', midsrc['maxuid'] ) +# info( 'constructing bidirectional ref graph' ) +# for msg in itermsgs(mid2msg, midsrc): +# debug('processing', msg['Message-ID']) +# # XXX +# if not hasattr(msg, 'refs'): msg.refs = set() +# if not hasattr(msg, 'tid'): msg.tid = None +# # XXX +# irt = msg.get_all('In-Reply-To', []) +# refs = msg.get_all('References', []) +# newrefs = ' '.join( irt + refs ).replace('><', '> <').split() +# msg.refs.update( newrefs ) +# +# # Connect nodes in graph bidirectionally. Ignore references to MIDs +# # that don't exist. +# for ref in newrefs: +# debug('adding', ref, '<->', msg['Message-ID']) +# try: mid2msg[ref].refs.add( msg['Message-ID'] ) +# except KeyError: warning( 'no message with mid', ref ) + + shutil.copy(dbpath(cfg, 'bidir'), dbpath(cfg, 'threaded')) + + with closing(opendb(cfg, 'bidir')) as midsrc: + with closing(opendb(cfg, 'threaded', 'w')) as mid2msg: + info( 'looking for relevant msgs (grouping them into threads)' ) + tids = itertools.count() + for msg in itermsgs(mid2msg, midsrc): + debug('threading', msg['Message-ID']) + if ( msg.tid is None and + ( cfg.sender in msg.get('From', '') or + cfg.sender in msg.get('To', '') or + cfg.sender in msg.get('Cc', '') ) ): + thread_dfs(msg, tids.next(), mid2msg) + +# +# Functions for storing metadata changes back to the server. +# + +def flag_relevant_msgs(cfg, imap, mid2msg): + + info( 'starring/unstarring relevant/irrelevant threads' ) + + for msg in itermsgs(mid2msg): + if msg.tid is not None: # is a relevant msgs + if r'\Flagged' not in msg.flags: # not already flagged + mark_unseen = not cfg.no_mark_unseen and r'\Seen' in msg.flags + log.info( 'star', '\n', + 'star' + (' and mark unseen' if mark_unseen else ''), + msg ) + if not cfg.pretend: + imap.uid('STORE', msg.uid, '+FLAGS', r'\Flagged') + if mark_unseen: imap.uid('STORE', msg.uid, '-FLAGS', r'\Seen') + else: # is not a relevant msg + if r'\Flagged' in msg.flags: # was inadvertently flagged + mark_seen = not cfg.no_mark_seen and r'\Seen' not in msg.flags + log.info( 'unstar', '\n', + 'unstar' + (' and mark seen' if mark_seen else ''), + msg ) + if not cfg.pretend: + imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') + if mark_seen: imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') + +# +# Main function. +# + def main(argv): p = argparse.ArgumentParser(description = __doc__) p.add_argument('--credfile', default = path( '~/.mlf.auth' ).expanduser(), @@ -129,131 +302,14 @@ files.soft_makedirs(cfg.cachedir) - info('connecting and logging in') + if not cfg.no_fetch: + with login_imap(cfg) as imap: + fetch_new_mail(cfg) - if True: - ###with networking.logout(imaplib.IMAP4_SSL(cfg.host, cfg.port)) as imap: - ###imap.login(cfg.user, cfg.passwd) - # Close is only valid in the authenticated state. - ###with closing(imap) as imap: + #mark_relevant_threads(cfg) - info('selecting mailbox') - ###imap.select(cfg.mailbox) + with login_imap(cfg) as imap: + with closing(opendb(cfg, 'threaded')) as mid2msg: + flag_relevant_msgs(cfg, imap, mid2msg) - dbpath = cfg.cachedir / files.cleanse_filename(cfg.sender) - - # - # Fetch message IDs, references, and senders into persistent store. - # - - if cfg.refresh: - try: os.remove(dbpath) - except: pass - - if not cfg.no_fetch: - with closing(opendb(dbpath)) as mid2msg: - - minuid = mid2msg.get('maxuid', 1) - maxuid = getmaxuid(imap) - - # Every second item is just a closing paren. - # Example data: - # [('13300 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {67}', - # 'Message-ID: <mai...@py...>\r\n\r\n'), - # ')', - # ('13301 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {59}', - # 'Message-Id: <200...@hv...>\r\n\r\n'), - # ')', - # ('13302 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {92}', - # 'Message-ID: <C43EAFC0.2E3AE%ni...@ya...>\r\nIn-Reply-To: <481...@gm...>\r\n\r\n')] - pat = re.compile(r"(?P<seqno>\d+) \(UID (?P<uid>\d+) FLAGS \((?P<flags>[^)]+)\)") - for i, ((envelope, data), paren) in enumerate(seqs.grouper(2, getmail(imap, minuid, maxuid))): - # Parse the body. - msg = email.message_from_string(data) - - # Parse the envelope. - m = pat.match(envelope) - if m is None: raise Exception('envelope: %r' % envelope) - msg.seqno = m.group('seqno') - msg.uid = m.group('uid') - msg.flags = m.group('flags').split() - - # Prepare a container for references to other msgs, and initialize the - # thread ID. - msg.refs = set() - msg.tid = None - - # Add these to the map. - if msg['Message-ID'] in mid2msg: - log.warning( 'dups', 'duplicate message IDs:', - msg['Message-ID'], msg['Subject'] ) - mid2msg[ msg['Message-ID'] ] = msg - - # Periodically sync to disk. - if len(mid2msg.cache) > 1000: mid2msg.sync() - - mid2msg['maxuid'] = maxuid - - with closing(opendb(dbpath)) as mid2msg: - - info( 'maxuid', mid2msg['maxuid'] ) - - info( 'constructing undirected graph' ) - - for i, (mid, msg) in enumerate(mid2msg.iteritems()): - # Extract any references. - irt = msg.get_all('In-Reply-To', []) - refs = msg.get_all('References', []) - msg.refs.update( ' '.join( irt + refs ).replace('><', '> <').split() ) - - # Connect nodes in graph bidirectionally. Ignore references to MIDs - # that don't exist. - for ref in msg.refs: - try: mid2msg[ref].refs.add(msg['Message-ID']) - except KeyError: log.warning( 'no message with id', ref ) - - # Periodically sync to disk. - if len(mid2msg.cache) > 10000: - info( 'syncing; now at', i ) - mid2msg.sync() - - info('looking for relevant (grouping the messages into threads)') - - # Look for messages sent by us or addressed to us, and add their - # connected components into tid2msgs. - tids = itertools.count() - tid2msgs = collections.defaultdict(list) - for mid, msg in mid2msg.iteritems(): - if ( cfg.sender in msg.get('From', '' ) or - cfg.sender in msg.get('To', '' ) or - cfg.sender in msg.get('Cc', '' ) ): - thread_dfs(msg, tids.next(), mid2msg, tid2msgs) - - info( 'starring the relevant threads, in which I am a participant' ) - - rel_tids = set() - for srcmsg in srcs: - if srcmsg.tid not in rel_tids: - rel_tids.add(srcmsg.tid) - for msg in tid2msgs[srcmsg.tid]: - if r'\Flagged' not in msg.flags: - log.info( 'star', '\n', msg ) - if not cfg.pretend: - imap.uid('STORE', msg.uid, '+FLAGS', r'\Flagged') - if not cfg.no_mark_unseen and r'\Seen' in msg.flags: - imap.uid('STORE', msg.uid, '-FLAGS', r'\Seen') - - info( 'unstarring irrelevant threads, in which I am not a participant' ) - - all_tids = set( tid2msgs.iterkeys() ) - irrel_tids = all_tids - rel_tids - for tid in irrel_tids: - for msg in tid2msgs[tid]: - if r'\Flagged' in msg.flags: - log.info( 'unstar', '\n', msg ) - if not cfg.pretend: - imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') - if not cfg.no_mark_seen and r'\Seen' not in msg.flags: - imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') - startup.run_main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |