so-5.5 Basics

Yauheni Akhotnikau korish

Introduction

so_5 is a framework for organizing an application's work via message passing between agents. Agents send messages and/or signals to each other and receive them. Messages are delivered via message boxes (mboxes) and are stored in message queues until they are processed. Some messages can be delayed (i.e. delivered after some time), and some can be periodic (i.e. delivered again and again at regular intervals).

Every agent works on some execution context, but the user does not need to manage working threads manually. All working threads are controlled by dispatchers. The user is responsible only for selecting appropriate dispatchers and distributing agents among them.

SObjectizer Environment

SObjectizer Environment (so_environment) is a container which holds everything SObjectizer works with at run time, such as agents, mboxes and dispatchers. It is presented to the user as the interface so_5::environment_t.

It is possible to create several instances of SObjectizer Environment inside one application, although the need for it arises very rarely. In that case the instances are totally independent, which means that agents in one instance work independently from agents in the other.

The interface so_5::environment_t is an abstract class. A concrete implementation of it is created inside the family of so_5::launch() functions. There are two ways to start SObjectizer Environment: a call to the so_5::launch() function or the use of the so_5::wrapped_env_t class.

Starting SObjectizer Environment by launch() function

Every so_5::launch() function does the same sequence of steps:

  • instantiates a specific implementation of so_5::environment_t;
  • calls the user supplied initialization function;
  • starts SObjectizer Environment;
  • waits for SObjectizer Environment shutdown.

It means that launch() blocks the caller thread until SObjectizer Environment finishes its work.

In a simple application launch() is called only once in the main() function, and the application finishes when launch() returns. If the user wants to do something else concurrently, or to run another SObjectizer Environment instance, a dedicated thread should be created and launch() should be called on the context of that thread.

The so_5::launch() function has two forms. The first one receives just one argument -- it is a function object (or lambda) with initialization actions:

::c++
so_5::launch( []( so_5::environment_t & env ) {
    ... // Some application specific code which uses 'env'.
} );

The second one receives two arguments. They are functional objects. The first, like in the previous case, does the initialization actions. The second argument sets up SObjectizer Environment's parameters:

::c++
so_5::launch( []( so_5::environment_t & env ) {
        ... // Some application specific code which uses 'env'.
    },
    []( so_5::environment_params_t & params ) {
        ... // Some environment tuning code which uses 'params'.
    } );

Starting SObjectizer Environment by using wrapped_env_t class

Since v.5.5.9 there is another way of starting SObjectizer Environment. This way is useful if some form of classical GUI-messages processing loop must be present in an application:

::c++
int main()
{
    so_5::wrapped_env_t sobj; // Empty SO Environment will be started here.
    sobj.environment().introduce_coop( []( so_5::coop_t & coop ) {
        ... // Some SO Environment initialization code.
    } );
    ... // Some application-specific initialization code.

    // Classical message processing loop for Windows GUI application.
    while(GetMessage(...) > 0) // GetMessage returns 0 on WM_QUIT and -1 on error.
    {
        TranslateMessage(...);
        DispatchMessage(...);
    }

    sobj.stop_then_join(); // Stop SO Environment and wait until it has completely finished.
    ... // Some application-specific deinitialization code.
}

An instance of so_5::environment_t is automatically created inside wrapped_env_t. A reference to that instance can be obtained via wrapped_env_t::environment() method.

An instance of SObjectizer Environment is automatically started in the constructor of wrapped_env_t. There is no need to shut down that Environment explicitly: it will be done in the destructor. But sometimes the Environment should be shut down manually, as in the example above. This is useful when there is a need to guarantee that all agents have stopped their work and all resources allocated by the Environment have been deallocated.

The constructor of wrapped_env_t can accept the same arguments as so_5::launch() functions: an SObjectizer initialization lambda and Environment parameters could be passed as constructor arguments:

::c++
int main()
{
    so_5::wrapped_env_t env{
        // Initialization lambda.
        []( so_5::environment_t & env ) {
            ... // Some application specific code which uses 'env'
        },
        // Parameters tuning lambda.
        []( so_5::environment_params_t & params ) {
            ... // Some application specific code which uses 'params'.
        } };

    // Classical message processing loop for Windows GUI application.
    while(GetMessage(...) > 0) // GetMessage returns 0 on WM_QUIT and -1 on error.
    {
        TranslateMessage(...);
        DispatchMessage(...);
    }
    // SObjectizer Environment will be automatically shut down here.
}

Messages and Signals

Interaction between agents is performed by sending messages. There are two types of messages:

  • the ordinary message with some data inside;
  • the signal, a special case of a message which carries no data at all: the only information is the fact that the message exists.

Every message/signal must have a unique type -- a dedicated C++ class/struct.

For signals, the corresponding C++ classes must be derived from so_5::signal_t.

For ordinary messages there are two possibilities: either the message type is derived from so_5::message_t, or it is an arbitrary MoveConstructible type T:

::c++
// This is a message type.
struct msg_convert : public so_5::message_t
  { /* some fields here */ };
// This is a message type.
struct mouse_position { int x, y; };
// This type can also be used as a message type.
enum class engine_control { turn_on, speed_up, slow_down, turn_off };

// This is a signal type.
struct msg_get_status : public so_5::signal_t {};
// This is a signal type.
struct ping : public so_5::signal_t {};

All ordinary messages must be sent as dynamically allocated objects, i.e. instances of the corresponding C++ classes are created by the new operator and later destroyed by the delete operator.

The recommended way of sending messages is to use so_5::send() functions:

::c++
so_5::send< msg_convert >(dest, 42); /* Value 42 will be passed to
        the constructor of msg_convert */
so_5::send< mouse_position >(dest, 0, 145);
so_5::send< engine_control >(dest, engine_control::turn_on); /* Value turn_on will be sent
        as a message of type engine_control. */

so_5::send< msg_get_status >(dest); // Sending of a signal.
so_5::send< ping >(dest); // Sending of a signal.

If send() function is used all necessary dynamically-allocated objects are created automatically.
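
The idea of "arguments are forwarded to the message constructor and the object is allocated for you" can be sketched in plain C++. This is only a toy model and not SObjectizer's implementation: toy_msg_convert, toy_dest and toy_send are hypothetical names invented for this illustration.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Toy message with a constructor, standing in for msg_convert (hypothetical).
struct toy_msg_convert
{
    int m_value;
    explicit toy_msg_convert( int v ) : m_value( v ) {}
};

// Toy destination which simply stores every message it receives (hypothetical).
template< typename Msg >
struct toy_dest
{
    std::vector< std::unique_ptr< Msg > > m_received;
};

// Creates the message on the heap from the given constructor arguments
// and hands ownership over to the destination.
template< typename Msg, typename... Args >
void toy_send( toy_dest< Msg > & dest, Args &&... args )
{
    dest.m_received.push_back(
        std::unique_ptr< Msg >( new Msg( std::forward< Args >( args )... ) ) );
}

// Usage: the caller never writes 'new' or 'delete' itself.
inline int toy_send_demo()
{
    toy_dest< toy_msg_convert > dest;
    toy_send< toy_msg_convert >( dest, 42 );
    assert( dest.m_received.size() == 1 );
    return dest.m_received.front()->m_value;
}
```

The caller passes only constructor arguments; allocation and ownership transfer stay inside the sending function, which is the convenience the text above attributes to so_5::send().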

But sometimes it is necessary to send an already created message instance. The deliver_message method of the destination mbox can be used for that. Note that deliver_message can handle only messages of types derived from so_5::message_t:

::c++
std::unique_ptr< msg_convert > msg = std::make_unique< msg_convert >(42);
... // Some action on msg content...
// Sending message to the destination.
mbox->deliver_message( std::move( msg ) );

There is also deliver_signal method for sending signals:

::c++
mbox->deliver_signal< msg_get_status >();

But since v.5.5.1 so_5::send() is preferred over deliver_message and deliver_signal.

Note. so_5 distinguishes messages/signals by their C++ types. A separate type must be defined for each message, even if several messages have the same structure. For example, these are two different message types:

::c++
struct msg_convert_to_long : public so_5::message_t
  {
    int m_value;
  };
struct msg_convert_to_string : public so_5::message_t
  {
    int m_value;
  };

But these are not:

::c++
struct msg_convert_int : public so_5::message_t
  {
    int m_value;
  };
// IT WON'T WORK IN SO_5!
typedef msg_convert_int msg_convert_to_long;
typedef msg_convert_int msg_convert_to_string;
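
The reason is a property of C++ itself: a typedef only introduces another name for an existing type and does not create a new one. The following standalone sketch checks this with std::is_same and std::type_index. It uses plain structs without the so_5::message_t base, and msg_a and msg_b are hypothetical names added for the comparison.

```cpp
#include <cassert>
#include <type_traits>
#include <typeindex>
#include <typeinfo>

// Simplified stand-ins for message types (no so_5::message_t base here).
struct msg_convert_int { int m_value; };
typedef msg_convert_int msg_convert_to_long;
typedef msg_convert_int msg_convert_to_string;

// Two separately defined structs with an identical layout (hypothetical names).
struct msg_a { int m_value; };
struct msg_b { int m_value; };

// Aliases name one and the same type...
static_assert( std::is_same< msg_convert_to_long, msg_convert_to_string >::value,
    "typedef aliases refer to the same type" );
// ...while separately defined structs are distinct types.
static_assert( !std::is_same< msg_a, msg_b >::value,
    "separate structs are different types" );

// Run-time view of the same fact: a lookup keyed by type
// cannot tell the two aliases apart.
inline bool aliases_share_type_index()
{
    return std::type_index( typeid(msg_convert_to_long) )
        == std::type_index( typeid(msg_convert_to_string) );
}

inline bool distinct_structs_share_type_index()
{
    return std::type_index( typeid(msg_a) ) == std::type_index( typeid(msg_b) );
}

inline void typedef_demo()
{
    assert( aliases_share_type_index() );
    assert( !distinct_structs_share_type_index() );
}
```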

Mboxes

Mbox is a shorthand for the message box. Mbox is a term which is used in SObjectizer for two abstractions:

  • point/address/channel, to which messages and signals are sent;
  • point/address/channel, from which messages and signals are received.

There are two different types of mboxes in SObjectizer, each of them is implemented in its own way. But there is an interface so_5::abstract_message_box_t which is implemented by every type of mboxes. All interactions with mboxes are done via that interface.

Interface abstract_message_box_t is an abstract class, so it is impossible to store an object of abstract_message_box_t inside an agent, or to pass one by value as an argument to a function/method or inside a message. Only references/pointers to abstract_message_box_t can be used. There is a special type so_5::mbox_t which is an analog of a smart pointer/reference to abstract_message_box_t. Objects of type mbox_t are the only way to deal with mboxes in SObjectizer.

All mboxes are controlled by SObjectizer Environment. The interface environment_t contains methods for creating mboxes. The user holds references to mboxes in so_5::mbox_t objects. Every mbox is destroyed automatically as soon as the last mbox-reference to it is gone.

Multi-producer/multi-consumer mboxes

SObjectizer uses a lightweight publish-subscribe model which is implemented by multi-producer/multi-consumer (MPMC) mboxes. An MPMC mbox is like a bulletin board: a producer publishes a message/signal on it, and all consumers subscribed to that message/signal type on the board receive the message/signal instance. It means that the producer must know the mbox to which the messages will be sent (published), and the consumer must know the same mbox to make its subscription. It is the user's task to share that knowledge between the application's agents.

An mbox can deliver messages of different types. It means that the user can send messages of various types to a single mbox, and there is no need to create different mboxes for different message types.

However, a subscription is made for a specific message type. For example, if agent A is subscribed to messages of type msg_ping from mbox M, and agent B is subscribed to messages of type msg_pong from the same mbox, then the user can send both msg_ping and msg_pong messages to M, but A will receive only msg_ping and B will receive only msg_pong.
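
The subscription-by-type behavior can be illustrated with a small self-contained model. It is a sketch under simplifying assumptions and not SObjectizer code: toy_mpmc_mbox is a hypothetical class, handlers are called synchronously inside send() (real agents receive messages through their queues), and the message structs do not derive from any so_5 base.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <vector>

// Toy message types (plain structs, no so_5 base classes).
struct msg_ping { int m_n; };
struct msg_pong { int m_n; };

// Toy MPMC mbox (hypothetical): one subscriber list per message type.
class toy_mpmc_mbox
{
    using handler_t = std::function< void( const void * ) >;
    std::map< std::type_index, std::vector< handler_t > > m_subscribers;
public:
    template< typename Msg >
    void subscribe( std::function< void( const Msg & ) > handler )
    {
        // Store a type-erased wrapper under the key of the message type.
        m_subscribers[ std::type_index( typeid(Msg) ) ].push_back(
            [handler]( const void * raw ) {
                handler( *static_cast< const Msg * >( raw ) );
            } );
    }

    template< typename Msg >
    void send( const Msg & msg ) const
    {
        // Look up the subscriber list for exactly this message type.
        const auto it = m_subscribers.find( std::type_index( typeid(Msg) ) );
        if( it == m_subscribers.end() )
            return; // Nobody is subscribed to this type.
        for( const auto & h : it->second )
            h( &msg );
    }
};

// "Agent" A subscribes to msg_ping and "agent" B to msg_pong on the same mbox.
inline std::string run_ping_pong_demo()
{
    toy_mpmc_mbox m;
    std::string log;
    m.subscribe< msg_ping >( [&log]( const msg_ping & ) { log += "A:ping "; } );
    m.subscribe< msg_pong >( [&log]( const msg_pong & ) { log += "B:pong "; } );
    m.send( msg_ping{ 1 } );
    m.send( msg_pong{ 2 } );
    m.send( msg_ping{ 3 } );
    return log;
}
```

Both message types travel through the same mbox object, yet each handler sees only the type it subscribed to.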

There are two kinds of MPMC mboxes:

  • anonymous. These mboxes have no identification at all. They are known only by the mbox-references to them. If the user loses the last mbox-reference to an anonymous mbox, the mbox itself is lost too;
  • named. These mboxes must have unique names. SObjectizer Environment holds a dictionary of named mboxes. If the user creates an mbox with an already known name, the existing mbox is returned.
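
The difference between the two kinds can be modeled in a few lines of plain C++. This is a toy sketch and not SObjectizer's implementation: toy_mbox, toy_mbox_ref and toy_environment are hypothetical names, and the toy dictionary keeps named mboxes alive for the whole lifetime of the environment, which is a simplification.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Toy stand-ins (hypothetical); these are NOT SObjectizer types.
struct toy_mbox {};
using toy_mbox_ref = std::shared_ptr< toy_mbox >;

class toy_environment
{
    // Dictionary of named mboxes held by the environment.
    std::map< std::string, toy_mbox_ref > m_named;
public:
    // Anonymous mbox: a fresh object on every call,
    // reachable only through the returned reference.
    toy_mbox_ref create_mbox()
    {
        return std::make_shared< toy_mbox >();
    }

    // Named mbox: created on first use; later calls with the same
    // name return a reference to the same object.
    toy_mbox_ref create_mbox( const std::string & name )
    {
        auto & slot = m_named[ name ];
        if( !slot )
            slot = std::make_shared< toy_mbox >();
        return slot;
    }
};

inline void toy_mbox_demo()
{
    toy_environment env;
    const auto first = env.create_mbox( "notification_channel" );
    const auto second = env.create_mbox( "notification_channel" );
    assert( first == second );      // Same name, same mbox.
    const auto anon_1 = env.create_mbox();
    const auto anon_2 = env.create_mbox();
    assert( anon_1 != anon_2 );     // Every anonymous mbox is unique.
}
```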

An anonymous mbox can be created by environment_t::create_mbox() method without the name argument:

::c++
// Anonymous mbox will be created.
auto mbox = env.create_mbox();

A named mbox can be created (or a reference to already created mbox is obtained) by environment_t::create_mbox() method with the name argument:

::c++
// Named mbox is created or obtained.
auto mbox = env.create_mbox( "notification_channel" );

The choice between anonymous and named mboxes mainly depends on whether an mbox-reference can be passed to the agent's constructor. If it can, anonymous mboxes are usually the better choice. For example, if two agents are constructed at the same time and should pass messages to each other, anonymous mboxes are the simplest solution:

::c++
// Pinger agent. Should have its own mbox and mbox of ponger agent.
class a_pinger_t : public so_5::agent_t
  {
      const so_5::mbox_t m_self_mbox;
      const so_5::mbox_t m_ponger_mbox;
  public :
    // Receives mbox-references in the constructor:
    a_pinger_t( context_t ctx,
      // Reference to own mbox.
      so_5::mbox_t self_mbox,
      // Reference to ponger's mbox.
      so_5::mbox_t ponger_mbox )
      : so_5::agent_t( ctx )
      , m_self_mbox( std::move(self_mbox) )
      , m_ponger_mbox( std::move(ponger_mbox) )
      ...
    ...
  };
// Ponger agent. Should have its own mbox and mbox of pinger agent.
class a_ponger_t : public so_5::agent_t
  {
      const so_5::mbox_t m_self_mbox;
      const so_5::mbox_t m_pinger_mbox;
  public :
    // Receives mbox-references in the constructor:
    a_ponger_t( context_t ctx,
      // Reference to own mbox.
      so_5::mbox_t self_mbox,
      // Reference to pinger's mbox.
      so_5::mbox_t pinger_mbox )
      : so_5::agent_t( ctx )
      , m_self_mbox( std::move(self_mbox) )
      , m_pinger_mbox( std::move(pinger_mbox) )
      ...
    ...
  };
...
// Creation of mboxes.
auto pinger_mbox = env.create_mbox();
auto ponger_mbox = env.create_mbox();
// Creation of agents.
coop->make_agent< a_pinger_t >( pinger_mbox, ponger_mbox );
coop->make_agent< a_ponger_t >( ponger_mbox, pinger_mbox );

But there could be cases when agents are created in different parts of application (when so_sysconf is used for example) and direct exchange of mbox reference is difficult. In such cases named mboxes could help:

::c++
// Pinger agent. Should have its own mbox and mbox of ponger agent.
class a_pinger_t : public so_5::agent_t
  {
      const so_5::mbox_t m_self_mbox;
      const so_5::mbox_t m_ponger_mbox;
  public :
    a_pinger_t( context_t ctx )
      : so_5::agent_t( ctx )
      , m_self_mbox( ctx.env().create_mbox( "pinger" ) )
      , m_ponger_mbox( ctx.env().create_mbox( "ponger" ) )
      ...
    ...
  };
// Ponger agent. Should have its own mbox and mbox of pinger agent.
class a_ponger_t : public so_5::agent_t
  {
      const so_5::mbox_t m_self_mbox;
      const so_5::mbox_t m_pinger_mbox;
  public :
    a_ponger_t( context_t ctx )
      : so_5::agent_t( ctx )
      , m_self_mbox( ctx.env().create_mbox( "ponger" ) )
      , m_pinger_mbox( ctx.env().create_mbox( "pinger" ) )
      ...
    ...
  };
...
// Pinger is created in one place...
coop->make_agent< a_pinger_t >();
...
// Ponger is created somewhere else...
coop->make_agent< a_ponger_t >();

There could be more sophisticated cases when agents are created, destroyed and created again, and in this case the named mboxes could be useful. However, this is out of scope of this introduction to SObjectizer's basics.

Since v.5.5.1 there is a family of so_5::send free functions, which is the recommended way of sending messages/signals:

::c++
// Sending message.
so_5::send< msg_ping >( mbox,
  // All the following arguments will be forwarded
  // to the msg_ping's constructor.
  ... );

// Sending signal.
so_5::send< msg_pong >( mbox );

But there are also two families of mbox methods for message delivery. The first one is used to deliver ordinary messages which are derived from so_5::message_t. The methods of that family are called deliver_message, and the type of the message is deduced from the actual argument:

::c++
auto msg = std::make_unique< msg_ping >(...);
m_ponger_mbox->deliver_message(
  // Message type is deduced from the argument.
  std::move( msg ) );

The second one is used to deliver signals, the type of which must be specified explicitly:

::c++
m_pinger_mbox->deliver_signal< msg_pong >();

Multi-producer/single-consumer mboxes

MPMC mboxes are good when 1-to-many or many-to-many agents interaction is required. But they are too expensive for the case of 1-to-1 interaction. It is because message passing via MPMC mbox consists of two phases:

  • on the first phase there is a lookup for subscriber list for the specified message type;
  • on the second phase there is a check for possibility of handling this message type in the current subscriber’s state.

For the 1-to-1 agents interaction the first phase is redundant. The 1-to-1 interaction will be more efficient if a message is directly passed to the subscriber’s queue without looking up the subscriber list for the message type.

Direct and efficient support for 1-to-1 agent interaction is the goal of the type of mbox introduced in SObjectizer 5.4.0: the multi-producer/single-consumer (MPSC) message box.

MPSC mboxes are created automatically by SObjectizer for every agent. When the constructor of agent_t finishes its work, an MPSC mbox for the agent already exists. That mbox can be accessed via the agent_t::so_direct_mbox() method. It belongs to the agent for which it was created: that agent is the owner of the MPSC mbox.

MPSC mbox implements abstract_message_box_t interface just as MPMC mbox. References to MPSC mboxes can be stored by mbox_t objects. References to MPSC mboxes in form of mbox_t can be passed to other agents or can be stored anywhere user wants.

The example with ping-pong agents from above could be rewritten with MPSC mboxes in the following way:

::c++
// Pinger agent.
class a_pinger_t : public so_5::agent_t
  {
  public :
    a_pinger_t( context_t ctx )
      : so_5::agent_t( ctx )
      {}
    // Reference to ponger's mbox cannot be passed to constructor
    // so it must be set by this method.
    void set_ponger_mbox( const so_5::mbox_t & mbox )
      {
        m_ponger_mbox = mbox;
      }
    ...
  };
// Ponger agent.
class a_ponger_t : public so_5::agent_t
  {
  public :
    a_ponger_t( context_t ctx )
      : so_5::agent_t( ctx )
      {}
    // Reference to pinger's mbox cannot be passed to constructor
    // so it must be set by this method.
    void set_pinger_mbox( const so_5::mbox_t & mbox )
      {
        m_pinger_mbox = mbox;
      }
    ...
  };
...
// Creation of agents.
auto pinger = coop->make_agent< a_pinger_t >();
auto ponger = coop->make_agent< a_ponger_t >();
// Exchange of mboxes.
pinger->set_ponger_mbox( ponger->so_direct_mbox() );
ponger->set_pinger_mbox( pinger->so_direct_mbox() );

The main differences between MPSC and MPMC mboxes are:

  • the inability to subscribe to the messages from the mbox except for the owner of mbox;
  • it is possible to send mutable messages into MPSC mboxes, but this is impossible for MPMC mboxes (more about mutability and immutability of messages below);
  • delivery filters can be set only to MPMC mboxes. See [so-5.5 In-depth - Message Delivery Filters] for more information about delivery filters.

Sending to agent's direct MPSC mbox can be done via so_5::send():

::c++
struct mouse_position { int x, y; };
struct stop_engine : public so_5::signal_t {};
// Sending a message to agent.
so_5::send< mouse_position >( *canvas, x_pos, y_pos );
// Sending a signal to agent.
so_5::send< stop_engine >( *engine_controller );

For more information about MPSC mboxes see [so-5.4.0 New type of mbox - multi-producer and single-consumer].

Selection between MPMC and MPSC mboxes

So, there are two types of mboxes with different abilities and with a noticeable difference in the efficiency of message delivery to subscribers. Which one should be used, and when?

MPMC mboxes must be used when an agent is a producer of some information which could be processed by several different agents. Or, when there are several producers for information and several consumers for that information.

Let's consider an agent which reads data from one socket, transforms it and writes it to another socket. From time to time that agent sends a message about its current state: state of connections, amount of processed data and so on. If this message is sent to an MPMC mbox it can be processed by different agents. One could transform that message into one or more SNMP data sources to make it available to various monitoring tools like Zabbix or HP OpenView. Another receiver could use this status message for overload control. Yet another receiver could write the content of this status message to a log file. And so on.

MPSC mboxes must be used when an agent is a receiver of some agent-specific information and/or agent-specific commands. For example, a message about the need to reconfigure an agent is agent-specific, and it is better to send it to the agent’s MPSC mbox than to a common MPMC mbox which is shared between several agents.

Note. MPSC mboxes have a significant trait in comparison with MPMC mboxes: because the first phase (the lookup of the subscriber list by message type) is absent, a message can be pushed to the agent’s queue even if the agent is not subscribed to it. Such a message will be rejected in the second phase, when the ability to process the message is checked.
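
This trait can be modeled with a toy agent whose direct queue accepts everything and performs the check only at processing time. The sketch below is an illustration under simplifying assumptions, not SObjectizer code: toy_agent, msg_reconfigure and msg_unknown are hypothetical names, and the queue stores only the type of each message, without any payload.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <typeindex>
#include <typeinfo>
#include <utility>

// Toy message types (hypothetical).
struct msg_reconfigure {};
struct msg_unknown {};

// Toy agent with a direct, MPSC-like queue (hypothetical).
class toy_agent
{
    std::deque< std::type_index > m_queue;
    std::map< std::type_index, std::function< void() > > m_handlers;
    std::size_t m_handled = 0;
    std::size_t m_rejected = 0;
public:
    template< typename Msg >
    void subscribe( std::function< void() > handler )
    {
        m_handlers[ std::type_index( typeid(Msg) ) ] = std::move( handler );
    }

    // No subscriber lookup here: every message goes straight to the queue.
    template< typename Msg >
    void push()
    {
        m_queue.push_back( std::type_index( typeid(Msg) ) );
    }

    // The check happens only when a message is taken from the queue.
    void process_all()
    {
        while( !m_queue.empty() )
        {
            const auto it = m_handlers.find( m_queue.front() );
            m_queue.pop_front();
            if( it != m_handlers.end() )
            {
                it->second();
                ++m_handled;
            }
            else
                ++m_rejected; // No handler: the message is dropped.
        }
    }

    std::size_t queue_size() const { return m_queue.size(); }
    std::size_t handled() const { return m_handled; }
    std::size_t rejected() const { return m_rejected; }
};
```

A message of a type without a handler still occupies a slot in the queue until process_all() reaches it, which is the cost described in the note above.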

Cooperations

A cooperation is a kind of container for introducing (registering) agents to SObjectizer Environment and removing (deregistering) them from it. With the help of a cooperation, several agents which depend on one another can be registered at once in a transactional manner: either all of the agents are registered successfully or none of them is.

Each cooperation must have a unique name. SObjectizer Environment maintains a dictionary of registered cooperations. An attempt to register a cooperation with already known name is an error.

Every cooperation creation consists of three stages:

  1. A cooperation object must be created by environment_t::create_coop() method.
  2. The cooperation must be filled up with agents by coop_t::make_agent(), coop_t::make_agent_with_binder() (or coop_t::add_agent() in versions prior to 5.5.4) methods.
  3. The cooperation must be registered by moving to environment_t::register_coop() method.

For example:

::c++
// Creation of cooperation.
auto coop = env.create_coop( "ping_pong_demo" );
// Creation of agents.
coop->make_agent< a_pinger_t >( pinger_mbox, ponger_mbox );
coop->make_agent< a_ponger_t >( ponger_mbox, pinger_mbox );
// Registration of cooperation.
env.register_coop( std::move( coop ) );

Since v.5.5.5 there is more compact way for creation and registration of a new coop (see [so-5.5.5 introduce_coop and introduce_child_coop helpers] for more details):

::c++
env.introduce_coop( "ping_pong_demo", [&]( so_5::coop_t & coop ) {
    coop.make_agent< a_pinger_t >( pinger_mbox, ponger_mbox );
    coop.make_agent< a_ponger_t >( ponger_mbox, pinger_mbox );
} );

To deregister cooperation environment_t::deregister_coop() method must be used. It receives the cooperation name to be deregistered and the reason of the deregistration:

::c++
env.deregister_coop( "ping_pong_demo", so_5::dereg_reason::normal );

Please note that deregistration can take some time. The deregister_coop() method only starts the deregistration procedure. Deregistration is completed only when all agents of the cooperation have finished their message processing. While the deregistration procedure is active, the cooperation's agents cannot receive any new messages. They only handle the old messages which had been dispatched before the deregister_coop() call.

In the case where a cooperation should contain only one agent the register_agent_as_coop() method can be used:

::c++
// Create agent and register it as a cooperation "demo".
env.register_agent_as_coop(
  // Cooperation name.
  "demo", 
  // The single agent of cooperation.
  env.make_agent< a_demo_t >( ... ) );

There is also an optional parent-child relation between cooperations. It allows specifying an existing cooperation as the parent of a new cooperation before registration:

::c++
// A verbose version:
void a_connection_manager_t::evt_new_connection( const msg_new_connection & evt )
{
    // New connection must be served by new cooperation.
    auto coop = so_environment().create_coop( "connection_handler" );
    // New cooperation is a child for this cooperation.
    coop->set_parent_coop_name( so_coop_name() );
    ... // Filling new cooperation with agents.
    so_environment().register_coop( std::move( coop ) );
}

// Or more compact one:
void a_connection_manager_t::evt_new_connection( const msg_new_connection & evt )
{
    // New connection must be served by new cooperation.
    // New cooperation will be a child of cooperation of the current agent.
    auto coop = so_5::create_child_coop( *this, "connection_handler" );
    ... // Filling new cooperation with agents.
    so_environment().register_coop( std::move( coop ) );
}

// Or yet more compact one (since v.5.5.5):
void a_connection_manager_t::evt_new_connection( const msg_new_connection & evt )
{
    // New connection must be served by new cooperation.
    // New cooperation will be a child of cooperation of the current agent.
    so_5::introduce_child_coop( *this, "connection_handler",
        [&]( so_5::coop_t & coop ) {
            ... // Filling new cooperation with agents.
        } );
}

SObjectizer guarantees that a child cooperation will be destroyed before its parent cooperation. It is useful for the following reasons:

  • there is no need to control the lifetime of a child cooperation. It will be automatically deregistered when deregistration of the parent cooperation is started;
  • it is safe to pass references to objects which are owned by the parent cooperation to the child cooperation. For example, the parent cooperation can create a connection to a database and hold the DB connection object as an attribute of some agent. A reference to that DB connection object can be safely passed to the child cooperation because the lifetime of the DB connection object will be longer than the lifetime of the child cooperation.
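
The ordering guarantee can be pictured as a post-order walk over the tree of cooperations. The following self-contained sketch is a toy model and not SObjectizer's implementation: toy_coop_tree is a hypothetical class, cooperations are represented by their names only, and the cooperation names in the demo are invented for illustration.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Toy model of parent-child cooperations (hypothetical).
class toy_coop_tree
{
    // Parent name -> names of its direct children.
    std::map< std::string, std::vector< std::string > > m_children;
public:
    void add_child( const std::string & parent, const std::string & child )
    {
        m_children[ parent ].push_back( child );
    }

    // Destroys the subtree rooted at 'name' and appends the destruction
    // order to 'order': every child is finished before its parent.
    void destroy( const std::string & name, std::vector< std::string > & order )
    {
        const auto it = m_children.find( name );
        if( it != m_children.end() )
        {
            // Copy the list first, then drop the entry before recursing.
            const std::vector< std::string > children = it->second;
            m_children.erase( it );
            for( const auto & child : children )
                destroy( child, order );
        }
        order.push_back( name );
    }
};

inline std::vector< std::string > run_destroy_demo()
{
    toy_coop_tree tree;
    tree.add_child( "manager", "connection_1" );
    tree.add_child( "manager", "connection_2" );
    tree.add_child( "connection_1", "worker" );

    std::vector< std::string > order;
    tree.destroy( "manager", order );
    assert( order.back() == "manager" ); // The parent always goes last.
    return order;
}
```

Because the parent is appended only after all of its descendants, anything owned by the parent (such as the DB connection object from the example above) outlives every child that holds a reference to it.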

For more information about parent-child relation see [so-5.2.3 Parent-child cooperation relationship].

SObjectizer requires a unique name for every cooperation. But sometimes inventing an appropriate name is just a boring task for the user. Because of that, since v.5.5.1 it is possible to delegate this task to SObjectizer:

::c++
// Name of the cooperation will be created automatically.
auto coop = env.create_coop( so_5::autoname );
coop->add_agent( ... );
...
env.register_coop( std::move( coop ) );

// Or, for the case when cooperation has only one agent.
auto agent = std::unique_ptr< my_agent >( new my_agent( ... ) );
env.register_agent_as_coop( so_5::autoname, std::move( agent ) );

For more information about automatic name generation for cooperations see [so-5.5.1 The autoname feature for cooperations].

Agents

Agents are objects which receive and handle messages and signals.

Every agent is linked with some message queue. When someone sends a message/signal to which an agent is subscribed a reference to that message/signal is stored in agent's message queue. Agents periodically get messages from their queues and process them. Processing of message/signal from queue is called 'event handling'.

There are two kinds of agents in SObjectizer: ordinary agents and ad-hoc agents.

Ordinary Agents

Ordinary agents, which exist from the very first version of SObjectizer, are the most flexible and full fledged agents.

To define an ordinary agent it is necessary to derive a new class from so_5::agent_t:

::c++
class a_pinger_t : public so_5::agent_t
  { /* class definition here */ };

Subsequently, instances of that class may be used for some cooperation.

There are several virtual methods of so_5::agent_t class which could be used for agent tuning.

The first one is so_define_agent(). It is called inside register_coop() before an agent starts its work. The main purpose of so_define_agent() is to provide a possibility to make subscriptions. Usually agents make their subscription in so_define_agent():

::c++
void a_pinger_t::so_define_agent() // Declared with 'override' inside the class.
  {
    so_subscribe_self().event( &a_pinger_t::evt_pong );
    ...
  }

The second one is so_evt_start(). It is called as the agent's very first event after successful registration of the agent's cooperation. It can be used for doing something at the start of the agent's work:

::c++
void a_pinger_t::so_evt_start() // Declared with 'override' inside the class.
  {
    so_5::send< msg_ping >( m_ponger_mbox, "Start!" );
  }

The main difference between so_define_agent() and so_evt_start() is the following: so_define_agent() is called on the context of the thread where register_coop() is called, whereas so_evt_start() is called on the context of the agent's working thread (see the Dispatchers section for more details).

The next important virtual method is so_evt_finish(). It is the opposite to so_evt_start(). This method is the last method called for agent on the context of agent's working thread.

The pair of so_evt_start()/so_evt_finish() could be regarded as analogs of constructor/destructor. The method so_evt_start() could be used for allocating some resource and so_evt_finish() for deallocating it:

::c++
void a_db_manager_t::so_evt_start() // Declared with 'override' inside the class.
  {
    // Create a connection to DB.
    m_connection = create_database_connection( ... );
    ...
  }
void a_db_manager_t::so_evt_finish() // Declared with 'override' inside the class.
  {
    // Close the connection gracefully.
    m_connection.close();
  }

The usual way of handling messages in ordinary agents is to define a dedicated non-static method for each message type. For messages (i.e. messages with data inside) there are several forms of such methods:

::c++
// Long forms:
result_type event_handler( so_5::mhood_t< MSG > evt );
result_type event_handler( const so_5::mhood_t< MSG > & evt );

// Short form:
result_type event_handler( const MSG & evt );

// Short form with a copy of a message.
// Very useful for messages of lightweight types like int's or long's.
result_type event_handler( MSG evt );

There is no need to declare message type during event subscription -- it is deduced from the method's prototype:

::c++
void a_ponger_t::so_define_agent() // Declared with 'override' inside the class.
  {
    so_subscribe_self().event( &a_ponger_t::evt_ping );
  }
void a_ponger_t::evt_ping( const msg_ping & msg )
  { ... }

The difference between long and short forms is the possibility of calling mhood_t::make_reference() method for resending the same message object. If this possibility is not necessary then the short form is preferable.

For signals (i.e. messages without actual data) there are two forms of signal handlers. The first one is the same as the long form of message handlers:

::c++
// Long forms.
result_type event_handler( so_5::mhood_t< SIGNAL > evt );
result_type event_handler( const so_5::mhood_t< SIGNAL > & evt );

There is also no need to declare any signal type during the signal subscription:

::c++
void a_pinger_t::so_define_agent() // Declared with 'override' inside the class.
  {
    so_subscribe_self().event( &a_pinger_t::evt_pong );
  }
void a_pinger_t::evt_pong( mhood_t< msg_pong > msg )
  { ... }

This long form for signal handlers is supported for historical reasons as well as for simplification in case of heavy usage of C++ templates in agents code (in such case there is no difference between message and signal subscription and handler's declaration).

The second form of signal handlers is the short form:

::c++
// Short form for signal handler.
result_type event_handler();

In this case, type of signal should be explicitly specified during the signal subscription:

::c++
struct msg_shutdown : public so_5::signal_t {};
struct msg_get_status : public so_5::signal_t {};

void a_watch_dog_t::so_define_agent() // Declared with 'override' inside the class.
  {
    so_subscribe_self()
        .event< msg_shutdown >( &a_watch_dog_t::evt_shutdown )
        .event< msg_get_status >( &a_watch_dog_t::evt_get_status );
  }
void a_watch_dog_t::evt_shutdown()
  {
    so_environment().stop();
  }
std::string a_watch_dog_t::evt_get_status()
  {
    return m_everything_fine ? "ok" : "failure";
  }

There is a possibility to use C++11 lambda functions as event handlers. They should be used as arguments to event() method in subscription chain:

::c++
void a_watch_dog_t::so_define_agent() // Declared with 'override' inside the class.
  {
    so_subscribe_self()
        // Subscription to a signal.
        .event< msg_shutdown >( [this]() { so_environment().stop(); } )
        // Subscription to a message.
        .event(
           // Type of the message will be deduced.
           [this]( const msg_failure_detected & msg ) {
                 m_everything_fine = false;
                 m_failure_description = msg.reason();
           } )
        // Subscription to a signal.
       .event< msg_get_status >(
          [this]() -> std::string {
                return m_everything_fine ? "ok" : "failure";
          } );
  }

Since v.5.5.14 an event handler in the form of a lambda function can have the same formats as ordinary event handlers in the form of non-static methods. For example:

::c++
void some_agent::so_define_agent()
{
    so_subscribe_self()
        // Subscription to a signal.
        .event( [this]( mhood_t< msg_shutdown > ) { ... } )
        // Another form of subscription to a signal.
        .event< msg_get_status >( [this]() -> std::string { ... } )
        // Subscription to a message.
        .event( [this]( mhood_t< request > msg ) { ... } )
        // Another form of subscription to a message.
        .event( [this]( const update_config & msg ) { ... } );
}

Since v.5.4.0 every ordinary agent has its own MPSC mbox. This mbox is automatically created in the constructor of the base class so_5::agent_t. A reference to the agent's direct mbox can be obtained by the agent_t::so_direct_mbox() method.

Ad-hoc Agents

Ad-hoc agents are agents which can be constructed and registered without defining a dedicated C++ class.

To construct an ad-hoc agent it is necessary to call define_agent for a cooperation object. This method returns a special proxy object which can be used to define the behavior of the new agent. For example, the next method chain creates an agent which handles a message and a signal:

::c++
auto coop = env.create_coop( "sample" );
// Definition of ad-hoc agent.
coop->define_agent()
  // First event for that agent.
  .event(
    // A mbox as message source.
    some_mbox,
    // Event handling code. Type of message is deduced automatically.
    []( const msg_convert & msg ) -> int { return std::atoi( msg.m_value ); } )
  // Second event for that agent.
  // Handles a signal; because of that the signal type must be explicitly specified.
  .event< msg_get_status >(
    // A mbox as message source.
    some_mbox,
    // Event handling code.
    []() -> std::string { return "ready"; } ); 

There is no C++ class for ad-hoc agent, and that's why there are no virtual methods like so_define_agent(), so_evt_start(), so_evt_finish() and so on.

Subscriptions for ad-hoc agent should be done in define_agent().event() chains as shown above.

To define the start action for ad-hoc agent the define_agent().on_start() chain must be used:

::c++
coop->define_agent()
  // The start action for the agent.
  .on_start( []() { std::cout << "Hello, world" << std::endl; } )
  ... /* other agent definitions... */;

To define the finish action for ad-hoc agent the define_agent().on_finish() chain must be used:

::c++
coop->define_agent()
  // The finish action for the agent.
  .on_finish( []() { std::cout << "Bye, world" << std::endl; } )
  ... /* other agent definitions... */;

Since v.5.5.3 there is a possibility to get access to ad-hoc agent's MPSC (aka direct) mbox:

::c++
auto pinger = coop->define_agent();
auto ponger = coop->define_agent();

pinger
    .on_start( [ponger]{ so_5::send< msg_ping >( ponger.direct_mbox() ); } )
    .event< msg_pong >(
        pinger.direct_mbox(),
        [ponger]{ so_5::send< msg_ping >( ponger.direct_mbox() ); } );

ponger
    .event< msg_ping >(
        ponger.direct_mbox(),
        [pinger]{ so_5::send< msg_pong >( pinger.direct_mbox() ); } );

Since v.5.5.8 and v.5.5.9 there is a simpler way to work with ad-hoc agent's MPSC (aka direct) mbox:

::c++
auto pinger = coop->define_agent();
auto ponger = coop->define_agent();

pinger
    .on_start( [ponger]{ so_5::send< msg_ping >( ponger ); } )
    .event< msg_pong >( pinger, [ponger]{ so_5::send< msg_ping >( ponger ); } );

ponger
    .event< msg_ping >( ponger, [pinger]{ so_5::send< msg_pong >( pinger ); } );

Since v.5.5.14 event handlers of ad-hoc agents can have the same format as event handlers of ordinary agents.

For more details about ad-hoc agents' direct mboxes see [so-5.5.3 Direct mboxes of ad-hoc agents], [so-5.5.8 Simplification of usage of send-family functions with ad-hoc agents], [so-5.5.8 Simpification of ad-hoc agents subscription] and [so-5.5.9 New format of send function].

Agent's priority

Agent priorities were introduced in v.5.5.8. Every agent now has a priority. A priority is just an optional mark for dispatchers: some dispatchers use it for priority-specific event scheduling, others ignore it.

Priority is represented by the enumeration so_5::priority_t. There are exactly 8 priorities: from so_5::priority_t::p0 (the lowest) to so_5::priority_t::p7 (the highest). There are also shorthand constants in namespace so_5::prio, for example so_5::prio::p1 as a synonym for so_5::priority_t::p1.

Priority must be specified at the time of agent construction and priority of agent cannot be changed later.

For ordinary agents priority is passed to the constructor of base class:

::c++
class my_agent : public so_5::agent_t
{
public :
    my_agent( context_t ctx ) : so_5::agent_t( ctx + so_5::prio::p1 ), ... {}
    ...
};

For ad-hoc agents priority must be specified via define_agent() method:

::c++
env.introduce_coop( []( so_5::agent_coop_t & coop ) {
    coop.define_agent( coop.make_agent_context() + so_5::prio::p1 )...
    ...
} );

All agents have the lowest priority by default. It means that if a priority for an agent is not set explicitly during construction, the agent will have priority so_5::priority_t::p0.

For more information about agent's priorities see [so-5.5 In-depth - Priorities of Agents].

Thread safety for event handlers

By default SObjectizer assumes that all event handlers are not thread safe. Standard dispatchers guarantee that a not_thread_safe event handler runs alone and only in the context of one working thread.

Since v.5.4.0 it is possible to mark an event handler as thread safe. It means that the event handler doesn't change the agent's state (or does so in a thread-safe manner). Marking an event handler as thread_safe gives SObjectizer's dispatcher the right to run thread_safe handlers of the same agent in parallel on different working threads, subject to the following conditions:

  • a not_thread_safe event handler can't be started while any other event handler of the agent is running. The start of the not_thread_safe event handler is delayed until all already running event handlers finish their work;
  • no event handler can be started while a not_thread_safe event handler is working;
  • in particular, no thread_safe event handler can be started while a not_thread_safe event handler is working.
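
The conditions above can be restated as a small piece of bookkeeping. The following is only a sketch of the rules, not the way SObjectizer's dispatchers are implemented; the names handler_gate, safety, can_start, started and finished are hypothetical and exist only for this illustration:

```cpp
#include <cassert>

// Hypothetical bookkeeping for one agent; these names are not SObjectizer API.
enum class safety { not_thread_safe, thread_safe };

class handler_gate
{
    int m_running_safe = 0;        // thread_safe handlers running right now.
    bool m_running_unsafe = false; // A not_thread_safe handler is running.

public:
    // May a handler of the given kind be started right now?
    bool can_start( safety s ) const
    {
        if( m_running_unsafe )
            return false; // Nothing starts while a not_thread_safe handler works.
        if( safety::not_thread_safe == s )
            return 0 == m_running_safe; // Must wait for all running handlers.
        return true; // thread_safe handlers may run side by side.
    }

    // Record that a handler has been started.
    void started( safety s )
    {
        assert( can_start( s ) );
        if( safety::thread_safe == s )
            ++m_running_safe;
        else
            m_running_unsafe = true;
    }

    // Record that a previously started handler has returned.
    void finished( safety s )
    {
        if( safety::thread_safe == s )
        {
            assert( m_running_safe > 0 );
            --m_running_safe;
        }
        else
        {
            assert( m_running_unsafe );
            m_running_unsafe = false;
        }
    }
};
```

In this model a dispatcher would call can_start() before taking the next event of the agent and postpone the event when the answer is false. This is the same discipline as a readers-writer lock: thread_safe handlers play the role of readers and a not_thread_safe handler plays the role of a writer.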

To specify thread_safe event handler it is necessary to pass the additional argument to event() method in subscription chain:

::c++
void so_define_agent() override
   {
      st_disconnected
         // thread_safety flag is not specified.
         // This is not_thread_safe event handler and it
         // can't work in parallel with other event handlers
         // because it changes agent's state.
         .event< msg_reconnect >( &my_agent::evt_reconnect )
         // This is thread_safe event and it doesn't change agent's state.
         .event(
               &my_agent::evt_send_when_disconnected,
               so_5::thread_safe )
         // This is thread_safe event and it doesn't change agent's state.
         .event< msg_get_status >(
               []() -> std::string { return "disconnected"; },
               so_5::thread_safe );
      ...
   } 

As an example of thread_safe and not_thread_safe event handlers, consider an agent that performs cryptographic operations: encrypting and decrypting. These operations are stateless and can be done in parallel. But the agent must be reconfigured from time to time. The reconfiguration operation is stateful, so the event handler for the reconfigure message is not_thread_safe.

The user only has to specify the thread_safety flag for those event handlers. All other work is done by SObjectizer. It means that evt_encrypt and evt_decrypt run in parallel until msg_reconfigure arrives. SObjectizer then waits until all previously started evt_encrypt/evt_decrypt handlers finish their work and only then starts evt_reconfigure. While evt_reconfigure is working all other event handlers are delayed and are started only after evt_reconfigure returns:

::c++
class a_cryptographer_t : public so_5::agent_t 
    {
    public :
        virtual void so_define_agent() override
            {
                // not_thread_safe event handler.
                so_default_state()
                    .event( &a_cryptographer_t::evt_reconfigure );

                // thread_safe event handlers.
                so_default_state()
                    .event( &a_cryptographer_t::evt_encrypt, so_5::thread_safe )
                    .event( &a_cryptographer_t::evt_decrypt, so_5::thread_safe );
            }

        void evt_reconfigure( const msg_reconfigure & evt ) { ... }

        encrypt_result_t evt_encrypt( const msg_encrypt & evt ) { ... }

        decrypt_result_t evt_decrypt( const msg_decrypt & evt ) { ... }
    };

Dispatchers

Every agent should work on some execution context: some working thread must be assigned to an agent (or, in SObjectizer's terms, an agent must be bound to some working thread). A special object, the dispatcher, is responsible for that.

The dispatchers could be seen as managers for working threads. SObjectizer's user is free from thread management. All threads are started and stopped by dispatcher under SObjectizer Environment control. All that the user should do is to specify which dispatcher should be used by SObjectizer Environment and bind agents to appropriate dispatchers.

There are several dispatcher types which are available for the user "out of the box". Five of them do not support agent's priority and ignore priority value during event scheduling:

  • the simplest one, which binds every agent to the same working thread. It is called one_thread dispatcher;
  • the dispatcher, which binds an agent to a dedicated working thread. The agent becomes an active object -- it works on its own thread and doesn't share this thread with the other agents. That dispatcher is called active_obj dispatcher;
  • the dispatcher, which binds an agent group to one dedicated working thread. Every agent within the group works on the same working thread but the other agents work on different threads. That dispatcher is called active_group dispatcher;
  • the dispatcher, which uses a pool of working threads and distributes event handlers between them. It is called thread_pool dispatcher. This dispatcher doesn't distinguish between not_thread_safe and thread_safe event handlers and assumes that all event handlers are not_thread_safe. It means that an agent works on only one working thread and handles no more than one event at a time;
  • the dispatcher, which uses a pool of working threads and distinguishes between not_thread_safe and thread_safe event handlers. It is called adv_thread_pool dispatcher. This dispatcher can run several thread_safe event handlers of one agent on different threads in parallel.

There are also several dispatchers which support agent's priority and do priority-respected event scheduling:

  • dispatcher prio_one_thread::strictly_ordered runs all events in the context of one working thread. This dispatcher allows events of high priority agents to block events of low priority agents. It means that the event queue is always strictly ordered: events for agents with higher priority are placed before events for agents with lower priority;
  • dispatcher prio_one_thread::quoted_round_robin also runs all events in the context of one working thread, but works on the round-robin principle. It allows specifying the maximum count of events to be processed consecutively for each priority. After processing that count of events the dispatcher switches to processing events of lower priority even if there are more events of higher priority to be processed;
  • dispatcher prio_dedicated_threads::one_per_prio creates a dedicated thread for every priority (so eight working threads are created). It means that events for agents with priority p7 are handled on a different thread than events for agents with, for example, priority p6.
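
The quoted round-robin principle is easier to see on a fixed set of pending events. The sketch below is a simplified model under stated assumptions, not the dispatcher's actual code: the function quoted_round_robin_order and the constant priority_count are hypothetical names, events are represented by plain strings, and no new events arrive while the queues are being drained:

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

constexpr std::size_t priority_count = 8; // p0 (lowest) .. p7 (highest).

// queues[p] holds the pending events of agents with priority p.
// quotes[p] is the maximum count of events processed in a row for priority p.
// Returns the order in which the events would be processed.
std::vector< std::string > quoted_round_robin_order(
    std::array< std::deque< std::string >, priority_count > queues,
    const std::array< std::size_t, priority_count > & quotes )
{
    for( std::size_t q : quotes )
        assert( q > 0 ); // A zero quote would never let a queue drain.

    std::vector< std::string > order;
    bool has_work = true;
    while( has_work )
    {
        has_work = false;
        // One round: from the highest priority down to the lowest.
        for( std::size_t p = priority_count; p-- > 0; )
        {
            std::size_t handled = 0;
            while( handled < quotes[ p ] && !queues[ p ].empty() )
            {
                order.push_back( queues[ p ].front() );
                queues[ p ].pop_front();
                ++handled;
            }
            if( !queues[ p ].empty() )
                has_work = true; // Quote exhausted, continue in the next round.
        }
    }
    return order;
}
```

For example, with events h1, h2, h3 at priority p7 (quote 2) and l1, l2 at priority p0 (quote 1) this model yields h1, h2, l1, h3, l2. A strictly ordered queue would process all three p7 events before any p0 event.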

The one_thread dispatcher is available to the user as the default dispatcher. There is no need to do anything to launch it. By default all agents are handled by that default dispatcher.

The other types of dispatchers require some user actions. First of all, dispatcher instance must be created and passed to SObjectizer Environment. Each dispatcher instance must have a unique dispatcher name. The name will be used later to bind agents to the dispatcher.

There are two kinds of dispatchers in SObjectizer (since v.5.5.4): public and private dispatchers. Working with them differs in important ways.

Public Dispatchers

Public dispatchers must have unique names and are accessed by those names. Once created, a public dispatcher stays inside SObjectizer Environment until the Environment finishes its work. It means that an instance of a public dispatcher keeps working even when no one uses it.

Dispatcher is passed to SObjectizer Environment via environment_params_t object which could be tuned in the launch() function:

::c++
so_5::launch(
    []( so_5::environment_t & env ) { /* some initial actions */ },
    // SObjectizer's parameters tuning.
    []( so_5::environment_params_t & p ) {
        // Add an instance of active_obj dispatcher with name 'active_obj'.
        p.add_named_dispatcher( "active_obj",
            so_5::disp::active_obj::create_disp() );
        // Add another instance of active_obj dispatcher with name 'shutdowner'.
        p.add_named_dispatcher( "shutdowner",
            so_5::disp::active_obj::create_disp() );

        // Add an instance of active_group dispatcher with name 'groups'.
        p.add_named_dispatcher( "groups",
            so_5::disp::active_group::create_disp() );
        ...
    } );

Also dispatcher can be added to running SObjectizer Environment by the method add_dispatcher_if_not_exists:

::c++
void my_agent::evt_long_request( const request & evt )
{
   // New child cooperation is necessary for handling the request.
   // An instance of dispatcher is required for the cooperation.

   // Create the dispatcher if it does not exist yet.
   so_environment().add_dispatcher_if_not_exists(
         "long_request_handler_disp",
         []() { return so_5::disp::active_group::create_disp(); } );
   ...
} 

Then the user should create dispatcher_binder objects for binding agents to the dispatcher. A binder is used during the cooperation registration to bind an agent to its working thread. The dispatcher binder can be specified for the whole cooperation:

::c++
auto coop = env.create_coop( "ping_pong",
    // All agents  will be bound to dispatcher with 'active_obj' name.
    so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
// Will be an active object because there is default dispatcher binder for the cooperation.
coop->make_agent< a_pinger_t >(...);
// Will be an active object too.
coop->make_agent< a_ponger_t >(...);

Or, the dispatcher binder could be specified in make_agent_with_binder/add_agent methods as an additional argument:

::c++
auto coop = env.create_coop( "ping_pong",
    // Agents will be bound to dispatcher with 'active_obj' name by default.
    so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
// Will be an active object because there is a default dispatcher binder for the cooperation.
coop->make_agent< a_pinger_t >(...);
// Will be an active object too.
coop->make_agent< a_ponger_t >(...);
// Will be an active object but on a different dispatcher.
coop->add_agent( new a_shutdowner_t(...),
    so_5::disp::active_obj::create_disp_binder( "shutdowner" ) );
// Will be part of active group 'g1' on dispatcher 'groups'.
coop->make_agent_with_binder< a_group_member_t >(
    so_5::disp::active_group::create_disp_binder( "groups", "g1" ),
    ... /* Parameters for a_group_member_t's constructor */ );
// Will be part of active group 'g2' on dispatcher 'groups'.
coop->make_agent_with_binder< a_group_member_t >(
    so_5::disp::active_group::create_disp_binder( "groups", "g2" ),
    ... /* Parameters for a_group_member_t's constructor */ );

Private Dispatchers

Private dispatchers were introduced in v.5.5.4. They are accessible via direct references obtained during dispatcher creation. Private dispatchers are destroyed automatically when no one uses them.

Every dispatcher type in SObjectizer provides create_private_disp functions for creating a private dispatcher instance. They return a smart pointer to the newly created dispatcher. This pointer can be used for binding agents to that dispatcher:

::c++
void
create_processing_coops( so_5::environment_t & env )
{
    std::size_t capacities[] = { 25, 35, 40, 15, 20 };

    // Private dispatcher for receivers.
    auto receiver_disp = so_5::disp::thread_pool::create_private_disp( env, 2 );
    // And private dispatcher for processors.
    auto processor_disp = so_5::disp::active_obj::create_private_disp( env );

    int i = 0;
    for( auto c : capacities )
    {
        auto coop = env.create_coop( so_5::autoname );

        auto receiver = coop->make_agent_with_binder< a_receiver_t >(
                // Getting binder for the dispatcher and using that binder
                // for new agent.
                receiver_disp->binder( so_5::disp::thread_pool::bind_params_t{} ),
                "r" + std::to_string(i), c );

        const auto receiver_mbox = receiver->so_direct_mbox();

        coop->make_agent_with_binder< a_processor_t >(
                // Getting binder for the dispatcher and using that binder
                // for new agent.
                processor_disp->binder(),
                "p" + std::to_string(i), receiver_mbox );

        env.register_coop( std::move( coop ) );

        ++i;
    }
}

Private dispatchers are recommended over public dispatchers.

More information about private dispatchers can be found here: [so-5.5.4 Private dispatchers].

More About Dispatchers

For more information about dispatchers please see [so-5.5 In-depth - Dispatchers] section.

Delayed and Periodic Messages/Signals

Often it is necessary to send a message/signal whose delivery must be delayed for some time. The simplest way to do that is to use the so_5::send_delayed function. For example:

:::c++
void a_transaction_handler_t::evt_initiate_request( const msg_request_data & msg )
    {
        // Store the request info for further actions...
        save_request_info( msg );
        // ...initiate a request to remote site...
        send_request( msg );
        // ...and wait for some time.
        so_5::send_delayed< msg_request_timedout >(
            so_environment(),
            m_self_mbox,
            // Message is delayed for 500 milliseconds.
            std::chrono::milliseconds( 500 ),
            // Will be passed to msg_request_timedout's constructor.
            msg );
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        // Some actions dealing with absence of response from remote site...
    }

There is a variant of so_5::send_delayed function for signals:

:::c++
struct msg_watch_dog : public so_5::rt::signal_t {};

so_5::send_delayed< msg_watch_dog >( *this, mbox, delay );

There are also schedule_timer() methods which return so_5::timer_id_t objects. Such an object is a kind of smart pointer that references the delayed message instance. The timer ID can be used for canceling delivery of the delayed message:

:::c++
void a_transaction_handler_t::evt_initiate_request( const msg_request_data & msg )
    {
        // Store the request info for further actions...
        save_request_info( msg );
        // ...initiate a request to remote site...
        send_request( msg );
        // ...and wait for some time.
        m_timeout_id = so_environment().schedule_timer( new msg_request_timedout( msg ),
            m_self_mbox,
            // Message is delayed for 500 milliseconds.
            std::chrono::milliseconds( 500 ) );
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        // Some actions dealing with absence of response from remote site...
    }
void a_transaction_handler_t::evt_response( const msg_response_data & msg )
    {
        // Response received. So, there is no need to deliver timeout message.
        m_timeout_id.release();

        // Other response processing actions here...
    }

As usual there is a variant of schedule_timer() for signals:

:::c++
m_watchdog_id = so_environment().schedule_timer< msg_watch_dog >( mbox, delay );

The schedule_timer() also can schedule delivery of periodic messages/signals. The last argument for schedule_timer(), which has the default value 0, must be used for that. It defines a period of message delivery.

:::c++
// Scheduling watch_dog signal.
m_watchdog_id = so_environment().schedule_timer< msg_watch_dog >(
    // Mbox for signal to be delivered to.
    mbox,
    // Delay for the first appearance of the signal.
    std::chrono::milliseconds( 150 ),
    // A period for signal repetitions. In milliseconds.
    std::chrono::milliseconds( 300 ) );
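
The relation between the delay and the period can be shown with a little arithmetic. The helper below is a hypothetical illustration (delivery_times is not part of SObjectizer) and assumes an ideal timer without jitter; it lists the moments, counted from the scheduling call, at which the message or signal would appear:

```cpp
#include <cassert>
#include <chrono>
#include <vector>

using ms = std::chrono::milliseconds;

// Hypothetical helper: moments (relative to the scheduling call) at which
// a timer with the given delay and period fires, up to and including horizon.
std::vector< ms > delivery_times( ms delay, ms period, ms horizon )
{
    assert( delay >= ms::zero() && period >= ms::zero() );

    std::vector< ms > result;
    for( ms t = delay; t <= horizon; t += period )
    {
        result.push_back( t );
        if( period == ms::zero() )
            break; // Zero period: a delayed message, delivered only once.
    }
    return result;
}
```

With the values from the example above (delay of 150 ms, period of 300 ms) the first second contains deliveries at 150, 450 and 750 ms. With a zero period there is exactly one delivery, after the delay.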

Since v.5.5.1 there is another way for sending periodic messages and signals by so_5::send_periodic function:

::c++
// Sending periodic message.
m_timeout_id = so_5::send_periodic< msg_request_timedout > (
    so_environment(),
    mbox,
    std::chrono::milliseconds( 500 ),
    std::chrono::milliseconds( 150 ),
    msg );
// Sending periodic signal.
m_watchdog_id = so_5::send_periodic< msg_watch_dog >(
    so_environment(),
    mbox,
    std::chrono::milliseconds( 150 ),
    std::chrono::milliseconds( 300 ) );

Note. In the case of periodic delivery of a message with actual data (via schedule_timer() or so_5::send_periodic) the same message object is delivered every time.

:::c++
struct msg_hello : public so_5::rt::message_t {};

void a_demo_t::so_define_agent()
    {
        so_subscribe_self().event(
            []( const msg_hello & msg ) {
                // The same pointer value will be printed for every message appearance.
                std::cout << "msg pointer: " << &msg << std::endl;
            } );
    }

void a_demo_t::so_evt_start()
    {
        m_hello_id = so_5::send_periodic< msg_hello >(
            *this,
            std::chrono::milliseconds( 100 ),
            std::chrono::milliseconds( 100 ) );
    }

For more information about this topic see [so-5.5 In-depth - Timers].

Agent's States

Agents are often used to implement some kind of finite automaton. For that reason SObjectizer has a facility to define states of agents explicitly. A state restricts the set of messages which can be handled by the agent in that state.

To define a state for an agent it is necessary to declare and initialize so_5::state_t object as an agent attribute:

:::c++
class a_transaction_handler_t : public so_5::rt::agent_t
  {
    // Agent is waiting for an initial request.
    const so_5::state_t st_wait_for_request{ this, "waiting for request" };
    // Agent is waiting for a response from the remote site.
    const so_5::state_t st_wait_for_response{ this, "waiting for response" };
    // Agent didn't receive any response.
    const so_5::state_t st_response_timedout{ this, "no response received" };
    // The response is received and agent is handling it.
    const so_5::state_t st_response_processing{ this, "processing" };
  ...
  };

Every agent in SObjectizer has the default state. It is defined by SObjectizer and can be accessed through agent_t::so_default_state() method.

Every agent starts its work in the default state. Because of that if an agent needs to be started in another state the switch must be performed in so_define_agent() method:

:::c++
void a_transaction_handler_t::so_define_agent()
    {
        so_change_state( st_wait_for_request );

        /* Other initialization actions here... */
    }

The so_subscribe().event() method chain, an example of which is shown above, subscribes an event handler in the default state. If an agent has several states, the state must be specified in the so_subscribe().event() chain by the in() method:

:::c++
void a_transaction_handler_t::so_define_agent()
    {
        so_change_state( st_wait_for_request );

        // Message msg_request_data will be processed only in st_wait_for_request state.
        so_subscribe( m_self_mbox ).in( st_wait_for_request )
            .event( &a_transaction_handler_t::evt_request );

        // Response will be processed only in st_wait_for_response state.
        so_subscribe( m_self_mbox ).in( st_wait_for_response )
            .event( &a_transaction_handler_t::evt_response );

        // Timeout must be processed in two states.
        so_subscribe( m_self_mbox )
            .in( st_wait_for_request )
            .in( st_wait_for_response )
            .event( &a_transaction_handler_t::evt_timeout );
    }

Since v.5.5.1 there is a more concise way to change an agent's state and to subscribe an agent's events with respect to its states. The example above can be rewritten this way:

:::c++
void a_transaction_handler_t::so_define_agent()
    {
        this >>= st_wait_for_request; // Or: st_wait_for_request.activate();

        // Message msg_request_data will be processed only in st_wait_for_request state.
        st_wait_for_request.event( m_self_mbox, &a_transaction_handler_t::evt_request );

        // Response will be processed only in st_wait_for_response state.
        st_wait_for_response.event( m_self_mbox, &a_transaction_handler_t::evt_response );

        // But if an event handler is used in different states then
        // old subscription chain must be used.
        so_subscribe( m_self_mbox )
            .in( st_wait_for_request )
            .in( st_wait_for_response )
            .event( &a_transaction_handler_t::evt_timeout );
    }

When a message/signal is sent to an mbox it is stored in the agent's message queue regardless of the current agent state. The state of the agent is checked when the message/signal is extracted from the queue and is ready to be processed: if the message is subscribed in the current state, it is passed to the agent's event handler. Otherwise the message is discarded and the next message is extracted from the queue.
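
The moment at which the state is checked can be modelled in a few lines. The class below is a hypothetical model and not SObjectizer code: agent_model and its methods are invented for this illustration, and states and message types are represented by their names only:

```cpp
#include <cassert>
#include <deque>
#include <set>
#include <string>
#include <utility>

// Hypothetical model of one agent; these names are not SObjectizer API.
class agent_model
{
    std::string m_state;
    // Pairs (state name, message type name) that have an event handler.
    std::set< std::pair< std::string, std::string > > m_subscriptions;
    // Pending messages, identified by their type names only.
    std::deque< std::string > m_queue;

public:
    explicit agent_model( std::string initial_state )
        : m_state( std::move( initial_state ) )
    {}

    void subscribe( const std::string & state, const std::string & msg_type )
    {
        m_subscriptions.insert( std::make_pair( state, msg_type ) );
    }

    void change_state( std::string new_state )
    {
        m_state = std::move( new_state );
    }

    // Sending never looks at the current state: the message is just queued.
    void push( std::string msg_type )
    {
        m_queue.push_back( std::move( msg_type ) );
    }

    // Extracts one message from the queue. Returns true if the message was
    // passed to a handler and false if it was discarded.
    bool process_one()
    {
        assert( !m_queue.empty() );
        const std::string msg_type = m_queue.front();
        m_queue.pop_front();
        // The state is checked only now, at extraction time.
        return 0 != m_subscriptions.count( std::make_pair( m_state, msg_type ) );
    }
};
```

In this model a response that arrives while the agent is still in its waiting-for-request state is queued as usual and dropped only when it reaches the head of the queue.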

To switch an agent from one state to another, use the agent_t::so_change_state() method, operator>>= or the state_t::activate() method:

:::c++
void a_transaction_handler_t::evt_request( const msg_request_data & msg )
    {
        so_change_state( st_wait_for_response );
        ...
    }
void a_transaction_handler_t::evt_response( const msg_response_data & msg )
    {
        this >>= st_response_processing;
        ...
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        st_response_timedout.activate();
        ...
    }

Those methods do the same thing -- switch the agent to the new state. The choice between so_change_state, >>= or activate is just a matter of taste.

Note. Only ordinary agents can have a state. Ad-hoc agents have no possibility to work with agent states.

Since v.5.5.15 SObjectizer supports advanced features like composite states, shallow- and deep-history, on_enter/on_exit handlers, time limits, event deferring and suppression. See [so-5.5 In-depth - Agent States] for more details.

Asynchronous and Synchronous Agent's Interaction

The main and primary form of agent interaction in SObjectizer is asynchronous message passing. It means that if an agent requires some response from another agent then the application logic of the first agent should be split into several event handling methods. For example:

:::c++
// Messages for agent's interaction.
struct msg_get_status : public so_5::signal_t {};
struct msg_current_status : public so_5::message_t
    {
        std::string m_status;
        msg_current_status( const std::string & s ) : m_status( s )
            {}
    };

// The first agent. It needs the status from another agent.
void a_status_requester_t::so_evt_start()
    {
        /* Do something by itself... */

        // There we should ask status of another agent.
        so_5::send< msg_get_status >( m_other_agent_mbox );
        // Response will be received and processed in different event handler.
    }
void a_status_requester_t::evt_status_response( const msg_current_status & msg )
    {
        if( "ok" == msg.m_status )
            // One scenario...
        else
            // The other scenario...
    }

// The second agent. It provides the current status.
void a_status_provider_t::evt_get_status()
    {
        so_5::send< msg_current_status >( m_self_mbox, "ok" );
    }

That approach requires a lot of coding but has some advantages. It is scalable. It is hard to create a deadlock. The request-response handling could be more flexible. A requester could interact with several different agents at once, and so on.

Fortunately, SObjectizer also supports a form of synchronous interaction. An agent can send a service request to another agent and then synchronously wait for the response on a std::future object. The sample above could be rewritten in the following way:

:::c++
// Messages for agent's interaction.
struct msg_get_status : public so_5::signal_t {};

// The first agent. It needs the status from another agent.
void a_status_requester_t::so_evt_start()
    {
        /* Do something by itself... */

        // Do request of another agent status.
        std::future< std::string > s =
                so_5::request_future<std::string, msg_get_status>(m_other_agent_mbox);
        if( "ok" == s.get() )
            // One scenario...
        else
            // The other scenario...
    }

// The second agent. It provides the current status.
std::string a_status_provider_t::evt_get_status()
    {
        return "ok";
    }

Or:

:::c++
// Do request of another agent status.
std::string s = so_5::request_value<std::string, msg_get_status >(
        m_other_agent_mbox, so_5::infinite_wait);
if( "ok" == s )
    // One scenario...
else
    // The other scenario...

However, the synchronous interaction could lead to deadlocks and requires more attention from the programmer. So, it should be used very carefully.
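
Why a deadlock is possible becomes visible if the request is modelled as a message that carries a std::promise. The sketch below uses only the standard library and is an assumption about the general mechanism, not SObjectizer's implementation; status_request, provider_model and is_ready are hypothetical names:

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <future>
#include <string>
#include <utility>

// Hypothetical request: the reply travels back through the promise.
struct status_request
{
    std::promise< std::string > m_reply;
};

// Hypothetical model of the provider agent and its event queue.
class provider_model
{
    std::deque< status_request > m_queue;

public:
    // Called by the requester: queues a request, returns the future reply.
    std::future< std::string > request_status()
    {
        status_request req;
        std::future< std::string > reply = req.m_reply.get_future();
        m_queue.push_back( std::move( req ) );
        return reply;
    }

    // Called on the provider's working thread: handles the oldest request.
    void handle_one()
    {
        assert( !m_queue.empty() );
        m_queue.front().m_reply.set_value( "ok" );
        m_queue.pop_front();
    }
};

// Checks for the reply without blocking.
bool is_ready( std::future< std::string > & f )
{
    return std::future_status::ready == f.wait_for( std::chrono::seconds( 0 ) );
}
```

The reply becomes available only after the provider's working thread has executed the handler. If the requester blocks that very thread while waiting for the reply (for example, when both agents are bound to the same one_thread dispatcher), the handler never runs and the wait never ends.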

For more information about synchronous interaction see [so-5.5 In-depth - Synchronous Interaction].

Message Chains

The main purpose of the message chain (or just mchain) mechanism is to provide a way of interaction between the SObjectizer part and the non-SObjectizer part of an application. Passing messages from the non-SObjectizer part to agents is very simple: usual message passing via mboxes is used. But how to receive messages from an agent back in the non-SObjectizer part of the application?

Message chain is the answer.

Message chain looks almost like mbox for agents. An agent can send messages to mchain exactly the same way as for mbox. So mchain can be passed to agent and agent will use it as destination for reply messages. On the other side of a mchain will be non-SObjectizer message handler. This handler will receive messages from the mchain by using special API functions and handle them appropriately.

In the following example an agent of type data_reader reads data sample from some source and passes them to a mchain as instances of type data_sample. This mchain is being read outside of SObjectizer Environment by the help of so_5::receive function:

:::c++
class data_reader : public so_5::agent_t {
public:
    data_reader(context_t ctx, so_5::mchain_t data_ch)
        : so_5::agent_t{ctx}, m_data_ch{std::move(data_ch)}
    {}
    ...
private:
    const so_5::mchain_t m_data_ch;
    ...
    void read_data_event() {
        auto data = read_next_data();
        if(data_read())
            // New data sample read successfully and can be delivered to the consumer.
            so_5::send<data_sample>(m_data_ch, data);
        else {
            // Data source is closed. No more data can be received.
            // Close the data chain and finish our work.
            m_data_ch->close();
            so_deregister_coop_normally();
        }
    }
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mchain(...);
env.introduce_coop([&](so_5::coop_t & coop) {
    coop.make_agent<data_reader>(
        // Passing the mchain to the data_reader agent.
        ch);
...
} );
...
// Receive and handle all data samples until the chain is closed.
receive(from(ch), [](const data_sample & data) {...});

For more information about mchain see [so-5.5 In-depth - Message chains] section.
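
The idea behind the example (one side sends and finally closes the chain, the other side drains it until it is closed) can be sketched in plain standard C++. This is only an analogy under stated assumptions, not the SObjectizer implementation: `simple_chain` and `produce_and_collect` are hypothetical names, and the producer thread merely plays the role of the agent.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a message chain: a thread-safe queue that can
// be closed by the sending side.
template <typename T>
class simple_chain {
public:
    // Stores a value; returns false if the chain is already closed.
    bool send(T value) {
        std::lock_guard<std::mutex> lock{m_lock};
        if (m_closed) return false;
        m_queue.push_back(std::move(value));
        m_ready.notify_one();
        return true;
    }

    // Closes the chain; values already stored stay available for receive().
    void close() {
        std::lock_guard<std::mutex> lock{m_lock};
        m_closed = true;
        m_ready.notify_all();
    }

    // Blocks until a value is available or the chain is closed and empty.
    // Returns false only in the latter case.
    bool receive(T & out) {
        std::unique_lock<std::mutex> lock{m_lock};
        m_ready.wait(lock, [this] { return m_closed || !m_queue.empty(); });
        if (m_queue.empty()) return false;
        out = std::move(m_queue.front());
        m_queue.pop_front();
        return true;
    }

private:
    std::mutex m_lock;
    std::condition_variable m_ready;
    std::deque<T> m_queue;
    bool m_closed = false;
};

// The producer thread sends `count` samples and closes the chain;
// the calling thread receives everything until the chain is closed.
inline std::vector<int> produce_and_collect(int count) {
    simple_chain<int> ch;
    std::thread producer{[&ch, count] {
        for (int i = 0; i < count; ++i) ch.send(i);
        ch.close();
    }};
    std::vector<int> received;
    int sample = 0;
    while (ch.receive(sample)) received.push_back(sample);
    producer.join();
    return received;
}
```

With a single producer the samples arrive in the order they were sent, and the receiving loop ends only after the chain is both closed and empty, so nothing sent before the close is lost.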

Immutable and Mutable Messages

By default all messages are immutable. This makes 1-to-many delivery possible: a message can be delivered to several subscribers at the same time. For example, when dest is an MPMC mbox, send<Msg>(dest,...) delivers an instance of an immutable message of type Msg to all subscribers (several event handlers for that instance can run on parallel work threads at the same time). Interaction by means of immutable messages is a very easy and powerful approach for building concurrent applications on top of the publish-subscribe and Actor models: any message can be redirected to any number of receivers or stored for further processing.
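
Why immutability makes parallel 1-to-many delivery safe can be shown with a small standard C++ sketch. It is an illustration only, not SObjectizer code: the `news` type and the `broadcast` function are hypothetical, and plain threads stand in for subscribers running on different work threads.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Hypothetical message type.
struct news {
    std::string m_text;
};

// Delivers one shared, read-only message instance to several "subscribers"
// running in parallel. Every subscriber only reads the instance, so no
// locking around the message is needed.
inline std::vector<std::size_t> broadcast(
    const std::string & text, std::size_t subscribers)
{
    const std::shared_ptr<const news> msg = std::make_shared<news>(news{text});

    std::vector<std::size_t> lengths(subscribers, 0);
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < subscribers; ++i)
        // Each worker writes only to its own slot of `lengths`.
        workers.emplace_back(
            [msg, &lengths, i] { lengths[i] = msg->m_text.size(); });

    for (auto & w : workers) w.join();
    return lengths;
}
```

All subscribers see the same content because nobody is allowed to modify the message through a pointer to `const news`.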

But sometimes interaction by means of immutable messages has some drawbacks. For example, suppose agent A opens a file and wants to delegate file processing to agent B. Something like:

struct transfer_handle_file final : public so_5::message_t {
    File m_file;
    transfer_handle_file(File f) : m_file(std::move(f)) {}
};

class A : public so_5::agent_t {
    void on_handle_file_cmd(const handle_file & cmd) {
        File f(cmd.file_name());
        if(f)
            // Open operation succeeded. File must be sent to agent B.
            so_5::send<transfer_handle_file>(dest, std::move(f));
    }
    ...
};
class B : public so_5::agent_t {
    void on_transfer_handle_file(const transfer_handle_file & cmd) {
        File f(std::move(cmd.m_file)); // WON'T COMPILE!!!
        ...
    }
    ...
};

It won't work because cmd is a const reference in B::on_transfer_handle_file, so its m_file member can't be moved from.

A straightforward but very dangerous solution is to use the mutable keyword:
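
The compile error can be reproduced without SObjectizer. The sketch below assumes that the file type is move-only; `file_handle`, `transfer_msg` and `take_file` are hypothetical stand-ins and not part of the example above. The `static_assert`s check at compile time that a resource can be moved out of a non-const message but not out of a const one.

```cpp
#include <type_traits>
#include <utility>

// Hypothetical move-only resource standing in for File.
class file_handle {
public:
    explicit file_handle(int fd) : m_fd{fd} {}
    file_handle(file_handle && other) noexcept : m_fd{other.m_fd} {
        other.m_fd = -1; // The source no longer owns the descriptor.
    }
    file_handle(const file_handle &) = delete;
    file_handle & operator=(const file_handle &) = delete;

    int fd() const { return m_fd; }

private:
    int m_fd;
};

// Hypothetical message that carries the resource.
struct transfer_msg {
    file_handle m_file;
    explicit transfer_msg(file_handle f) : m_file{std::move(f)} {}
};

// std::move on a member of a const message yields `const file_handle &&`.
// The move constructor cannot accept it and the copy constructor is deleted.
static_assert(
    !std::is_constructible<
        file_handle,
        decltype(std::move(std::declval<const transfer_msg &>().m_file))
    >::value,
    "a const message cannot give up its resource");

// With a non-const message the move constructor is selected.
static_assert(
    std::is_constructible<
        file_handle,
        decltype(std::move(std::declval<transfer_msg &>().m_file))
    >::value,
    "a non-const message can give up its resource");

// Takes the resource out of a non-const message and returns its descriptor.
inline int take_file(transfer_msg & msg) {
    file_handle taken{std::move(msg.m_file)};
    return taken.fd();
}
```

After `take_file` returns, the message no longer owns the descriptor, which is exactly the kind of modification that a const reference forbids.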

struct transfer_handle_file final : public so_5::message_t {
    mutable File m_file;
    transfer_handle_file(File f) : m_file(std::move(f)) {}
};

It is dangerous because there is no guarantee that dest is an MPSC mbox. If dest is an MPMC mbox then the message with the mutable m_file can be received by several agents, and this can lead to various errors.

Since v.5.5.19 it is possible to send a mutable message. The example above can be rewritten as follows:

struct transfer_handle_file final : public so_5::message_t {
    File m_file;
    transfer_handle_file(File f) : m_file(std::move(f)) {}
};

class A : public so_5::agent_t {
    void on_handle_file_cmd(const handle_file & cmd) {
        File f(cmd.file_name());
        if(f)
            // Open operation succeeded. File must be sent to agent B.
            // Message transfer_handle_file will be sent as a MUTABLE message.
            // `dest` must be an MPSC mbox, an exception will be thrown if not.
            so_5::send<so_5::mutable_msg<transfer_handle_file>>(dest, std::move(f));
    }
    ...
};
class B : public so_5::agent_t {
    // Message transfer_handle_file will be received as MUTABLE message.
    void on_transfer_handle_file(mutable_mhood_t<transfer_handle_file> cmd) {
        File f(std::move(cmd->m_file)); // It's OK now.
        ...
    }
    ...
};

Mutable messages can be sent to MPSC mboxes or mchains. But they can't be sent to MPMC mboxes (an exception will be thrown on such an attempt).

Mutable messages can be sent as delayed messages. But they can't be sent as periodic messages.

Event handlers for mutable messages can have one of the following formats:

result_type handler(so_5::mhood_t<so_5::mutable_msg<Msg>>);
result_type handler(so_5::mutable_mhood_t<Msg>);

Note: mutable_mhood_t<M> is just a shorthand for mhood_t<mutable_msg<M>>.

An immutable message Msg is considered to have a different type than the mutable message mutable_msg<Msg>. This makes it possible to have different message handlers for Msg and mutable_msg<Msg>:

class demo : public so_5::agent_t {
    virtual void so_define_agent() override {
        so_subscribe_self().event(&demo::one).event(&demo::two);
    }
    virtual void so_evt_start() override {
        so_5::send<Msg>(*this, ...);
        so_5::send<so_5::mutable_msg<Msg>>(*this, ...);
    }
    void one(mhood_t<Msg>) { std::cout << "One!" << std::endl; }
    void two(mutable_mhood_t<Msg>) { std::cout << "Two!" << std::endl; }
    ...
};
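
The effect of treating the two as different types can be sketched with standard C++ only. The code below is not how SObjectizer stores subscriptions; `mutable_tag`, `subscriptions` and `make_demo` are hypothetical names. It assumes only that handlers are looked up by message type, so a wrapper type gets its own slot.

```cpp
#include <functional>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <utility>

struct Msg {};

// Hypothetical marker, similar in spirit to a mutable-message wrapper.
template <typename M>
struct mutable_tag {};

// Hypothetical subscription storage keyed by the message type.
class subscriptions {
public:
    template <typename Key>
    void subscribe(std::function<std::string()> handler) {
        m_handlers[std::type_index(typeid(Key))] = std::move(handler);
    }

    // Calls the handler registered for Key, if there is one.
    template <typename Key>
    std::string dispatch() const {
        auto it = m_handlers.find(std::type_index(typeid(Key)));
        return it != m_handlers.end() ? it->second() : std::string{"no handler"};
    }

private:
    std::map<std::type_index, std::function<std::string()>> m_handlers;
};

// Registers separate handlers for Msg and for mutable_tag<Msg>.
inline subscriptions make_demo() {
    subscriptions s;
    s.subscribe<Msg>([] { return std::string{"One!"}; });
    s.subscribe<mutable_tag<Msg>>([] { return std::string{"Two!"}; });
    return s;
}
```

Because `Msg` and `mutable_tag<Msg>` produce different `std::type_index` keys, the second subscription does not overwrite the first one, and each kind of message reaches its own handler.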

Non-handled Exceptions

C++ exceptions are actively used by SObjectizer. They are the only way SObjectizer reports errors. Because of that, an agent's event handler can receive an exception initiated by SObjectizer. If an exception is not handled by the agent and escapes from the event-handling method (or lambda function), it is intercepted and handled by SObjectizer.

By default SObjectizer calls std::abort() for non-handled exceptions, but this reaction can be customized by the user. For details on dealing with non-handled exceptions see [so-5.5 In-depth - Exceptions].
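
The general shape of such interception can be sketched in standard C++. This is a simplified illustration, not SObjectizer's actual mechanism: the `reaction` enumeration and the `call_handler` function are hypothetical, and only two possible reactions are modelled.

```cpp
#include <cstdlib>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical set of reactions to an exception that escaped a handler.
enum class reaction { abort_app, ignore };

// Calls the handler the way a dispatcher might do it: an exception derived
// from std::exception that escapes the handler is intercepted here and the
// configured reaction is applied. Other exception types are not caught.
inline std::string call_handler(
    const std::function<void()> & handler, reaction on_error)
{
    try {
        handler();
        return "handled";
    }
    catch (const std::exception & x) {
        if (reaction::ignore == on_error)
            return std::string{"ignored: "} + x.what();
        std::abort(); // The default-like reaction: terminate the application.
    }
}
```

With `reaction::ignore` the caller survives a throwing handler and gets a description of the problem; with `reaction::abort_app` the whole application is terminated.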

Further Reading

This article describes only the very basic things. There are more complex and less commonly used features of SObjectizer. They are described in a series of In-depth articles:

[so-5.5 In-depth - Messages of Arbitrary Types]
[so-5.5 In-depth - Exceptions]
[so-5.5 In-depth - Synchronous Interaction]
[so-5.5 In-depth - Priorities of Agents]
[so-5.5 In-depth - Message Limits]
[so-5.5 In-depth - Message Delivery Filters]
[so-5.5 In-depth - Run-Time Monitoring]
[so-5.5 In-depth - Message Delivery Tracing]


