Menu

Invocation Of Several Thread Safe Event Handlers In Parallel

Dispatchers are one of SObjectizer's cornerstones. They are necessary to simplify the development of multithreaded applications where working threads are used for various tasks. Instead of manual manipulations with threads a programmer uses dispatchers and binds agents to an appropriate dispatcher. All further thread management and event scheduling will be performed by dispatchers.

SObjectizer provides several dispatchers. Some of them have unique features. One of such features will be discussed below.

There is adv_thread_pool dispatcher. It creates a number of working threads and invokes event handlers for agents on the context of these threads. But there is one key feature: adv_thread_pool understands thread safety marks for event handlers.

When a programmer subscribes an event handler for a message he can mark event handler as not thread safe or as thread safe. By default, all event handlers are treated as not thread safe and most SObjectizer's dispatchers simply ignore that mark. But if a programmer marks an event handler as thread safe and binds agent to adv_thread_pool dispatcher then this mark will be taken into account during event scheduling.

When a working thread of adv_thread_pool dispatcher extracts next message from some non-empty queue it checks thread safety of the event handler for message extracted. If this event handler is thread safe it will be called in parallel with other thread safe event handlers which are running on different threads.

But if the event handler is not thread safe then the working thread will return message to the queue and will try to handle another non-empty queue. The message will be postponed. It will be returned for processing when all previously started handlers finish their work.

Message will be also postponed if there is a running event handler which is not thread safe.

In other words, 1adv_thread_pool1 allows to run several thread safe event handlers in parallel. But only one not thread safe event handler can be running at any particular moment.

Let's consider small example. Suppose there are several messages:

struct M1 : public so_5::signal_t {};
struct M2 : public so_5::signal_t {};
struct M3 : public so_5::signal_t {};

struct stop : public so_5::signal_t {};

There are also two different agent types A and B. Agent A handles message M1 by event handler A.e1 and M3 by event handler A.e3 (it also handles stop for finishing the whole example). Agent B handles message M2 by event handler B.e2.

Event handlers A.e1 and B.e2 are thread safe. Event handlers A.e3 and A.stop are not thread safe.

For demonstration purposes event handlers do not do any useful work: they just print the name of event handler to standard output and suspend the current thread for some time. There is the code of these agent types:

class A final : public so_5::agent_t
{
public :
    A( context_t ctx ) : so_5::agent_t{ ctx }
    {
        // All messages are expected from message box with name "demo"
        so_subscribe( so_environment().create_mbox( "demo" ) )
            // Event handler A.e1 is thread safe because it explicitly marked
            // by using 'thread_safe' constant.
            .event< M1 >( [this] {
                    trace( *this, "A.e1 started" );
                    std::this_thread::sleep_for( pause );
                    trace( *this, "A.e1 finished" );
                },
                so_5::thread_safe )
            // Event handlers for other messages are not thread safe because
            // there is no special mark and all handlers are not thread safe by default.
            .event< M3 >( [this] {
                    trace( *this, "A.e3 started" );
                    std::this_thread::sleep_for( pause );
                    trace( *this, "A.e3 finished" );
                } )
            .event< stop >( [this] { so_environment().stop(); } );
    }
};

class B final : public so_5::agent_t
{
public :
    B( context_t ctx ) : so_5::agent_t{ ctx }
    {
        so_subscribe( so_environment().create_mbox( "demo" ) )
            .event< M2 >( [this] {
                    trace( *this, "B.e2 started" );
                    std::this_thread::sleep_for( pause );
                    trace( *this, "B.e2 finished" );
                },
                so_5::thread_safe );
    }
};

Let's create and bind agents of type A and B to adv_thread_pool:

namespace pool_disp = so_5::disp::adv_thread_pool;

env.introduce_coop(
    // New dispatcher instance is created by create_private_disp.
    // Method binder() returns an object for binding agents from
    // new coop to that dispatcher istance.
    pool_disp::create_private_disp( env, 4 )->binder(
        // Parameter fifo_type will be discussed later.
        pool_disp::bind_params_t{}.fifo( fifo_type ) ),
    []( so_5::coop_t & coop )
    {
        coop.make_agent< A >();
        coop.make_agent< B >();
    } );

And then send messages in a sequence {M1, M2, M3, M2, M1, stop}.

const auto m = env.create_mbox( "demo" );
so_5::send< M1 >( m );
so_5::send< M2 >( m );
so_5::send< M3 >( m );
so_5::send< M2 >( m );
so_5::send< M1 >( m );
so_5::send< stop >( m );

If fifo_type is fifo_t::individual then message queues for agent A and B will be different and will be handled independently. The queue for A will contain sequence {M1, M3, M1, stop}, the queue for B will contain {M2, M2}. See below the result of example execution (the first number is thread ID, then a pointer to agent in brackets):

