Dispatchers are one of SObjectizer's cornerstones. They are necessary to simplify the development of multithreaded applications where working threads are used for various tasks. Instead of manual manipulations with threads a programmer uses dispatchers and binds agents to an appropriate dispatcher. All further thread management and event scheduling will be performed by dispatchers.
SObjectizer provides several dispatchers. Some of them have unique features. One of such features will be discussed below.
There is adv_thread_pool
dispatcher. It creates a number of working threads and invokes event handlers for agents on the context of these threads. But there is one key feature: adv_thread_pool
understands thread safety marks for event handlers.
When a programmer subscribes an event handler for a message he can mark event handler as not thread safe or as thread safe. By default, all event handlers are treated as not thread safe and most SObjectizer's dispatchers simply ignore that mark. But if a programmer marks an event handler as thread safe and binds agent to adv_thread_pool
dispatcher then this mark will be taken into account during event scheduling.
When a working thread of adv_thread_pool
dispatcher extracts next message from some non-empty queue it checks thread safety of the event handler for message extracted. If this event handler is thread safe it will be called in parallel with other thread safe event handlers which are running on different threads.
But if the event handler is not thread safe then the working thread will return message to the queue and will try to handle another non-empty queue. The message will be postponed. It will be returned for processing when all previously started handlers finish their work.
Message will be also postponed if there is a running event handler which is not thread safe.
In other words, 1adv_thread_pool1 allows to run several thread safe event handlers in parallel. But only one not thread safe event handler can be running at any particular moment.
Let's consider small example. Suppose there are several messages:
struct M1 : public so_5::signal_t {}; struct M2 : public so_5::signal_t {}; struct M3 : public so_5::signal_t {}; struct stop : public so_5::signal_t {};
There are also two different agent types A
and B
. Agent A
handles message M1
by event handler A.e1
and M3
by event handler A.e3
(it also handles stop
for finishing the whole example). Agent B
handles message M2
by event handler B.e2
.
Event handlers A.e1
and B.e2
are thread safe. Event handlers A.e3
and A.stop
are not thread safe.
For demonstration purposes event handlers do not do any useful work: they just print the name of event handler to standard output and suspend the current thread for some time. There is the code of these agent types:
class A final : public so_5::agent_t { public : A( context_t ctx ) : so_5::agent_t{ ctx } { // All messages are expected from message box with name "demo" so_subscribe( so_environment().create_mbox( "demo" ) ) // Event handler A.e1 is thread safe because it explicitly marked // by using 'thread_safe' constant. .event< M1 >( [this] { trace( *this, "A.e1 started" ); std::this_thread::sleep_for( pause ); trace( *this, "A.e1 finished" ); }, so_5::thread_safe ) // Event handlers for other messages are not thread safe because // there is no special mark and all handlers are not thread safe by default. .event< M3 >( [this] { trace( *this, "A.e3 started" ); std::this_thread::sleep_for( pause ); trace( *this, "A.e3 finished" ); } ) .event< stop >( [this] { so_environment().stop(); } ); } }; class B final : public so_5::agent_t { public : B( context_t ctx ) : so_5::agent_t{ ctx } { so_subscribe( so_environment().create_mbox( "demo" ) ) .event< M2 >( [this] { trace( *this, "B.e2 started" ); std::this_thread::sleep_for( pause ); trace( *this, "B.e2 finished" ); }, so_5::thread_safe ); } };
Let's create and bind agents of type A
and B
to adv_thread_pool
:
namespace pool_disp = so_5::disp::adv_thread_pool; env.introduce_coop( // New dispatcher instance is created by create_private_disp. // Method binder() returns an object for binding agents from // new coop to that dispatcher istance. pool_disp::create_private_disp( env, 4 )->binder( // Parameter fifo_type will be discussed later. pool_disp::bind_params_t{}.fifo( fifo_type ) ), []( so_5::coop_t & coop ) { coop.make_agent< A >(); coop.make_agent< B >(); } );
And then send messages in a sequence {M1
, M2
, M3
, M2
, M1
, stop
}.
const auto m = env.create_mbox( "demo" ); so_5::send< M1 >( m ); so_5::send< M2 >( m ); so_5::send< M3 >( m ); so_5::send< M2 >( m ); so_5::send< M1 >( m ); so_5::send< stop >( m );
If fifo_type
is fifo_t::individual
then message queues for agent A
and B
will be different and will be handled independently. The queue for A
will contain sequence {M1
, M3
, M1
, stop
}, the queue for B
will contain {M2
, M2
}. See below the result of example execution (the first number is thread ID, then a pointer to agent in brackets):
14: (0x57c1f0) A.e1 started 12: (0x57c480) B.e2 started 13: (0x57c480) B.e2 started 14: (0x57c1f0) A.e1 finished 13: (0x57c480) B.e2 finished 14: (0x57c1f0) A.e3 started 12: (0x57c480) B.e2 finished 14: (0x57c1f0) A.e3 finished 14: (0x57c1f0) A.e1 started 14: (0x57c1f0) A.e1 finished
We can see that thread safe event handler B.e2
is called twice on different threads in parallel. But event handlers of agent A
are called sequentially. It is because A.e3
is not thread safe and it cannot be run until previous A.e1
is finished. The same situation with second A.e1
invocation: A.e1
is thread safe and it cannot be called in parallel with not thread safe A.e3
.
We can change fifo_type
value to fifo_t::cooperation
. In this case A
and B
will use one common queue which will contain the whole sequence {M1
, M2
, M3
, M2
, M1
, stop
}. Messages M1
and M2
have thread safe event handlers A.e1
and B.e2
. These event handlers will be called in parallel on different threads. But event handler for M3
will not be called until finish of previously invoked A.e1
and B.e2
. Only then A.e3
will be called. And only after finish of A.e3
event handlers for next M2
and M1
messages will be invoked. In parallel again because event handlers for them are thread safe.
The result of execution for fifo_t::cooperation
looks like:
6: (0x57b7c0) B.e2 started 8: (0x57c2e0) A.e1 started 6: (0x57b7c0) B.e2 finished 8: (0x57c2e0) A.e1 finished 5: (0x57c2e0) A.e3 started 5: (0x57c2e0) A.e3 finished 5: (0x57b7c0) B.e2 started 8: (0x57c2e0) A.e1 started 8: (0x57c2e0) A.e1 finished 5: (0x57b7c0) B.e2 finished
But there can be a slightly different sequence like:
8: (0x57c2e0) A.e1 started 6: (0x57b7c0) B.e2 started 6: (0x57b7c0) B.e2 finished 8: (0x57c2e0) A.e1 finished 5: (0x57c2e0) A.e3 started 5: (0x57c2e0) A.e3 finished 5: (0x57b7c0) B.e2 started 8: (0x57c2e0) A.e1 started 8: (0x57c2e0) A.e1 finished 5: (0x57b7c0) B.e2 finished
Or:
6: (0x57b7c0) B.e2 started 8: (0x57c2e0) A.e1 started 8: (0x57c2e0) A.e1 finished 6: (0x57b7c0) B.e2 finished 5: (0x57c2e0) A.e3 started 5: (0x57c2e0) A.e3 finished 8: (0x57c2e0) A.e1 started 5: (0x57b7c0) B.e2 started 8: (0x57c2e0) A.e1 finished 5: (0x57b7c0) B.e2 finished
But invocation of A.e3
will always be in the middle of pairs A.e1
+B.e2
. Because all previously started event handlers must be finished before A.e3
will be called.
Where can this feature of adv_thread_pool
dispatcher be useful? In cases where immutable operations need to be distributed to several working threads. For example there could be cryptographer
agent which performs make_signature
, check_signature
, encrypt_block
and decrypt_block
operations.
These operations are thread safe because they don't change the state of cryptographer
agent. Event handlers for these operations can be marked as thread safe. This allows to handle several cryptographic operations at the same time in parallel.
But cryptographer
agent can also have a not thread safe event handler for message reconfigure
. In this case adv_thread_pool
dispatcher guarantees that all thread safe event handlers finish their work before event handler for reconfigure
message will be started.
A sketch for such cryptographer
agent looks like:
class cryptographer : public so_5::agent_t { void on_make_signature( const make_signature & msg ) {...} void on_check_signature( const check_signature & msg ) {...} void on_encrypt_block( const encrypt_block & msg ) {...} void on_decrypt_block( const decrypt_block & msg ) {...} void on_reconfiguration( const reconfigure & msg ) {...} ... virtual void so_define_agent() override { so_subscribe_self() .event( &cryptographer::on_make_signature, so_5::thread_safe ) .event( &cryptographer::on_check_signature, so_5::thread_safe ) .event( &cryptographer::on_encrypt_block, so_5::thread_safe ) .event( &cryptographer::on_decrypt_block, so_5::thread_safe ) .event( &cryptographer::on_reconfigure, so_5::not_thread_safe ) ... } }
We hope this post shows how adv_thread_pool
dispatcher can simplify the development of agents which require distribution of event processing on several working threads. It is not the universal solution but if event handlers are thread safe methods (and sometimes they are) this approach can be useful.
Note. Code fragments with agents A
and B
are extracted from new example which was added to the main so-5.5 development brunch during the work on eighth part of "Dive into SObjectizer-5.5" serie presentations.This example is a part of SObjectizer since v.5.5.15.1. Its source code can be found in the repository.