Thread: [Assorted-commits] SF.net SVN: assorted:[1420] mailing-list-filter/trunk/src/mlf.py
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-05-15 16:47:42
|
Revision: 1420 http://assorted.svn.sourceforge.net/assorted/?rev=1420&view=rev Author: yangzhang Date: 2009-05-15 16:47:28 +0000 (Fri, 15 May 2009) Log Message: ----------- first major update in a long time - switched to new sqlhash storage system - added --no-fetch, --refresh Modified Paths: -------------- mailing-list-filter/trunk/src/mlf.py Modified: mailing-list-filter/trunk/src/mlf.py =================================================================== --- mailing-list-filter/trunk/src/mlf.py 2009-05-15 07:01:14 UTC (rev 1419) +++ mailing-list-filter/trunk/src/mlf.py 2009-05-15 16:47:28 UTC (rev 1420) @@ -1,5 +1,7 @@ #!/usr/bin/env python +# See RFC 3501. + """ Given a Gmail IMAP mailbox, star all messages in which you were a participant (either a sender or an explicit recipient in To: or Cc:), where thread grouping @@ -7,43 +9,36 @@ """ from __future__ import with_statement -from collections import defaultdict -from email import message_from_string -from getpass import getpass -from imaplib import IMAP4_SSL -from argparse import ArgumentParser from path import path -from re import match from functools import partial -from itertools import count -from commons.decs import pickle_memoized -from commons.files import cleanse_filename, soft_makedirs from commons.log import * -from commons.misc import default_if_none, seq -from commons.networking import logout -from commons.seqs import concat, grouper -from commons.startup import run_main from contextlib import closing -import logging -from commons import log +import getpass, logging, shelve, email, re, os, imaplib, itertools, argparse, collections +from commons import log, startup, seqs, networking, files, sqlhash info = partial(log.info, 'main') debug = partial(log.debug, 'main') warning = partial(log.warning, 'main') error = partial(log.error, 'main') die = partial(log.die, 'main') +exception = partial(log.exception, 'main') -def thread_dfs(msg, tid, tid2msgs): +def opendb(dbpath): + return sqlhash.Shelf(sqlhash.SQLhash(dbpath, flags = 'w'), + protocol = 2, writeback = True) + +def thread_dfs(msg, tid, mid2msg, tid2msgs): assert msg.tid is None msg.tid = tid tid2msgs[tid].append(msg) - for ref in msg.refs: - if ref.tid is None: - thread_dfs(ref, tid, tid2msgs) + for refmid in msg.refs: + refmsg = mid2msg[refmid] + if refmsg.tid is None: + thread_dfs(refmsg, tid, mid2msg, tid2msgs) else: - assert ref.tid == tid + assert refmsg.tid == tid -def getmail(imap): +def getmaxuid(imap): info( 'finding max UID' ) # We use UIDs rather than the default of sequence numbers because UIDs are # guaranteed to be persistent across sessions. This means that we can, for @@ -52,8 +47,10 @@ ok, [uids] = imap.uid('SEARCH', None, 'ALL') maxuid = int( uids.split()[-1] ) del uids + return maxuid - info( 'actually fetching the messages in chunks up to max', maxuid ) +def getmail(imap, minuid, maxuid): + info( 'fetching messages', minuid, 'to', maxuid ) # The syntax/fields of the FETCH command is documented in RFC 2060. Also, # this article contains a brief overview: # http://www.devshed.com/c/a/Python/Python-Email-Libraries-part-2-IMAP/3/ @@ -61,19 +58,32 @@ query = '(FLAGS BODY.PEEK[HEADER.FIELDS ' \ '(Message-ID References In-Reply-To From To Cc Subject)])' step = 1000 - return list( concat( - seq( lambda: info('fetching', start, 'to', start + step - 1), - lambda: imap.uid('FETCH', '%d:%d' % (start, start + step - 1), - query)[1] ) - for start in xrange(1, maxuid + 1, step) ) ) + for start in xrange(minuid, maxuid + 1, step): + range = '%d:%d' % (start, min(maxuid, start + step - 1)) + while True: + try: + info('fetching', range) + ok, chunk = imap.uid('FETCH', range, query) + except imaplib.abort, ex: + error('fetch failed:', ex.message) + if 'System Error' not in ex.message: raise + except: + exception('fetch failed') + raise + else: + break + for row in chunk: + yield row def main(argv): - p = ArgumentParser(description = __doc__) + p = argparse.ArgumentParser(description = __doc__) p.add_argument('--credfile', default = path( '~/.mlf.auth' ).expanduser(), help = """File containing your login credentials, with the username on the first line and the password on the second line. Ignored iff --prompt.""") p.add_argument('--cachedir', default = path( '~/.mlf.cache' ).expanduser(), help = "Directory to use for caching our data.") + p.add_argument('--refresh', action = 'store_true', + help = "Re-fetch all messages, wiping out existing cache.") p.add_argument('--prompt', action = 'store_true', help = "Interactively prompt for the username and password.") p.add_argument('--pretend', action = 'store_true', @@ -83,6 +93,8 @@ help = "Do not mark newly revelant threads as unread.") p.add_argument('--no-mark-seen', action = 'store_true', help = "Do not mark newly irrevelant threads as read.") + p.add_argument('--no-fetch', action = 'store_true', + help = "Do not fetch new messages; just process already-fetched messages.") p.add_argument('--debug', action = 'append', default = [], help = """Enable logging for messages of the given flags. Flags include: refs (references to missing Message-IDs), dups (duplicate Message-IDs), @@ -101,136 +113,147 @@ print "username:", cfg.user = raw_input() print "password:", - cfg.passwd = getpass() + cfg.passwd = getpass.getpass() else: with file(cfg.credfile) as f: [cfg.user, cfg.passwd] = map(lambda x: x.strip('\r\n'), f.readlines()) try: - m = match( r'(?P<host>[^:/]+)(:(?P<port>\d+))?(/(?P<mailbox>.+))?$', + m = re.match( r'(?P<host>[^:/]+)(:(?P<port>\d+))?(/(?P<mailbox>.+))?$', cfg.server ) cfg.host = m.group('host') - cfg.port = int( default_if_none(m.group('port'), 993) ) - cfg.mailbox = default_if_none(m.group('mailbox'), 'INBOX') + cfg.port = int( m.group('port') or 993 ) + cfg.mailbox = m.group('mailbox') or 'INBOX' except: p.error('Need to specify the server in the correct format.') - soft_makedirs(cfg.cachedir) + files.soft_makedirs(cfg.cachedir) - with logout(IMAP4_SSL(cfg.host, cfg.port)) as imap: - imap.login(cfg.user, cfg.passwd) + info('connecting and logging in') + + if True: + ###with networking.logout(imaplib.IMAP4_SSL(cfg.host, cfg.port)) as imap: + ###imap.login(cfg.user, cfg.passwd) # Close is only valid in the authenticated state. - with closing(imap) as imap: - # Select the main mailbox (INBOX). - imap.select(cfg.mailbox) + ###with closing(imap) as imap: - # Fetch message IDs, references, and senders. - xs = pickle_memoized \ - (lambda imap: cfg.cachedir / cleanse_filename(cfg.sender)) \ - (getmail) \ - (imap) + info('selecting mailbox') + ###imap.select(cfg.mailbox) - log.debug('fetched', xs) + dbpath = cfg.cachedir / files.cleanse_filename(cfg.sender) - info('building message-id map and determining the set of messages sent ' - 'by you or addressed to you (the "source set")') + # + # Fetch message IDs, references, and senders into persistent store. + # - srcs = [] - mid2msg = {} - # Every second item is just a closing paren. - # Example data: - # [('13300 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {67}', - # 'Message-ID: <mai...@py...>\r\n\r\n'), - # ')', - # ('13301 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {59}', - # 'Message-Id: <200...@hv...>\r\n\r\n'), - # ')', - # ('13302 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {92}', - # 'Message-ID: <C43EAFC0.2E3AE%ni...@ya...>\r\nIn-Reply-To: <481...@gm...>\r\n\r\n')] - for (envelope, data), paren in grouper(2, xs): - # Parse the body. - msg = message_from_string(data) + if cfg.refresh: + try: os.remove(dbpath) + except: pass - # Parse the envelope. - m = match( - r"(?P<seqno>\d+) \(UID (?P<uid>\d+) FLAGS \((?P<flags>[^)]+)\)", - envelope ) - msg.seqno = m.group('seqno') - msg.uid = m.group('uid') - msg.flags = m.group('flags').split() + if not cfg.no_fetch: + with closing(opendb(dbpath)) as mid2msg: - # Prepare a container for references to other msgs, and initialize the - # thread ID. - msg.refs = [] - msg.tid = None + minuid = mid2msg.get('maxuid', 1) + maxuid = getmaxuid(imap) - # Add these to the map. - if msg['Message-ID'] in mid2msg: - log.warning( 'dups', 'duplicate message IDs:', - msg['Message-ID'], msg['Subject'] ) - mid2msg[ msg['Message-ID'] ] = msg + # Every second item is just a closing paren. + # Example data: + # [('13300 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {67}', + # 'Message-ID: <mai...@py...>\r\n\r\n'), + # ')', + # ('13301 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {59}', + # 'Message-Id: <200...@hv...>\r\n\r\n'), + # ')', + # ('13302 (BODY[HEADER.FIELDS (Message-ID References In-Reply-To)] {92}', + # 'Message-ID: <C43EAFC0.2E3AE%ni...@ya...>\r\nIn-Reply-To: <481...@gm...>\r\n\r\n')] + pat = re.compile(r"(?P<seqno>\d+) \(UID (?P<uid>\d+) FLAGS \((?P<flags>[^)]+)\)") + for i, ((envelope, data), paren) in enumerate(seqs.grouper(2, getmail(imap, minuid, maxuid))): + # Parse the body. + msg = email.message_from_string(data) - # Add to "srcs" set if sent by us or addressed to us. - if ( cfg.sender in default_if_none( msg['From'], '' ) or - cfg.sender in default_if_none( msg['To'], '' ) or - cfg.sender in default_if_none( msg['Cc'], '' ) ): - srcs.append( msg ) + # Parse the envelope. + m = pat.match(envelope) + if m is None: raise Exception('envelope: %r' % envelope) + msg.seqno = m.group('seqno') + msg.uid = m.group('uid') + msg.flags = m.group('flags').split() - info( 'constructing undirected graph' ) + # Prepare a container for references to other msgs, and initialize the + # thread ID. + msg.refs = set() + msg.tid = None - for mid, msg in mid2msg.iteritems(): - # Extract any references. - irt = default_if_none( msg.get_all('In-Reply-To'), [] ) - refs = default_if_none( msg.get_all('References'), [] ) - refs = set( ' '.join( irt + refs ).replace('><', '> <').split() ) + # Add these to the map. + if msg['Message-ID'] in mid2msg: + log.warning( 'dups', 'duplicate message IDs:', + msg['Message-ID'], msg['Subject'] ) + mid2msg[ msg['Message-ID'] ] = msg - # Connect nodes in graph bidirectionally. Ignore references to MIDs - # that don't exist. - for ref in refs: - try: - refmsg = mid2msg[ref] - # We can use lists/append (not worry about duplicates) because the - # original sources should be acyclic. If a -> b, then there is no b -> - # a, so when crawling a we can add a <-> b without worrying that later - # we may re-add b -> a. - msg.refs.append(refmsg) - refmsg.refs.append(msg) - except: - log.warning( 'refs', ref ) + # Periodically sync to disk. + if len(mid2msg.cache) > 1000: mid2msg.sync() - info('finding connected components (grouping the messages into threads)') + mid2msg['maxuid'] = maxuid - tids = count() - tid2msgs = defaultdict(list) - for mid, msg in mid2msg.iteritems(): - if msg.tid is None: - thread_dfs(msg, tids.next(), tid2msgs) + with closing(opendb(dbpath)) as mid2msg: - info( 'starring the relevant threads, in which I am a participant' ) + info( 'maxuid', mid2msg['maxuid'] ) - rel_tids = set() - for srcmsg in srcs: - if srcmsg.tid not in rel_tids: - rel_tids.add(srcmsg.tid) - for msg in tid2msgs[srcmsg.tid]: - if r'\Flagged' not in msg.flags: - log.info( 'star', '\n', msg ) - if not cfg.pretend: - imap.uid('STORE', msg.uid, '+FLAGS', r'\Flagged') - if not cfg.no_mark_unseen and r'\Seen' in msg.flags: - imap.uid('STORE', msg.uid, '-FLAGS', r'\Seen') + info( 'constructing undirected graph' ) - info( 'unstarring irrelevant threads, in which I am not a participant' ) + for i, (mid, msg) in enumerate(mid2msg.iteritems()): + # Extract any references. + irt = msg.get_all('In-Reply-To', []) + refs = msg.get_all('References', []) + msg.refs.update( ' '.join( irt + refs ).replace('><', '> <').split() ) - all_tids = set( tid2msgs.iterkeys() ) - irrel_tids = all_tids - rel_tids - for tid in irrel_tids: - for msg in tid2msgs[tid]: - if r'\Flagged' in msg.flags: - log.info( 'unstar', '\n', msg ) - if not cfg.pretend: - imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') - if not cfg.no_mark_seen and r'\Seen' not in msg.flags: - imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') + # Connect nodes in graph bidirectionally. Ignore references to MIDs + # that don't exist. + for ref in msg.refs: + try: mid2msg[ref].refs.add(msg['Message-ID']) + except KeyError: log.warning( 'no message with id', ref ) -run_main() + # Periodically sync to disk. + if len(mid2msg.cache) > 10000: + info( 'syncing; now at', i ) + mid2msg.sync() + + info('looking for relevant (grouping the messages into threads)') + + # Look for messages sent by us or addressed to us, and add their + # connected components into tid2msgs. + tids = itertools.count() + tid2msgs = collections.defaultdict(list) + for mid, msg in mid2msg.iteritems(): + if ( cfg.sender in msg.get('From', '' ) or + cfg.sender in msg.get('To', '' ) or + cfg.sender in msg.get('Cc', '' ) ): + thread_dfs(msg, tids.next(), mid2msg, tid2msgs) + + info( 'starring the relevant threads, in which I am a participant' ) + + rel_tids = set() + for srcmsg in srcs: + if srcmsg.tid not in rel_tids: + rel_tids.add(srcmsg.tid) + for msg in tid2msgs[srcmsg.tid]: + if r'\Flagged' not in msg.flags: + log.info( 'star', '\n', msg ) + if not cfg.pretend: + imap.uid('STORE', msg.uid, '+FLAGS', r'\Flagged') + if not cfg.no_mark_unseen and r'\Seen' in msg.flags: + imap.uid('STORE', msg.uid, '-FLAGS', r'\Seen') + + info( 'unstarring irrelevant threads, in which I am not a participant' ) + + all_tids = set( tid2msgs.iterkeys() ) + irrel_tids = all_tids - rel_tids + for tid in irrel_tids: + for msg in tid2msgs[tid]: + if r'\Flagged' in msg.flags: + log.info( 'unstar', '\n', msg ) + if not cfg.pretend: + imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') + if not cfg.no_mark_seen and r'\Seen' not in msg.flags: + imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') + +startup.run_main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-06-03 01:22:24
|
Revision: 1444 http://assorted.svn.sourceforge.net/assorted/?rev=1444&view=rev Author: yangzhang Date: 2009-06-03 00:17:16 +0000 (Wed, 03 Jun 2009) Log Message: ----------- added faster incremental processing Modified Paths: -------------- mailing-list-filter/trunk/src/mlf.py Modified: mailing-list-filter/trunk/src/mlf.py =================================================================== --- mailing-list-filter/trunk/src/mlf.py 2009-06-02 10:02:44 UTC (rev 1443) +++ mailing-list-filter/trunk/src/mlf.py 2009-06-03 00:17:16 UTC (rev 1444) @@ -95,14 +95,14 @@ for row in chunk: yield row -def fetch_new_mail(cfg): +def fetch_new_mail(cfg, imap): if cfg.refresh: - try: os.remove(dbpath(cfg, 'mail')) + try: os.remove(dbpath(cfg, 'fetched')) except: pass - with closing(opendb(cfg, 'mail', 'w')) as mid2msg: + with closing(opendb(cfg, 'fetched', 'w')) as mid2msg: - minuid = mid2msg.get('maxuid', 1) + minuid = mid2msg.get('maxuid', 0) + 1 maxuid = getmaxuid(imap) # Every second item is just a closing paren. @@ -127,40 +127,39 @@ msg.uid = m.group('uid') msg.flags = m.group('flags').split() -# # Prepare a container for references to other msgs, and initialize the -# # thread ID. + # Prepare a container for references to other msgs, and initialize the + # thread ID. msg.refs = set() msg.tid = None # Add these to the map. if msg['Message-ID'] in mid2msg: - warning( 'duplicate message IDs:', - msg['Message-ID'], msg['Subject'] ) + log.warning( 'dups', 'duplicate message IDs:', + msg['Message-ID'], msg['Subject'] ) mid2msg[ msg['Message-ID'] ] = msg # Periodically sync to disk. if len(mid2msg.cache) > 1000: mid2msg.sync() mid2msg['maxuid'] = maxuid + # XXX + mid2msg['procuid'] = mid2msg['maxuid'] # # Function for analyzing messages. # -def iterpairs(mid2msg, midsrc): - for mid in midsrc: - yield mid, mid2msg[mid] - -def itermsgs(mid2msg, midsrc = None): - pairs = ( mid2msg.iteritems() - if midsrc is None else - iterpairs(mid2msg, midsrc) ) - for i, (mid, msg) in enumerate(pairs): - if mid != 'maxuid': yield msg - # Periodically sync to disk. - if mid2msg.cache is not None and len(mid2msg.cache) > 10000: - debug( 'syncing; now at i', i, 'mid', mid ) - mid2msg.sync() +def itermsgs(mid2msg, minuid, midsrc = None): + if midsrc is None: midsrc = mid2msg + special_keys = ['maxuid', 'procuid'] + for i, (mid, msg) in enumerate(midsrc.iteritems()): + if mid not in special_keys and msg.uid >= minuid: + if midsrc is mid2msg: yield msg + else: yield mid2msg[mid] + # Periodically sync to disk. + if mid2msg.cache is not None and len(mid2msg.cache) > 10000: + debug( 'syncing; now at i', i, 'mid', mid ) + mid2msg.sync() debug( 'syncing; finished after i', i, 'mid', mid ) mid2msg.sync() @@ -171,44 +170,43 @@ try: refmsg = mid2msg[refmid] except KeyError: - warning('no message with mid', refmid) + log.warning('badref', 'no message with mid', refmid) else: if refmsg.tid is None: thread_dfs(refmsg, tid, mid2msg) else: assert refmsg.tid == tid def mark_relevant_threads(cfg): -# shutil.copy(dbpath(cfg, 'fetched'), dbpath(cfg, 'bidir')) -# -# with closing(opendb(cfg, 'fetched')) as midsrc: -# with closing(opendb(cfg, 'bidir', 'w')) as mid2msg: -# info( 'maxuid', midsrc['maxuid'] ) -# info( 'constructing bidirectional ref graph' ) -# for msg in itermsgs(mid2msg, midsrc): -# debug('processing', msg['Message-ID']) -# # XXX -# if not hasattr(msg, 'refs'): msg.refs = set() -# if not hasattr(msg, 'tid'): msg.tid = None -# # XXX -# irt = msg.get_all('In-Reply-To', []) -# refs = msg.get_all('References', []) -# newrefs = ' '.join( irt + refs ).replace('><', '> <').split() -# msg.refs.update( newrefs ) -# -# # Connect nodes in graph bidirectionally. Ignore references to MIDs -# # that don't exist. -# for ref in newrefs: -# debug('adding', ref, '<->', msg['Message-ID']) -# try: mid2msg[ref].refs.add( msg['Message-ID'] ) -# except KeyError: warning( 'no message with mid', ref ) + shutil.copy(dbpath(cfg, 'fetched'), dbpath(cfg, 'bidir')) + with closing(opendb(cfg, 'fetched')) as midsrc: + with closing(opendb(cfg, 'bidir', 'w')) as mid2msg: + procuid = mid2msg.get('procuid', 0) + minuid = procuid + 1 + info( 'maxuid', midsrc['maxuid'], 'procuid', procuid ) + del procuid + info( 'constructing bidirectional ref graph' ) + for msg in itermsgs(mid2msg, minuid, midsrc): + debug('processing', msg['Message-ID']) + irt = msg.get_all('In-Reply-To', []) + refs = msg.get_all('References', []) + newrefs = ' '.join( irt + refs ).replace('><', '> <').split() + msg.refs.update( newrefs ) + + # Connect nodes in graph bidirectionally. Ignore references to MIDs + # that don't exist. + for ref in newrefs: + debug('adding', ref, '<->', msg['Message-ID']) + try: mid2msg[ref].refs.add( msg['Message-ID'] ) + except KeyError: log.warning( 'badref', 'no message with mid', ref ) + shutil.copy(dbpath(cfg, 'bidir'), dbpath(cfg, 'threaded')) with closing(opendb(cfg, 'bidir')) as midsrc: with closing(opendb(cfg, 'threaded', 'w')) as mid2msg: info( 'looking for relevant msgs (grouping them into threads)' ) tids = itertools.count() - for msg in itermsgs(mid2msg, midsrc): + for msg in itermsgs(mid2msg, minuid, midsrc): debug('threading', msg['Message-ID']) if ( msg.tid is None and ( cfg.sender in msg.get('From', '') or @@ -224,7 +222,7 @@ info( 'starring/unstarring relevant/irrelevant threads' ) - for msg in itermsgs(mid2msg): + for msg in itermsgs(mid2msg, 0): if msg.tid is not None: # is a relevant msgs if r'\Flagged' not in msg.flags: # not already flagged mark_unseen = not cfg.no_mark_unseen and r'\Seen' in msg.flags @@ -244,6 +242,8 @@ imap.uid('STORE', msg.uid, '-FLAGS', r'\Flagged') if mark_seen: imap.uid('STORE', msg.uid, '+FLAGS', r'\Seen') + mid2msg['procuid'] = mid2msg['maxuid'] + # # Main function. # @@ -304,9 +304,9 @@ if not cfg.no_fetch: with login_imap(cfg) as imap: - fetch_new_mail(cfg) + fetch_new_mail(cfg, imap) - #mark_relevant_threads(cfg) + mark_relevant_threads(cfg) with login_imap(cfg) as imap: with closing(opendb(cfg, 'threaded')) as mid2msg: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-06-03 17:07:54
|
Revision: 1445 http://assorted.svn.sourceforge.net/assorted/?rev=1445&view=rev Author: yangzhang Date: 2009-06-03 17:07:53 +0000 (Wed, 03 Jun 2009) Log Message: ----------- using python commons path Modified Paths: -------------- mailing-list-filter/trunk/src/mlf.py Modified: mailing-list-filter/trunk/src/mlf.py =================================================================== --- mailing-list-filter/trunk/src/mlf.py 2009-06-03 00:17:16 UTC (rev 1444) +++ mailing-list-filter/trunk/src/mlf.py 2009-06-03 17:07:53 UTC (rev 1445) @@ -9,13 +9,13 @@ """ from __future__ import with_statement -from path import path from functools import partial from commons.log import * from contextlib import closing, contextmanager import getpass, logging, shelve, email, re, os, imaplib, itertools import argparse, collections, subprocess, shutil from commons import log, startup, seqs, networking, files, sqlhash +from commons.path import path info = partial(log.info, 'main') debug = partial(log.debug, 'main') This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2010-02-16 23:09:17
|
Revision: 1568 http://assorted.svn.sourceforge.net/assorted/?rev=1568&view=rev Author: yangzhang Date: 2010-02-16 23:09:07 +0000 (Tue, 16 Feb 2010) Log Message: ----------- started rewrite to use external algorithms Modified Paths: -------------- mailing-list-filter/trunk/src/mlf.py Modified: mailing-list-filter/trunk/src/mlf.py =================================================================== --- mailing-list-filter/trunk/src/mlf.py 2010-02-13 01:34:07 UTC (rev 1567) +++ mailing-list-filter/trunk/src/mlf.py 2010-02-16 23:09:07 UTC (rev 1568) @@ -13,7 +13,7 @@ from commons.log import * from contextlib import closing, contextmanager import getpass, logging, shelve, email, re, os, imaplib, itertools -import argparse, collections, subprocess, shutil +import argparse, collections, subprocess, shutil, cPickle as pickle from commons import log, startup, seqs, networking, files, sqlhash from commons.path import path @@ -65,6 +65,9 @@ else: imap.logout() +@contextmanager +def dummyctx(): yield + # # Functions for fetching messages from the server. # @@ -162,6 +165,7 @@ if midsrc is None: midsrc = mid2msg special_keys = ['maxuid', 'procuid'] for i, (mid, msg) in enumerate(midsrc.iteritems()): + if i % 10000 == 0: log.debug('readscan', i) if mid not in special_keys and msg.uid >= minuid: if midsrc is mid2msg: yield msg else: yield mid2msg[mid] @@ -321,14 +325,114 @@ with login_imap(cfg) as imap: fetch_new_mail(cfg, imap) - mark_relevant_threads(cfg) + g['reversed'] = reverse(g['fetched']) + g['sorted-reversed'] = sort(g['reversed']) + g['fetched'] = union(g['fetched'], g['sorted-reversed']) + g['flagged'] = flag_relevant(g['unioned']) + process(g) + sync_server(g['flagged']) + #reverse_graph('fetched', 'reversed') + #sort_graph('reversed', 'sorted-reversed') + #union_graphs('fetched', 'sorted-reversed', 'unioned') + #flag_relevant('unioned', 'flagged') + #sync_server('flagged') + + def gen_graph(path): + with file(path) as f: + for line in f: + yield ('path', []) + + def reverse_graph(g): + for line in g: + aoeu + + def union_graphs(g0, g1): + aoeu + + + if not os.path.exists(dbpath(cfg, 'tagged')): + tagged = set() + else: + with file(dbpath(cfg, 'tagged')) as f: tagged = pickle.load(f) + + tagged = tag_roots(cfg) + with file(dbpath(cfg, 'tagged'), 'w') as f: pickle.dump(tagged, f, 2) + + tagged = tag_connected(cfg, tagged) + + return + #mark_relevant_threads(cfg) + shutil.copy(dbpath(cfg, 'threaded'), dbpath(cfg, 'flagged')) - with login_imap(cfg) as imap: + with (dummyctx() if cfg.no_fetch else login_imap(cfg)) as imap: with closing(opendb(cfg, 'threaded')) as midsrc: with closing(opendb(cfg, 'flagged', 'w')) as mid2msg: flag_relevant_msgs(cfg, imap, midsrc, mid2msg) - os.rename(dbpath(cfg, 'fetched'), dbpath(cfg, 'fetched-old')) - os.rename(dbpath(cfg, 'flagged'), dbpath(cfg, 'fetched')) + #os.rename(dbpath(cfg, 'fetched'), dbpath(cfg, 'fetched-old')) + #os.rename(dbpath(cfg, 'flagged'), dbpath(cfg, 'fetched')) +#class graph_op(object): pass +def read(path): return partial(_read_graph, path) +def reverse(g): return partial() +def union(g1, g2): return partial('union') +def materialize(g): return g() + +def _read_graph(path): + with file(path) as f: + return f.read() + +def _reverse_graph(g): + outpath = g() + +def tag_roots(cfg, tagged): + log.info('roots', 'tagging roots; len(tagged) =', len(tagged)) + with closing(opendb(cfg, 'threaded')) as m: + count = 0 + for msg in itermsgs(m, 0): + if ( cfg.sender in msg.get('From', '') or + cfg.sender in msg.get('To', '') or + cfg.sender in msg.get('Cc', '') ): + tagged.add(msg['Message-ID']) + count += 1 + log.debug('roots', + 'count', count, + 'len(tagged)', len(tagged), + 'msgid', msg['Message-ID']) + log.info('roots', 'tagged', count, 'roots; len(tagged) =', len(tagged)) + +def refs(msg): + a = msg.get_all('In-Reply-To', []) + b = msg.get_all('References', []) + return ' '.join(a + b).replace('><', '> <').split() + +def tag_connected(cfg, tagged): + ''' + Scan messages, tagging anything that + already tagged. + ''' + + def dfs_tag(msg): + for refid in refs(msg): + if refid not in tagged: + tagged.add(refid) + # disk seeks here, but there shouldn't be too many, and locality in + # time means they're probably in the already cache (since we're + # scanning from earliest message to latest, and most messages are + # received in order, i.e. most references are back-references) + dfs_tag(msgs[ref]) + + log.info('tag_connected', 'tagging connected; len(tagged) =', len(tagged)) + with closing(opendb(cfg, 'threaded')) as m: + for msg in itermsgs(m, 0): + msgid = msg['Message-ID'] + + if ( cfg.sender in msg.get('From', '') or + cfg.sender in msg.get('To', '') or + cfg.sender in msg.get('Cc', '') ): + if msgid in tagged or any(refid in tagged for refid in refs(msg)): + dfs_tag(msg) + log.info('tag_connected', + 'tagged', count, 'msgs; len(tagged) = ', len(tagged)) + startup.run_main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |