[Assorted-commits] SF.net SVN: assorted: [186] python-afx/trunk/src/afx
Brought to you by:
yangzhang
From: <yan...@us...> - 2007-11-19 07:07:22
|
Revision: 186
          http://assorted.svn.sourceforge.net/assorted/?rev=186&view=rev
Author:   yangzhang
Date:     2007-11-18 23:07:21 -0800 (Sun, 18 Nov 2007)

Log Message:
-----------
fixed a subtle concurrency bug causing pubsub/fq to spin; added sigquit support

Modified Paths:
--------------
    python-afx/trunk/src/afx/pubsub.py
    python-afx/trunk/src/afx/startup.py

Modified: python-afx/trunk/src/afx/pubsub.py
===================================================================
--- python-afx/trunk/src/afx/pubsub.py  2007-11-19 07:06:23 UTC (rev 185)
+++ python-afx/trunk/src/afx/pubsub.py  2007-11-19 07:07:21 UTC (rev 186)
@@ -1,6 +1,8 @@
 # -*- mode: python; tab-width: 2; indent-tabs-mode: nil; py-indent-offset: 2; -*-
 # vim:et:sw=2:ts=2
 
+# TODO rename something to 'skip channels' (a la ch paper)
+
 from __future__ import ( generators, with_statement )
 from contextlib import closing
 from cStringIO import StringIO
@@ -189,9 +191,6 @@
     self.qs = {}
     self.keys = []
     self.incoming = condvar()
-    self.size = 0
-  def __len__( self ):
-    return self.size
   def _getq( self, k ):
     try: q = self.qs[ k ]
     except KeyError: q = self.qs[ k ] = af.channel( self.n )
@@ -199,29 +198,26 @@
   @af.task
   def put( self, k, x ):
     q = self._getq( k )
-    self.size += 1
     self.incoming.notify_one()
     yield q.put( x )
   @af.task
   def get( self ):
     while True:
       for k in self.keys:
-        rem, x = self.qs[ k ].try_get()
-        if rem:
-          self.size -= 1
+        avail, x = self.qs[ k ].try_get()
+        if avail:
           yield af.result( ( k, x ) )
         else:
           # garbage-collect
           del self.qs[ k ]
       else:
-        if self.size == 0:
+        if len( self.qs ) == 0:
           yield self.incoming.wait()
 ####    # garbage-collection
 ####    for k in self.qs.keys():
 ####      if k not in self.qs:
 ####        del self.qs[ k ]
         self.keys = iter( self.qs.keys() )
-        self.total = 0
 
 # TODO can the following be done without learning about dispatchables?
 
Modified: python-afx/trunk/src/afx/startup.py
===================================================================
--- python-afx/trunk/src/afx/startup.py  2007-11-19 07:06:23 UTC (rev 185)
+++ python-afx/trunk/src/afx/startup.py  2007-11-19 07:07:21 UTC (rev 186)
@@ -38,7 +38,8 @@
     yield q.get()
 
 def run_main( main = None, do_force = False,
-    use_threads = True, use_exiter = True, do_kintr = False ):
+    use_threads = True, use_exiter = True, do_kintr = False,
+    use_sigquit_handler = False ):
   def runner( main, argv ):
     tasks = [ main( argv ) ]
     if use_threads:
@@ -59,4 +60,4 @@
       ty,v,tr = exc
       raise ty,v,tr
     log.debug( __name__, 'runner end' )
-  startup.run_main( main, do_force, runner )
+  startup.run_main( main, do_force, runner, use_sigquit_handler )


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.