From: Dirk M. <di...@fr...> - 2007-10-29 17:52:47
|
Author: dmeyer Date: Mon Oct 29 13:52:45 2007 New Revision: 2892 Log: add notifier type thread to run kaa.notifier besides a different mainloop Added: trunk/base/src/notifier/nf_thread.py trunk/base/test/kaa_in_twisted.py Modified: trunk/base/src/notifier/__init__.py trunk/base/src/notifier/pynotifier/nf_generic.py Modified: trunk/base/src/notifier/__init__.py ============================================================================== --- trunk/base/src/notifier/__init__.py (original) +++ trunk/base/src/notifier/__init__.py Mon Oct 29 13:52:45 2007 @@ -42,8 +42,6 @@ # kaa.notifier imports import nf_wrapper as notifier -init = notifier.init - from popen import * from callback import * from thread import * @@ -141,6 +139,16 @@ shutdown() +def init( module, **options ): + """ + Init the notifier. + """ + if module == 'thread': + import nf_thread + return nf_thread.init(options['handler']) + return notifier.init( module, **options ) + + def step(*args, **kwargs): """ Notifier step function with signal support. Added: trunk/base/src/notifier/nf_thread.py ============================================================================== --- (empty file) +++ trunk/base/src/notifier/nf_thread.py Mon Oct 29 13:52:45 2007 @@ -0,0 +1,67 @@ +import threading +import logging + +import kaa.notifier +import nf_wrapper + +# get logging object +log = logging.getLogger('notifier') + +class ThreadLoop(threading.Thread): + + def __init__(self, interleave): + super(ThreadLoop, self).__init__() + self.interleave = interleave + self.condition = threading.Semaphore(0) + self.sleeping = False + + def handle(self): + nf_wrapper.step(sleep = False) + self.condition.release() + + def run(self): + kaa.notifier.running = True + try: + while True: + self.sleeping = True + nf_wrapper.step(simulate = True) + self.sleeping = False + if not kaa.notifier.running: + break + self.interleave(self.handle) + self.condition.acquire() + except (KeyboardInterrupt, SystemExit): + pass + except Exception, e: + log.exception('loop') + kaa.notifier.running = False + kaa.notifier.shutdown() + + +class Wakeup(object): + def __init__(self, loop, func): + self.loop = loop + self.func = func + + def __call__(self, *args, **kwargs): + ret = self.func(*args, **kwargs) + if self.loop.sleeping: + kaa.notifier.wakeup() + return ret + + +def init( handler ): + """ + Init the notifier. + """ + loop = ThreadLoop(handler) + nf_wrapper.init( 'generic', use_pynotifier=False ) + # set main thread and init thread pipe + kaa.notifier.set_current_as_mainthread() + # adding a timer or socket is not thread safe in general but + # an additional wakeup we don't need does not hurt. And in + # simulation mode the step function does not modify the + # internal variables. + nf_wrapper.timer_add = Wakeup(loop, nf_wrapper.timer_add) + nf_wrapper.socket_add = Wakeup(loop, nf_wrapper.socket_add) + loop.start() Modified: trunk/base/src/notifier/pynotifier/nf_generic.py ============================================================================== --- trunk/base/src/notifier/pynotifier/nf_generic.py (original) +++ trunk/base/src/notifier/pynotifier/nf_generic.py Mon Oct 29 13:52:45 2007 @@ -109,7 +109,7 @@ ( INTERVAL, TIMESTAMP, CALLBACK ) = range( 3 ) -def step( sleep = True, external = True ): +def step( sleep = True, external = True, simulate = False ): """Do one step forward in the main loop. First all timers are checked for expiration and if necessary the accociated callback function is called. After that the timer list is searched for the next timer that will expire. @@ -140,6 +140,11 @@ continue now = int( time() * 1000 ) if timestamp <= now: + if simulate: + # we only simulate and we should be called + __step_depth -= 1 + __in_step = False + return # Update timestamp on timer before calling the callback to # prevent infinite recursion in case the callback calls # step(). @@ -206,6 +211,12 @@ log.exception( 'error in select' ) sys.exit( 1 ) + if simulate: + __step_depth -= 1 + __in_step = False + # we only simulate + return + for sl in ( ( r, IO_READ ), ( w, IO_WRITE ), ( e, IO_EXCEPT ) ): sockets, condition = sl # append all unknown sockets to check list Added: trunk/base/test/kaa_in_twisted.py ============================================================================== --- (empty file) +++ trunk/base/test/kaa_in_twisted.py Mon Oct 29 13:52:45 2007 @@ -0,0 +1,30 @@ +import sys +import kaa + +import kaa.notifier + +# get reactor +from twisted.internet import reactor + +def twisted_callback1(): + print "twisted", kaa.notifier.is_mainthread() + +def twisted_callback2(): + print "twisted (shutdown)", kaa.notifier.is_mainthread() + reactor.stop() + +def kaa_callback(): + print 'kaa', kaa.notifier.is_mainthread() + # sys.exit(0) + +kaa.notifier.init('thread', handler = reactor.callFromThread) + +reactor.callLater(2.5, twisted_callback1) +reactor.callLater(3.5, twisted_callback2) +kaa.notifier.Timer(kaa_callback).start(1) + +# you can either call notifier.main() or reactor.run() +reactor.run() + +kaa.notifier.shutdown() +print 'stop' |