[Assorted-commits] SF.net SVN: assorted: [186] python-afx/trunk/src/afx
Brought to you by:
yangzhang
|
From: <yan...@us...> - 2007-11-19 07:07:22
|
Revision: 186
http://assorted.svn.sourceforge.net/assorted/?rev=186&view=rev
Author: yangzhang
Date: 2007-11-18 23:07:21 -0800 (Sun, 18 Nov 2007)
Log Message:
-----------
Fixed a subtle concurrency bug that caused pubsub/fq to spin; added SIGQUIT handler support.
Modified Paths:
--------------
python-afx/trunk/src/afx/pubsub.py
python-afx/trunk/src/afx/startup.py
Modified: python-afx/trunk/src/afx/pubsub.py
===================================================================
--- python-afx/trunk/src/afx/pubsub.py 2007-11-19 07:06:23 UTC (rev 185)
+++ python-afx/trunk/src/afx/pubsub.py 2007-11-19 07:07:21 UTC (rev 186)
@@ -1,6 +1,8 @@
# -*- mode: python; tab-width: 2; indent-tabs-mode: nil; py-indent-offset: 2; -*-
# vim:et:sw=2:ts=2
+# TODO rename something to 'skip channels' (a la ch paper)
+
from __future__ import ( generators, with_statement )
from contextlib import closing
from cStringIO import StringIO
@@ -189,9 +191,6 @@
self.qs = {}
self.keys = []
self.incoming = condvar()
- self.size = 0
- def __len__( self ):
- return self.size
def _getq( self, k ):
try: q = self.qs[ k ]
except KeyError: q = self.qs[ k ] = af.channel( self.n )
@@ -199,29 +198,26 @@
@af.task
def put( self, k, x ):
q = self._getq( k )
- self.size += 1
self.incoming.notify_one()
yield q.put( x )
@af.task
def get( self ):
while True:
for k in self.keys:
- rem, x = self.qs[ k ].try_get()
- if rem:
- self.size -= 1
+ avail, x = self.qs[ k ].try_get()
+ if avail:
yield af.result( ( k, x ) )
else:
# garbage-collect
del self.qs[ k ]
else:
- if self.size == 0:
+ if len( self.qs ) == 0:
yield self.incoming.wait()
#### # garbage-collection
#### for k in self.qs.keys():
#### if k not in self.qs:
#### del self.qs[ k ]
self.keys = iter( self.qs.keys() )
- self.total = 0
# TODO can the following be done without learning about dispatchables?
Modified: python-afx/trunk/src/afx/startup.py
===================================================================
--- python-afx/trunk/src/afx/startup.py 2007-11-19 07:06:23 UTC (rev 185)
+++ python-afx/trunk/src/afx/startup.py 2007-11-19 07:07:21 UTC (rev 186)
@@ -38,7 +38,8 @@
yield q.get()
def run_main( main = None, do_force = False,
- use_threads = True, use_exiter = True, do_kintr = False ):
+ use_threads = True, use_exiter = True, do_kintr = False,
+ use_sigquit_handler = False ):
def runner( main, argv ):
tasks = [ main( argv ) ]
if use_threads:
@@ -59,4 +60,4 @@
ty,v,tr = exc
raise ty,v,tr
log.debug( __name__, 'runner end' )
- startup.run_main( main, do_force, runner )
+ startup.run_main( main, do_force, runner, use_sigquit_handler )
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|