14: (0x57c1f0) A.e1 started
12: (0x57c480) B.e2 started
13: (0x57c480) B.e2 started
14: (0x57c1f0) A.e1 finished
13: (0x57c480) B.e2 finished
14: (0x57c1f0) A.e3 started
12: (0x57c480) B.e2 finished
14: (0x57c1f0) A.e3 finished
14: (0x57c1f0) A.e1 started
14: (0x57c1f0) A.e1 finished

We can see that thread safe event handler B.e2 is called twice on different threads in parallel. But event handlers of agent A are called sequentially. It is because A.e3 is not thread safe and it cannot be run until previous A.e1 is finished. The same situation with second A.e1 invocation: A.e1 is thread safe and it cannot be called in parallel with not thread safe A.e3.

We can change fifo_type value to fifo_t::cooperation. In this case A and B will use one common queue which will contain the whole sequence {M1, M2, M3, M2, M1, stop}. Messages M1 and M2 have thread safe event handlers A.e1 and B.e2. These event handlers will be called in parallel on different threads. But event handler for M3 will not be called until finish of previously invoked A.e1 and B.e2. Only then A.e3 will be called. And only after finish of A.e3 event handlers for next M2 and M1 messages will be invoked. In parallel again because event handlers for them are thread safe.

The result of execution for fifo_t::cooperation looks like:

6: (0x57b7c0) B.e2 started
8: (0x57c2e0) A.e1 started
6: (0x57b7c0) B.e2 finished
8: (0x57c2e0) A.e1 finished
5: (0x57c2e0) A.e3 started
5: (0x57c2e0) A.e3 finished
5: (0x57b7c0) B.e2 started
8: (0x57c2e0) A.e1 started
8: (0x57c2e0) A.e1 finished
5: (0x57b7c0) B.e2 finished

But there can be a slightly different sequence like:

8: (0x57c2e0) A.e1 started
6: (0x57b7c0) B.e2 started
6: (0x57b7c0) B.e2 finished
8: (0x57c2e0) A.e1 finished
5: (0x57c2e0) A.e3 started
5: (0x57c2e0) A.e3 finished
5: (0x57b7c0) B.e2 started
8: (0x57c2e0) A.e1 started
8: (0x57c2e0) A.e1 finished
5: (0x57b7c0) B.e2 finished

Or:

6: (0x57b7c0) B.e2 started
8: (0x57c2e0) A.e1 started
8: (0x57c2e0) A.e1 finished
6: (0x57b7c0) B.e2 finished
5: (0x57c2e0) A.e3 started
5: (0x57c2e0) A.e3 finished
8: (0x57c2e0) A.e1 started
5: (0x57b7c0) B.e2 started
8: (0x57c2e0) A.e1 finished
5: (0x57b7c0) B.e2 finished

But invocation of A.e3 will always be in the middle of pairs A.e1+B.e2. Because all previously started event handlers must be finished before A.e3 will be called.


Where can this feature of adv_thread_pool dispatcher be useful? In cases where immutable operations need to be distributed to several working threads. For example there could be cryptographer agent which performs make_signature, check_signature, encrypt_block and decrypt_block operations.

These operations are thread safe because they don't change the state of cryptographer agent. Event handlers for these operations can be marked as thread safe. This allows to handle several cryptographic operations at the same time in parallel.

But cryptographer agent can also have a not thread safe event handler for message reconfigure. In this case adv_thread_pool dispatcher guarantees that all thread safe event handlers finish their work before event handler for reconfigure message will be started.

A sketch for such cryptographer agent looks like:

class cryptographer : public so_5::agent_t
{
   void on_make_signature( const make_signature & msg ) {...}
   void on_check_signature( const check_signature & msg ) {...}
   void on_encrypt_block( const encrypt_block & msg ) {...}
   void on_decrypt_block( const decrypt_block & msg ) {...}
   void on_reconfiguration( const reconfigure & msg ) {...}
...
   virtual void so_define_agent() override {
      so_subscribe_self()
        .event( &cryptographer::on_make_signature, so_5::thread_safe )
        .event( &cryptographer::on_check_signature, so_5::thread_safe )
        .event( &cryptographer::on_encrypt_block, so_5::thread_safe )
        .event( &cryptographer::on_decrypt_block, so_5::thread_safe )
        .event( &cryptographer::on_reconfigure, so_5::not_thread_safe )
      ...
   }
}

We hope this post shows how adv_thread_pool dispatcher can simplify the development of agents which require distribution of event processing on several working threads. It is not the universal solution but if event handlers are thread safe methods (and sometimes they are) this approach can be useful.

Note. Code fragments with agents A and B are extracted from new example which was added to the main so-5.5 development brunch during the work on eighth part of "Dive into SObjectizer-5.5" serie presentations.This example is a part of SObjectizer since v.5.5.15.1. Its source code can be found in the repository.

Posted by Yauheni Akhotnikau 2016-01-28

Log in to post a comment.

Want the latest updates on software, tech news, and AI?
Get latest updates about software, tech news, and AI from SourceForge directly in your inbox once a month.