| Revision: 253
          http://assorted.svn.sourceforge.net/assorted/?rev=253&view=rev
Author:   yangzhang
Date:     2008-01-19 22:32:13 -0800 (Sat, 19 Jan 2008)
Log Message:
-----------
added profiling of worker threads!
Modified Paths:
--------------
    python-afx/trunk/src/afx/threads.py
Modified: python-afx/trunk/src/afx/threads.py
===================================================================
--- python-afx/trunk/src/afx/threads.py	2008-01-20 06:31:46 UTC (rev 252)
+++ python-afx/trunk/src/afx/threads.py	2008-01-20 06:32:13 UTC (rev 253)
@@ -5,12 +5,13 @@
 from commons import log
 from commons.threads import synchronized
 from Queue import Queue
-from threading import Lock, Thread
+from threading import Lock, Thread, currentThread
 import socket
 from functools import partial, wraps
 from contextlib import closing
-import os
+from os import environ
 from sys import exc_info
+from cProfile import runctx
 
 import af
 
@@ -26,8 +27,8 @@
 info  = partial( log.info, __name__ )
 error = partial( log.error, __name__ )
 
-itc_port = 17777 if 'ITC_PORT' not in os.environ \
-           else int( os.environ[ 'ITC_PORT' ] )
+itc_port = 17777 if 'ITC_PORT' not in environ \
+           else int( environ[ 'ITC_PORT' ] )
 
 p = None
 
@@ -66,20 +67,27 @@
         debug( 'pool accepted' )
 
         def worker():
-            debug( 'starting worker' )
-            while True:
-                msg = i.get()
-                if msg is stop_msg: break
-                resultbuf, func, args, kwargs = msg
-                result, exc = None, None
-                try:
-                    result = func( *args, **kwargs )
-                except:
-                    t, v, tb = exc_info()
-                    exc = t, v, tb.tb_next
-                o.put( ( resultbuf, result, exc ) )
-                s.send( 'x' ) # assuming socket.send is thread-safe
-            debug( 'stopping worker' )
+            def real_worker():
+                debug( 'starting worker' )
+                while True:
+                    msg = i.get()
+                    if msg is stop_msg: break
+                    resultbuf, func, args, kwargs = msg
+                    result, exc = None, None
+                    try:
+                        result = func( *args, **kwargs )
+                    except:
+                        t, v, tb = exc_info()
+                        exc = t, v, tb.tb_next
+                    o.put( ( resultbuf, result, exc ) )
+                    s.send( 'x' ) # assuming socket.send is thread-safe
+                debug( 'stopping worker' )
+            if environ.get( 'PYPROF', '' ) != '':
+                try: outpath = environ['PYPROF'] % currentThread().getName()
+                except: real_worker()
+                else: runctx( 'real_worker()', locals(), globals(), outpath )
+            else:
+                real_worker()
 
         with self.lock:
             if not self.stopped:
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 |