so-5.4 Basics

Yauheni Akhotnikau Boris Sivko

Introduction

so_5 is a framework for organizing an application's work via message passing between agents. Agents send messages and/or signals to each other and receive them. Messages are delivered via mailboxes (mboxes) and are stored in message queues until they are processed. Some messages can be delayed (i.e. delivered after some time), and some can be periodic (i.e. delivered again and again).

Every agent works on some execution context, but the user does not need to manage working threads manually. All working threads are controlled by dispatchers. The user is responsible only for selecting appropriate dispatchers and distributing agents among them.

SObjectizer Environment

SObjectizer Environment (so_environment) is a container which holds everything else: agents, cooperations, mboxes and dispatchers. SObjectizer Environment is exposed to the user as the interface so_5::rt::so_environment_t.

It is possible to create several instances of SObjectizer Environment inside one application, although the need for it arises very rarely. In such a case the instances are totally independent: agents from one instance work independently of agents from another instance.

The interface so_5::rt::so_environment_t is an abstract class. The concrete so_environment_t implementation is created inside the run_so_environment*() family of functions. The only way to start SObjectizer Environment is to call one of the so_5::api::run_so_environment*() functions.

Every run_so_environment*() function performs the same sequence of steps:

  • instantiates a concrete implementation of so_5::rt::so_environment_t;
  • calls the user supplied initialization function;
  • starts SObjectizer Environment;
  • waits for SObjectizer Environment shutdown.

It means that run_so_environment() blocks the caller thread until SObjectizer Environment finishes its work.

In a simple application run_so_environment() is called only once, in the main() function, and the application finishes when run_so_environment() returns. If the user wants to do something else concurrently, or to run another SObjectizer Environment instance, run_so_environment() should be called on a dedicated thread created for that purpose.

The run_so_environment*() function has several forms, but only two of them are basic and cover almost all cases. The first one receives just one argument: a function object (or lambda) with initialization actions:

so_5::api::run_so_environment(
  []( so_5::rt::so_environment_t & env ) {
    auto coop = env.create_coop( "hello_world" );
    coop->define_agent().on_start(
      [&env]() {
        std::cout << "Hello, world!" << std::endl;
        env.stop();
      } );
    env.register_coop( std::move( coop ) );
  } );

The second one receives two arguments, both function objects. The first, as in the previous case, performs the initialization actions. The second sets up SObjectizer Environment's parameters:

so_5::api::run_so_environment(
  []( so_5::rt::so_environment_t & env ) {
    auto coop = env.create_coop( "hello_world",
      so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
    coop->define_agent().on_start(
      [&env]() {
        std::cout << "Hello, world!" << std::endl;
        env.stop();
      } );
    coop->define_agent().on_start(
      []() { std::cout << "Hello from another thread" << std::endl; } );
    env.register_coop( std::move( coop ) );
  },
  []( so_5::rt::so_environment_params_t & p ) {
    p.add_named_dispatcher( "active_obj",
      so_5::disp::active_obj::create_disp() );
  } );

Messages and Signals

Interaction between agents is performed by sending messages. There are two types of messages:

  • the ordinary message with some data inside;
  • the special case of a message which carries no data except the fact of its existence. Such a message is called a signal.

Every message/signal must have a unique type -- a dedicated C++ class/struct.

For ordinary messages the corresponding C++ classes must be derived from so_5::rt::message_t.

For signals the corresponding C++ classes must be derived from so_5::rt::signal_t:

// This is a message type.
struct msg_convert : public so_5::rt::message_t
  { /* some fields here */ };
// This is a signal type.
struct msg_get_status : public so_5::rt::signal_t {};

All ordinary messages must be sent as dynamically allocated objects, i.e. instances of the corresponding C++ classes are created by the new operator:

mbox->deliver_message( new msg_convert( 42 ) );
// Or
std::unique_ptr< msg_convert > msg( new msg_convert( 42 ) );
mbox->deliver_message( std::move( msg ) );

All signals must be sent by special methods like:

mbox->deliver_signal< msg_get_status >();

So it is not necessary to create a signal instance.

Note. so_5 distinguishes messages/signals by their C++ type. It is necessary to define a separate type for each message even if the messages have the same structure. For example, these are two different message types:

struct msg_convert_to_long : public so_5::rt::message_t
  {
    int m_value;
  };
struct msg_convert_to_string : public so_5::rt::message_t
  {
    int m_value;
  };

But these are not:

struct msg_convert_int : public so_5::rt::message_t
  {
    int m_value;
  };
// IT WON'T WORK IN SO_5!
typedef msg_convert_int msg_convert_to_long;
typedef msg_convert_int msg_convert_to_string;
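
The reason the typedef variant cannot work follows from the C++ language itself: a typedef introduces a second name for the same type, not a new type. The sketch below illustrates this with standard C++ only. The local message_t struct is a stand-in for so_5::rt::message_t, and the alias names and same_message_type() are hypothetical helpers for this illustration; how so_5 compares types internally is not shown here.

```cpp
#include <cassert>
#include <typeindex>
#include <typeinfo>

// Local stand-in for so_5::rt::message_t, so the sketch needs no SObjectizer headers.
struct message_t { virtual ~message_t() {} };

// Two distinct types with the same structure.
struct msg_convert_to_long : public message_t { int m_value; };
struct msg_convert_to_string : public message_t { int m_value; };

// One type with two alias names (hypothetical names for this sketch).
struct msg_convert_int : public message_t { int m_value; };
typedef msg_convert_int alias_to_long;
typedef msg_convert_int alias_to_string;

// Returns true when A and B are the same C++ type, i.e. when a
// type-based routing scheme could not tell them apart.
template< class A, class B >
bool same_message_type()
  {
    return std::type_index( typeid( A ) ) == std::type_index( typeid( B ) );
  }
```

Here same_message_type< msg_convert_to_long, msg_convert_to_string >() is false, while same_message_type< alias_to_long, alias_to_string >() is true: both aliases name msg_convert_int.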

Mboxes

Mbox is short for message box. The term is used in SObjectizer for two abstractions:

  • point/address/channel, to which messages and signals are sent;
  • point/address/channel, from which messages and signals are received.

There are two different types of mboxes in SObjectizer, each implemented in its own way. But every type of mbox implements the interface so_5::rt::mbox_t. All interactions with mboxes are done via that interface.

Interface mbox_t is an abstract class, so it is impossible to store objects of mbox_t inside an agent or to pass mbox_t by value as an argument to a function/method or inside a message. Only references/pointers to mbox_t can be used. There is a special type so_5::rt::mbox_ref_t which is an analog of a smart pointer/reference to mbox_t. Objects of mbox_ref_t are the only way to deal with mboxes in SObjectizer.

All mboxes are controlled by SObjectizer Environment. The interface so_environment_t contains methods for creating mboxes. The user holds references to mboxes in so_5::rt::mbox_ref_t objects. Every mbox is destroyed automatically as soon as all mbox_ref_t objects referring to it are gone.
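
The lifetime rule above is the usual reference-counting rule. A minimal sketch with standard C++ only, assuming std::shared_ptr as a stand-in for mbox_ref_t; toy_mbox_t, toy_mbox_ref_t and create_toy_mbox() are hypothetical names and say nothing about how mbox_ref_t is actually implemented:

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for an mbox; counts how many instances are alive.
struct toy_mbox_t
  {
    static int & live_count() { static int count = 0; return count; }

    toy_mbox_t() { ++live_count(); }
    ~toy_mbox_t() { --live_count(); }

    // Like an abstract mbox, the toy is never copied; only references are shared.
    toy_mbox_t( const toy_mbox_t & ) = delete;
    toy_mbox_t & operator=( const toy_mbox_t & ) = delete;
  };

// Plays the role of mbox_ref_t in this sketch.
typedef std::shared_ptr< toy_mbox_t > toy_mbox_ref_t;

inline toy_mbox_ref_t create_toy_mbox()
  {
    return std::make_shared< toy_mbox_t >();
  }
```

Copying a toy_mbox_ref_t does not create a second mbox; the single toy_mbox_t instance is destroyed when the last copy of the reference is reset or goes out of scope.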

Multi-producer/multi-consumer mboxes

SObjectizer uses a lightweight publish-subscribe model which is implemented by multi-producer/multi-consumer (MPMC) mboxes. An MPMC mbox is like a bulletin board: a producer publishes a message/signal on it, and all consumers subscribed to that message/signal type receive the message/signal instance. It means that the producer must know the mbox to which the messages will be sent (published), and the consumer must also know the mbox to make its subscription. Sharing that knowledge between the application's agents is the user's task.

An mbox can deliver messages of different types: the user can send messages of various types to a single mbox, and it is not necessary to create different mboxes for different message types.

However, a subscription is made for a specific message type. For example, if an agent A is subscribed to messages of type msg_ping from mbox M, and an agent B is subscribed to messages of type msg_pong from the same mbox, then the user can send msg_ping or msg_pong messages to M, but A will receive only msg_ping and B will receive only msg_pong.
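
The per-type subscription rule can be modelled in a few lines of standard C++. This is only a sketch under simplifying assumptions: toy_mpmc_mbox_t is a hypothetical class, handlers are called synchronously inside deliver() (real agents receive messages through their queues), and the message structs are plain types that do not derive from so_5::rt::message_t.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <vector>

struct msg_ping { std::string m_text; };
struct msg_pong { std::string m_text; };

// Hypothetical bulletin-board mbox: subscribers are grouped by message type.
class toy_mpmc_mbox_t
  {
  public :
    // Adds a subscriber for messages of type MSG only.
    template< class MSG >
    void subscribe( std::function< void( const MSG & ) > handler )
      {
        m_subscribers[ std::type_index( typeid( MSG ) ) ].push_back(
          [handler]( const void * raw ) {
            handler( *static_cast< const MSG * >( raw ) );
          } );
      }

    // Passes msg to every subscriber of type MSG.
    // Returns the number of subscribers which received it.
    template< class MSG >
    std::size_t deliver( const MSG & msg ) const
      {
        const auto it = m_subscribers.find( std::type_index( typeid( MSG ) ) );
        if( it == m_subscribers.end() )
          return 0;
        for( const auto & handler : it->second )
          handler( &msg );
        return it->second.size();
      }

  private :
    std::map< std::type_index,
      std::vector< std::function< void( const void * ) > > > m_subscribers;
  };
```

With one subscriber for msg_ping and another for msg_pong on the same toy mbox, delivering a msg_ping invokes only the first handler and delivering a msg_pong invokes only the second.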

There are two kinds of MPMC mboxes:

  • anonymous. These mboxes have no identification at all. They are known only through the mbox_ref_t objects which refer to them. If the user loses the last mbox_ref_t to an anonymous mbox, the mbox is lost too;
  • named. These mboxes must have unique names. SObjectizer Environment holds a dictionary of named mboxes. If the user creates an mbox with an already known name, a reference to the existing mbox is returned.

An anonymous mbox can be created by so_environment_t::create_local_mbox() method without the name argument:

// Anonymous mbox will be created.
auto mbox = env.create_local_mbox();

A named mbox can be created (or a reference to already created mbox is obtained) by so_environment_t::create_local_mbox() method with the name argument:

// Named mbox is created or obtained.
auto mbox = env.create_local_mbox( "notification_channel" );

The choice between anonymous and named mboxes mainly depends on whether an mbox reference can be passed to the agent's constructor. If it can, anonymous mboxes are usually the better choice. For example, if two agents are constructed at the same time and must pass messages to each other, anonymous mboxes are the simplest solution:

// Pinger agent. Must have its own mbox and the mbox of the ponger agent.
class a_pinger_t : public so_5::rt::agent_t
  {
  public :
    // Receives mbox-references in the constructor:
    a_pinger_t( so_5::rt::so_environment_t & env,
      // Reference to own mbox.
      const so_5::rt::mbox_ref_t & self_mbox,
      // Reference to ponger's mbox.
      const so_5::rt::mbox_ref_t & ponger_mbox )
      : so_5::rt::agent_t( env )
      , m_self_mbox( self_mbox )
      , m_ponger_mbox( ponger_mbox )
      ...
  ...
  };
// Ponger agent. Must have its own mbox and the mbox of the pinger agent.
class a_ponger_t : public so_5::rt::agent_t
  {
  public :
    // Receives mbox-references in the constructor:
    a_ponger_t( so_5::rt::so_environment_t & env,
      // Reference to own mbox.
      const so_5::rt::mbox_ref_t & self_mbox,
      // Reference to pinger's mbox.
      const so_5::rt::mbox_ref_t & pinger_mbox )
      : so_5::rt::agent_t( env )
      , m_self_mbox( self_mbox )
      , m_pinger_mbox( pinger_mbox )
      ...
  ...
  };
...
// Creation of mboxes.
auto pinger_mbox = env.create_local_mbox();
auto ponger_mbox = env.create_local_mbox();
// Creation of agents.
coop->add_agent( new a_pinger_t( env, pinger_mbox, ponger_mbox ) );
coop->add_agent( new a_ponger_t( env, ponger_mbox, pinger_mbox ) );

But there can be cases when agents are created in different parts of an application (when so_sysconf is used, for example) and direct exchange of mbox references is difficult. In such cases named mboxes can help:

// Pinger agent. Must have its own mbox and the mbox of the ponger agent.
class a_pinger_t : public so_5::rt::agent_t
  {
  public :
    a_pinger_t( so_5::rt::so_environment_t & env )
      : so_5::rt::agent_t( env )
      , m_self_mbox( env.create_local_mbox( "pinger" ) )
      , m_ponger_mbox( env.create_local_mbox( "ponger" ) )
      ...
  ...
  };
// Ponger agent. Must have its own mbox and the mbox of the pinger agent.
class a_ponger_t : public so_5::rt::agent_t
  {
  public :
    a_ponger_t( so_5::rt::so_environment_t & env )
      : so_5::rt::agent_t( env )
      , m_self_mbox( env.create_local_mbox( "ponger" ) )
      , m_pinger_mbox( env.create_local_mbox( "pinger" ) )
      ...
  ...
  };
...
// Pinger is created in one place...
coop->add_agent( new a_pinger_t( env ) );
...
// Ponger is created somewhere else...
coop->add_agent( new a_ponger_t( env ) );

There could be more sophisticated cases when agents are created, destroyed and created again, and in this case the named mboxes could be useful. However, this is out of scope of this introduction to SObjectizer's basics.

There are two families of mbox methods for message delivery. The first one is used to deliver ordinary messages. Methods of that family are called deliver_message, and the type of the message is deduced from the actual argument:

m_ponger_mbox->deliver_message(
  // Message type is deduced from the argument.
  new msg_ping( ... ) );
// Or via unique_ptr wrapper...
std::unique_ptr< msg_ping > msg( new msg_ping(...) );
m_ponger_mbox->deliver_message(
  // Message type is deduced from the argument.
  // A unique_ptr cannot be copied, so it must be moved.
  std::move( msg ) );

The second one is used to deliver signals, the type of which must be specified explicitly:

m_pinger_mbox->deliver_signal< msg_pong >();

Multi-producer/single-consumer mboxes

MPMC mboxes are good when 1-to-many or many-to-many agent interaction is required. But they are too expensive for 1-to-1 interaction, because message passing via an MPMC mbox consists of two phases:

  • in the first phase the subscriber list for the specified message type is looked up;
  • in the second phase there is a check whether this message type can be handled in the subscriber’s current state.

For 1-to-1 agent interaction the first phase is redundant. The interaction is more efficient if a message is passed directly to the subscriber’s queue without looking up the subscriber list for the message type.

This is the goal of the new type of mbox introduced in SObjectizer 5.4.0: direct and efficient support for 1-to-1 agent interaction.

The new type is the multi-producer/single-consumer (MPSC) message box. SObjectizer creates such an mbox automatically for every agent: when the constructor of agent_t finishes its work, there is an MPSC mbox for the agent. That mbox can be accessed via the agent_t::so_direct_mbox() method. The mbox belongs to the agent for which it was created; that agent is the owner of the MPSC mbox.

An MPSC mbox implements the mbox_t interface just as an MPMC mbox does. References to MPSC mboxes are held in mbox_ref_t objects, which can be passed to other agents or stored wherever the user wants.

The example with ping-pong agents from above could be rewritten with MPSC mboxes in the following way:

// Pinger agent.
class a_pinger_t : public so_5::rt::agent_t
  {
  public :
    a_pinger_t( so_5::rt::so_environment_t & env )
      : so_5::rt::agent_t( env )
      {}
    // Reference to ponger's mbox cannot be passed to constructor
    // so it must be set by this method.
    void set_ponger_mbox( const so_5::rt::mbox_ref_t & mbox )
      {
        m_ponger_mbox = mbox;
      }
      ...
  ...
  };
// Ponger agent.
class a_ponger_t : public so_5::rt::agent_t
  {
  public :
    a_ponger_t( so_5::rt::so_environment_t & env )
      : so_5::rt::agent_t( env )
      {}
    // Reference to pinger's mbox cannot be passed to constructor
    // so it must be set by this method.
    void set_pinger_mbox( const so_5::rt::mbox_ref_t & mbox )
      {
        m_pinger_mbox = mbox;
      }
      ...
  ...
  };
...
// Creation of agents.
a_pinger_t * pinger = coop->add_agent( new a_pinger_t( env ) );
a_ponger_t * ponger = coop->add_agent( new a_ponger_t( env ) );
// Exchange of mboxes.
pinger->set_ponger_mbox( ponger->so_direct_mbox() );
ponger->set_pinger_mbox( pinger->so_direct_mbox() );

The main difference between MPSC and MPMC mboxes is that only the owner of an MPSC mbox can subscribe to messages from it.

For more information about MPSC mboxes see [so-5.4.0 New type of mbox - multi-producer and single-consumer].

Selection between MPMC and MPSC mboxes

So, there are two types of mboxes with different abilities and with a noticeable difference in the efficiency of message delivery to subscribers. Which one should be used, and when?

MPMC mboxes must be used when an agent is a producer of some information which could be processed by several different agents. Or, when there are several producers for information and several consumers for that information.

Let's consider an agent which reads data from one socket, transforms it and writes it to another socket. From time to time that agent sends a message about its current state: state of connections, amount of processed data and so on. If this message is sent to an MPMC mbox it can be processed by different agents. One could transform that message into one or more SNMP data sources to make it available to various monitoring tools like Zabbix or HP OpenView. Another receiver could use this status message for overload control. Yet another could write the content of the status message to a log file. And so on.

MPSC mboxes must be used when an agent is the receiver of some agent-specific information and/or agent-specific commands. For example, a message about the need to reconfigure an agent is agent-specific, and it is better to send it to the agent’s MPSC mbox than to a common MPMC mbox which is shared between several agents.

Note. There is a significant trait of MPSC mboxes in comparison with MPMC mboxes: because there is no first phase (lookup of the subscriber list by message type), it is possible to push a message to an agent’s queue even if the agent is not subscribed to it. Such a message will be rejected in the second phase, when the ability to process the message is checked.
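
The difference described in the note can be modelled with a toy agent that owns a demand queue. This is a sketch with standard C++ only; toy_agent_t, push_direct(), push_via_lookup() and msg_unknown are hypothetical names, and the real delivery code in SObjectizer is not shown here.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <set>
#include <typeindex>
#include <typeinfo>

struct msg_reconfigure {};
struct msg_unknown {};

// Hypothetical agent with a demand queue and a set of handled message types.
class toy_agent_t
  {
  public :
    template< class MSG >
    void subscribe() { m_handled.insert( std::type_index( typeid( MSG ) ) ); }

    // MPSC-like delivery: no subscriber lookup, the demand is always queued.
    template< class MSG >
    void push_direct() { m_queue.push_back( std::type_index( typeid( MSG ) ) ); }

    // MPMC-like delivery: the first phase filters by subscription before queuing.
    template< class MSG >
    bool push_via_lookup()
      {
        const std::type_index type( typeid( MSG ) );
        if( m_handled.count( type ) == 0 )
          return false;
        m_queue.push_back( type );
        return true;
      }

    std::size_t queue_size() const { return m_queue.size(); }

    // Second phase: every queued demand is checked for a handler.
    // Returns the number of demands rejected at this point.
    std::size_t process_all()
      {
        std::size_t rejected = 0;
        while( !m_queue.empty() )
          {
            if( m_handled.count( m_queue.front() ) == 0 )
              ++rejected;
            m_queue.pop_front();
          }
        return rejected;
      }

  private :
    std::set< std::type_index > m_handled;
    std::deque< std::type_index > m_queue;
  };
```

In this model an unsubscribed message never reaches the queue through push_via_lookup(), but it does occupy a queue slot after push_direct() and is discarded only when process_all() examines it.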

Cooperations

The cooperation is a kind of container for introducing (registering) agents to SObjectizer Environment and removing (deregistering) them from it. With the help of a cooperation several agents which depend on one another can be registered at once in a transactional mode: either all of those agents are registered successfully or none of them is.

Each cooperation must have a unique name. SObjectizer Environment maintains a dictionary of registered cooperations. An attempt to register a cooperation with an already known name is an error.
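
The two rules above (all-or-nothing registration and unique names) can be sketched with a toy registry in standard C++. All names here (toy_agent_spec_t, toy_coop_registry_t, m_preparation_ok) are hypothetical, and the toy reports failures with a bool return value, which is a simplification and not a statement about how register_coop() reports errors.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical description of an agent: its name and whether its
// preparation step (subscriptions, binding and so on) succeeds.
struct toy_agent_spec_t
  {
    std::string m_name;
    bool m_preparation_ok;
  };

class toy_coop_registry_t
  {
  public :
    // Registers all agents of the cooperation or none of them.
    // Returns false and leaves the registry untouched on any failure.
    bool register_coop(
      const std::string & coop_name,
      const std::vector< toy_agent_spec_t > & agents )
      {
        // Cooperation names must be unique.
        if( m_coops.count( coop_name ) != 0 )
          return false;
        // Check every agent before anything becomes visible.
        for( const auto & agent : agents )
          if( !agent.m_preparation_ok )
            return false;
        // Commit the whole group in a single step.
        m_coops[ coop_name ] = agents;
        return true;
      }

    bool is_registered( const std::string & coop_name ) const
      {
        return m_coops.count( coop_name ) != 0;
      }

    // Total number of agents in all registered cooperations.
    std::size_t agent_count() const
      {
        std::size_t total = 0;
        for( const auto & coop : m_coops )
          total += coop.second.size();
        return total;
      }

  private :
    std::map< std::string, std::vector< toy_agent_spec_t > > m_coops;
  };
```

A cooperation with one failing agent leaves no trace in the toy registry, and a second registration under an existing name is refused.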

Every cooperation creation consists of three stages:

  1. A cooperation object must be created by so_environment_t::create_coop() method.
  2. The cooperation must be filled up with agents by agent_coop_t::add_agent() method.
  3. The cooperation must be registered by moving it into the so_environment_t::register_coop() method.

For example:

// Creation of cooperation.
auto coop = env.create_coop( "ping_pong_demo" );
// Creation of agents.
coop->add_agent( new a_pinger_t( env, pinger_mbox, ponger_mbox ) );
coop->add_agent( new a_ponger_t( env, ponger_mbox, pinger_mbox ) );
// Registration of cooperation.
env.register_coop( std::move( coop ) );

To deregister a cooperation the so_environment_t::deregister_coop() method must be used. It receives the name of the cooperation to be deregistered:

env.deregister_coop( "ping_pong_demo" );

Please note that deregistration can take some time. The method deregister_coop() only starts the deregistration procedure; deregistration is completed only when all agents of the cooperation have finished their message processing. While the deregistration procedure is active, the cooperation's agents cannot receive any new messages. They only handle the old messages which had been dispatched before the deregister_coop() call.
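
The deregistration behaviour described above can be pictured with a toy agent that stops accepting messages once deregistration has started but still drains its queue. This is a sketch in standard C++; toy_coop_agent_t and its methods are hypothetical names that only model the rule and do not reproduce SObjectizer's code.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

class toy_coop_agent_t
  {
  public :
    // Queues a message. Returns false when the message is refused
    // because deregistration is already in progress.
    bool push( const std::string & msg )
      {
        if( m_deregistering )
          return false;
        m_queue.push_back( msg );
        return true;
      }

    // Only starts the procedure; already queued messages stay pending.
    void start_deregistration() { m_deregistering = true; }

    // Handles one pending message. Returns false when nothing is left.
    bool handle_one()
      {
        if( m_queue.empty() )
          return false;
        m_queue.pop_front();
        ++m_handled;
        return true;
      }

    // Deregistration is complete only after the old messages are handled.
    bool deregistration_completed() const
      {
        return m_deregistering && m_queue.empty();
      }

    std::size_t handled() const { return m_handled; }

  private :
    std::deque< std::string > m_queue;
    bool m_deregistering = false;
    std::size_t m_handled = 0;
  };
```

In this model two messages queued before start_deregistration() are still handled, a message pushed afterwards is refused, and deregistration_completed() becomes true only once the queue is empty.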

In the case where a cooperation should contain only one agent the register_agent_as_coop() method can be used:

// Create agent and register it as a cooperation "demo".
env.register_agent_as_coop(
  // Cooperation name.
  "demo", 
  // The single agent of cooperation.
  new a_demo_t( env, ... ) );

There is also an optional parent-child relation between cooperations. It allows an existing cooperation to be specified as the parent of a new cooperation before registration:

void a_connection_manager_t::evt_new_connection( const msg_new_connection & evt )
{
    // New connection must be served by new cooperation.
    auto coop = so_environment().create_coop( "connection_handler" );
    // New cooperation is a child for this cooperation.
    coop->set_parent_coop_name( so_coop_name() );
    ... // Filling new cooperation with agents.
    so_environment().register_coop( std::move( coop ) );
}

SObjectizer guarantees that a child cooperation will be destroyed before its parent cooperation. This is useful for several reasons:

  • there is no need to control the lifetime of a child cooperation. It will be automatically deregistered when deregistration of the parent cooperation is started;
  • it is safe to pass references to objects which are owned by the parent cooperation to the child cooperation. For example, the parent cooperation can create a connection to a database and hold the DB connection object as an attribute of some agent. A reference to that DB connection object can be safely passed to the child cooperation because the lifetime of the DB connection object will be longer than the lifetime of the child cooperation.
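
The destruction order guaranteed above (children first, then the parent) corresponds to a post-order walk over the cooperation tree. A minimal sketch in standard C++, with hypothetical names (toy_coop_tree_t, add(), destroy()) and cooperation names chosen only for illustration:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

class toy_coop_tree_t
  {
  public :
    // Adds a cooperation. An empty parent name means "no parent".
    void add(
      const std::string & name,
      const std::string & parent = std::string() )
      {
        m_children[ name ];  // Make sure the node exists.
        if( !parent.empty() )
          m_children[ parent ].push_back( name );
      }

    // Destroys a cooperation together with all its descendants.
    // Returns the destruction order: every child precedes its parent.
    std::vector< std::string > destroy( const std::string & name )
      {
        std::vector< std::string > order;
        destroy_impl( name, order );
        return order;
      }

  private :
    std::map< std::string, std::vector< std::string > > m_children;

    void destroy_impl(
      const std::string & name,
      std::vector< std::string > & order )
      {
        const auto it = m_children.find( name );
        if( it == m_children.end() )
          return;  // Already destroyed or never registered.
        // Copy the list: the map is modified during the recursion.
        const std::vector< std::string > children = it->second;
        for( const auto & child : children )
          destroy_impl( child, order );
        order.push_back( name );
        m_children.erase( name );
      }
  };
```

Because a parent is appended to the order only after all of its children, any object owned by the parent in this model outlives every child that might hold a reference to it.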

For more information about parent-child relation see [so-5.2.3 Parent-child cooperation relationship].

Agents

Agents are objects which receive and handle messages and signals.

Every agent has its own message queue. When someone sends a message/signal to which an agent is subscribed, a reference to that message/signal is stored in the agent's message queue. Agents periodically get messages from their queues and process them. Processing of a message/signal from the queue is called 'event handling'.

There are two kinds of agents in SObjectizer: ordinary agents and ad-hoc agents.

Ordinary Agents

Ordinary agents, which have existed since the very first version of SObjectizer, are the most flexible and full-fledged agents.

To define an ordinary agent it is necessary to derive a new class from so_5::rt::agent_t:

class a_pinger_t : public so_5::rt::agent_t
  { /* class definition here */ };

Subsequently, instances of that class can be added to a cooperation.

There are several virtual methods of so_5::rt::agent_t class which could be used for agent tuning.

The first one is so_define_agent(). It is called inside register_coop() before an agent starts its work. Its main purpose is to give the agent a place to make subscriptions. Agents usually make their subscriptions in so_define_agent():

void a_pinger_t::so_define_agent()
  {
    so_subscribe( so_direct_mbox() ).event( &a_pinger_t::evt_pong );
    ...
  }

The second one is so_evt_start(). It is called as the very first agent event after successful registration of the agent's cooperation. It can be used for doing something at the start of the agent's work:

void a_pinger_t::so_evt_start()
  {
    m_ponger_mbox->deliver_message( new msg_ping( "Start!" ) );
  }

The main difference between so_define_agent() and so_evt_start() is the following: so_define_agent() is called on the context of the thread where register_coop() is called, whereas so_evt_start() is called on the context of the agent's working thread (see the Dispatchers section for more details).

The next important virtual method is so_evt_finish(). It is the opposite to so_evt_start(). This method is the last method called for agent on the context of agent's working thread.

The pair of so_evt_start()/so_evt_finish() could be regarded as analogs of constructor/destructor. The method so_evt_start() could be used for allocating some resource and so_evt_finish() for deallocating it:

void a_db_manager_t::so_evt_start()
  {
    // Create a connection to DB.
    m_connection = create_database_connection( ... );
    ...
  } 
void a_db_manager_t::so_evt_finish()
  {
    // Close the connection gracefully.
    m_connection.close();
  }

The usual way of handling messages in ordinary agents is to define a dedicated non-static method for each message type. For messages (i.e. messages with data inside) there are two forms of such methods:

// Long form:
result_type event_handler( const so_5::rt::event_data_t< MSG > & evt );

// Short form:
result_type event_handler( const MSG & evt );

There is no need to declare message type during event subscription -- it is deduced from the method's prototype:

void a_ponger_t::so_define_agent()
  {
    so_subscribe( so_direct_mbox() ).event( &a_ponger_t::evt_ping );
  }
void a_ponger_t::evt_ping( const msg_ping & msg )
  { ... }

The difference between the long and short forms is that the long form allows calling the event_data_t::make_reference() method to resend the same message object. If this is not needed, the short form is preferable.

For signals (i.e. messages without actual data) there are also two forms of handlers. The first one is the same as the long form of message handlers:

// Long form.
result_type event_handler( const so_5::rt::event_data_t< SIGNAL > & evt );

There is also no need to declare any signal type during the signal subscription:

void a_pinger_t::so_define_agent()
  {
    so_subscribe( so_direct_mbox() ).event( &a_pinger_t::evt_pong );
  }
void a_pinger_t::evt_pong( const so_5::rt::event_data_t< msg_pong > & msg )
  { ... }

This long form for signal handlers is supported for historical reasons and also to simplify agent code which makes heavy use of C++ templates (in such code there is no difference between message and signal subscriptions or handler declarations).

The second form of signal handlers is the short form:

// Short form for signal handler.
result_type event_handler();

In this case, type of signal should be explicitly specified during the signal subscription:

struct msg_shutdown : public so_5::rt::signal_t {};
struct msg_get_status : public so_5::rt::signal_t {};

void a_watch_dog_t::so_define_agent()
  {
    so_subscribe( so_direct_mbox() ).event(
      // Type of the signal to handle.
      so_5::signal< msg_shutdown >,
      // Handler.
      &a_watch_dog_t::evt_shutdown );

    so_subscribe( so_direct_mbox() ).event(
      // Type of the signal to handle.
      so_5::signal< msg_get_status >,
      // Handler.
      &a_watch_dog_t::evt_get_status );
  }
void a_watch_dog_t::evt_shutdown()
  {
    so_environment().stop();
  }
std::string a_watch_dog_t::evt_get_status()
  {
    return m_everything_fine ? "ok" : "failure";
  }

C++11 lambda functions can also be used as event handlers. They are passed as arguments to the event() method in a subscription chain:

void a_watch_dog_t::so_define_agent()
  {
    so_subscribe( so_direct_mbox() ).event(
      // Subscription for a message.
      // Type of the message will be deduced.
      [this]( const msg_failure_detected & msg ) {
        m_everything_fine = false;
        m_failure_description = msg.reason();
      } );

    so_subscribe( so_direct_mbox() ).event(
      // Subscription for a signal.
      // Type of the signal must be specified.
      so_5::signal< msg_shutdown >,
      [this]() { so_environment().stop(); } );

    so_subscribe( so_direct_mbox() ).event(
      // Subscription for a signal.
      // Type of the signal must be specified.
      so_5::signal< msg_get_status >,
      [this]() -> std::string {
        return m_everything_fine ? "ok" : "failure";
      } );
  }

Note. A message handler defined via a lambda function cannot receive an argument of type const so_5::rt::event_data_t<MSG>&; only const MSG& is allowed.

Since v.5.4.0 every ordinary agent has its own MPSC mbox. This mbox is automatically created in the constructor of the base class so_5::rt::agent_t. A reference to the agent's direct mbox can be obtained by the agent_t::so_direct_mbox() method.

Ad-hoc Agents

Ad-hoc agents are agents which can be constructed and registered without defining a dedicated C++ class.

To construct an ad-hoc agent it is necessary to call the define_agent() method of a cooperation object. This method returns a special proxy object which can be used to define the behavior of the new agent. For example, the following method chain creates an agent which handles a message and a signal:

auto coop = env.create_coop( "sample" );
// Definition of ad-hoc agent.
coop->define_agent()
   // First event for that agent.
   .event(
      // A mbox as message source.
      some_mbox,
      // Event handling code. Type of message is deduced automatically.
      []( const msg_convert & msg ) -> int {
         return std::atoi( msg.m_value );
      } )
   // Second event for that agent.
   .event(
      // A mbox as message source.
      some_mbox,
      // Type of signal. Must be specified explicitly.
      so_5::signal< msg_get_status >,
      // Event handling code.
      []() -> std::string {
         return "ready";
      } );

There is no C++ class for ad-hoc agent, and that's why there are no virtual methods like so_define_agent(), so_evt_start(), so_evt_finish() and so on.

Subscriptions for ad-hoc agent should be done in define_agent().event() chains as shown above.

To define the start action for ad-hoc agent the define_agent().on_start() chain must be used:

coop->define_agent()
  // The start action for the agent.
  .on_start( []() { std::cout << "Hello, world" << std::endl; } )
  ... /* other agent definitions... */;

To define the finish action for ad-hoc agent the define_agent().on_finish() chain must be used:

coop->define_agent()
  // The finish action for the agent.
  .on_finish( []() { std::cout << "Bye, world" << std::endl; } )
  ... /* other agent definitions... */;

Thread safety for event handlers

By default SObjectizer assumes that event handlers are not thread safe. Standard dispatchers guarantee that a not_thread_safe event handler runs alone and only on the context of one working thread.

Since v.5.4.0 it is possible to mark an event handler as thread safe. It means that the event handler doesn't change the agent's state (or changes it in a thread-safe manner). If the user marks an event handler as thread_safe, the dispatcher gets the right to run thread_safe handlers of the same agent in parallel on different working threads, subject to the following conditions:

  • a not_thread_safe event handler can't be started while any other event handler of the agent is running. Its start is delayed until all already running event handlers finish their work;
  • no event handler can be started while a not_thread_safe event handler is running;
  • in particular, no thread_safe event handler can be started while a not_thread_safe event handler is running.
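
These conditions are the same admission rules as those of a readers-writer lock: thread_safe handlers behave like readers and a not_thread_safe handler behaves like a writer. They can be written down as one pure function. This is a sketch in standard C++; handler_kind_t, running_state_t and can_start() are hypothetical names and do not describe the dispatcher's actual implementation.

```cpp
#include <cassert>

// Kinds of event handlers, as named in the text above.
enum class handler_kind_t { not_thread_safe, thread_safe };

// Hypothetical snapshot of what is running for one agent right now.
struct running_state_t
  {
    unsigned m_thread_safe_running;
    bool m_not_thread_safe_running;
  };

// Decides whether a handler of the given kind may start immediately.
inline bool can_start( handler_kind_t kind, const running_state_t & state )
  {
    // Nothing may start while a not_thread_safe handler is running.
    if( state.m_not_thread_safe_running )
      return false;
    // A not_thread_safe handler must wait for all running handlers.
    if( handler_kind_t::not_thread_safe == kind )
      return 0 == state.m_thread_safe_running;
    // thread_safe handlers may run side by side.
    return true;
  }
```

For an idle agent both kinds of handler may start; with two thread_safe handlers running, only another thread_safe handler may start; with a not_thread_safe handler running, nothing may start.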

To mark an event handler as thread_safe it is necessary to pass an additional argument to the event() method in the subscription chain:

void so_define_agent() override
   {
      so_subscribe( so_direct_mbox() ).in( st_disconnected )
         // thread_safety flag is not specified.
         // This is not_thread_safe event handler and it
         // can't work in parallel with other event handlers
         // because it changes agent's state.
         .event( so_5::signal< msg_reconnect >, &my_agent::evt_reconnect )
         // This is a thread_safe event handler: it doesn't change agent's state.
         .event(
               &my_agent::evt_send_when_disconnected,
               so_5::thread_safe )
         // This is a thread_safe event handler: it doesn't change agent's state.
         .event( so_5::signal< msg_get_status >,
               []() -> std::string { return "disconnected"; },
               so_5::thread_safe );
      ...
   } 

As an example of thread_safe and not_thread_safe event handlers, consider an agent which performs cryptography operations: encrypting and decrypting. These operations are stateless and can be done in parallel. But the agent must be reconfigured from time to time. The reconfiguration operation is stateful, so the event handler for the reconfigure message is not_thread_safe.

The user only has to specify the thread_safe flag for the appropriate event handlers; everything else is done by SObjectizer. It means that evt_encrypt and evt_decrypt will run in parallel until a msg_reconfigure arrives. SObjectizer will wait until all previously started evt_encrypt/evt_decrypt handlers finish their work and only then will start evt_reconfigure. While evt_reconfigure is working, all other event handlers are delayed; they are started only after evt_reconfigure returns:

class a_cryptographer_t : public so_5::rt::agent_t 
    {
    public :
        virtual void
        so_define_agent() override
            {
                // not_thread_safe event handler.
                so_subscribe( so_direct_mbox() )
                    .event( &a_cryptographer_t::evt_reconfigure );

                // thread_safe event handlers.
                so_subscribe( so_direct_mbox() )
                    .event( &a_cryptographer_t::evt_encrypt, so_5::thread_safe )
                    .event( &a_cryptographer_t::evt_decrypt, so_5::thread_safe );
            }

        void
        evt_reconfigure( const msg_reconfigure & evt ) { ... }

        encrypt_result_t
        evt_encrypt( const msg_encrypt & evt ) { ... }

        decrypt_result_t
        evt_decrypt( const msg_decrypt & evt ) { ... }
    };
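
The scheduling rule described above resembles a readers-writer lock: thread_safe handlers behave like readers and the not_thread_safe handler behaves like a writer. The sketch below illustrates only that analogy with std::shared_mutex (C++17). It does not use SObjectizer and does not show how SObjectizer implements the rule; toy_cryptographer, its XOR "cipher" and its method names are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <string>

// Hypothetical stand-in for a_cryptographer_t; not SObjectizer code.
class toy_cryptographer
{
public:
    // Analogue of a thread_safe handler: many callers may hold the
    // shared lock at once, so encrypt() calls can overlap.
    std::string encrypt( const std::string & text ) const
    {
        std::shared_lock< std::shared_mutex > lock( m_guard );
        return apply_key( text );
    }

    // XOR is its own inverse, so decryption is the same operation.
    std::string decrypt( const std::string & text ) const
    {
        std::shared_lock< std::shared_mutex > lock( m_guard );
        return apply_key( text );
    }

    // Analogue of a not_thread_safe handler: the exclusive lock waits
    // for running encrypt()/decrypt() calls and holds back new ones
    // until reconfigure() returns.
    void reconfigure( char new_key )
    {
        std::unique_lock< std::shared_mutex > lock( m_guard );
        m_key = new_key;
    }

private:
    std::string apply_key( std::string text ) const
    {
        for( char & c : text )
            c = static_cast< char >( c ^ m_key );
        return text;
    }

    mutable std::shared_mutex m_guard;
    char m_key = 0x2A;
};
```

One difference from this analogy is worth keeping in mind: according to the text above, SObjectizer delays event handlers, whereas a lock blocks the calling thread.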

Dispatchers

Every agent works on some execution context: a working thread must be assigned to the agent (in SObjectizer's terms, the agent must be bound to a working thread). A special object, the dispatcher, is responsible for that.

Dispatchers can be seen as managers of working threads. The SObjectizer user does not manage threads: all threads are started and stopped by dispatchers under the control of the SObjectizer Environment. The user only has to specify which dispatchers the SObjectizer Environment should use and bind agents to the appropriate dispatchers.

Five dispatcher types are available to the user "out of the box":

  • the simplest one, which binds every agent to the same working thread. It is called the one_thread dispatcher;
  • a dispatcher that binds an agent to a dedicated working thread. The agent becomes an active object: it works on its own thread and doesn't share this thread with other agents. This is the active_obj dispatcher;
  • a dispatcher that binds a group of agents to one dedicated working thread. All agents within the group work on the same working thread, while agents outside the group work on other threads. This is the active_group dispatcher;
  • a dispatcher that uses a pool of working threads and distributes event handlers among them. This is the thread_pool dispatcher. It doesn't distinguish between not_thread_safe and thread_safe event handlers and assumes that all event handlers are not_thread_safe. This means that an agent works on only one working thread at a time and handles no more than one event at a time;
  • a dispatcher that uses a pool of working threads and distinguishes between not_thread_safe and thread_safe event handlers. This is the adv_thread_pool dispatcher. It can run several thread_safe event handlers of one agent on different threads in parallel.
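
To make "every agent is bound to the same working thread" more concrete, here is a minimal sketch in standard C++ of a single working thread that runs queued event handlers one at a time. It is an illustration only, not SObjectizer's one_thread dispatcher; toy_one_thread_disp and demo_thread_ids are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy; not SObjectizer code.
class toy_one_thread_disp
{
public:
    toy_one_thread_disp() : m_worker( [this] { run(); } ) {}

    // Lets the worker finish all queued events, then stops it.
    ~toy_one_thread_disp()
    {
        {
            std::lock_guard< std::mutex > lock( m_mutex );
            m_stop = true;
        }
        m_wakeup.notify_one();
        m_worker.join();
    }

    // May be called from any thread: enqueue one event handler call.
    void push( std::function< void() > event )
    {
        {
            std::lock_guard< std::mutex > lock( m_mutex );
            m_events.push( std::move( event ) );
        }
        m_wakeup.notify_one();
    }

private:
    void run()
    {
        std::unique_lock< std::mutex > lock( m_mutex );
        for(;;)
        {
            m_wakeup.wait( lock,
                [this] { return m_stop || !m_events.empty(); } );
            if( m_events.empty() )
                return; // m_stop is set and nothing is left to do.
            auto event = std::move( m_events.front() );
            m_events.pop();
            lock.unlock();
            event(); // Handlers run one at a time, in FIFO order.
            lock.lock();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_wakeup;
    std::queue< std::function< void() > > m_events;
    bool m_stop = false;
    std::thread m_worker; // Declared last: starts after the other members exist.
};

// Pushes three events and returns the ids of the threads that ran them.
std::vector< std::thread::id > demo_thread_ids()
{
    std::vector< std::thread::id > ids;
    {
        toy_one_thread_disp disp;
        for( int i = 0; i != 3; ++i )
            disp.push( [&ids] { ids.push_back( std::this_thread::get_id() ); } );
    } // The destructor drains the queue and joins the thread.
    return ids;
}
```

All three ids returned by demo_thread_ids() are equal and differ from the caller's thread id. In these terms, an active_obj-like arrangement would give each agent its own instance of such a worker.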

The one_thread dispatcher is available to the user as the default dispatcher. Nothing needs to be done to launch it. By default, all agents are handled by that default dispatcher.

The other types of dispatchers require some user actions. First of all, a dispatcher instance must be created and passed to the SObjectizer Environment. Each dispatcher instance must have a unique name. The name is used later to bind agents to the dispatcher.

A dispatcher is passed to the SObjectizer Environment via a so_environment_params_t object, which can be tuned in the run_so_environment function:

so_5::api::run_so_environment(
    []( so_5::rt::so_environment_t & env ) { /* some initial actions */ },
    // SObjectizer's parameters tuning.
    []( so_5::rt::so_environment_params_t & p ) {
        // Add an instance of active_obj dispatcher with name 'active_obj'.
        p.add_named_dispatcher( "active_obj",
            so_5::disp::active_obj::create_disp() );
        // Add another instance of active_obj dispatcher with name 'shutdowner'.
        p.add_named_dispatcher( "shutdowner",
            so_5::disp::active_obj::create_disp() );

        // Add an instance of active_group dispatcher with name 'groups'.
        p.add_named_dispatcher( "groups",
            so_5::disp::active_group::create_disp() );
        ...
    } );

A dispatcher can also be added to a running SObjectizer Environment with the add_dispatcher_if_not_exists method:

void my_agent::evt_long_request( const request & evt )
{
   // New child cooperation is necessary for handling the request.
   // An instance of dispatcher is required for the cooperation.

   // Create a dispatcher if it does not exist yet.
   so_environment().add_dispatcher_if_not_exists(
         "long_request_handler_disp",
         []() { return so_5::disp::active_group::create_disp(); } );
   ...
} 
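
The "create only if missing" behaviour suggested by the method name can be sketched with a name-to-dispatcher map. This is a guess at the semantics for illustration, not SObjectizer's implementation: toy_environment and toy_dispatcher are hypothetical, and returning the registered dispatcher from the toy method is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-ins; not SObjectizer types.
struct toy_dispatcher
{
    std::string m_kind;
};

class toy_environment
{
public:
    using factory_t = std::function< std::shared_ptr< toy_dispatcher >() >;

    // Returns the dispatcher registered under `name`. The factory is
    // called only when no dispatcher with that name exists yet.
    std::shared_ptr< toy_dispatcher >
    add_dispatcher_if_not_exists(
        const std::string & name,
        const factory_t & factory )
    {
        const auto it = m_dispatchers.find( name );
        if( it != m_dispatchers.end() )
            return it->second;

        auto disp = factory();
        m_dispatchers.emplace( name, disp );
        return disp;
    }

    std::size_t dispatcher_count() const { return m_dispatchers.size(); }

private:
    std::map< std::string, std::shared_ptr< toy_dispatcher > > m_dispatchers;
};
```

With this shape, an event handler can call the method on every request: only the first call pays for creating a dispatcher, and later calls reuse the instance registered under the same name.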

Then the user should create dispatcher_binder objects for binding agents to the dispatcher. A binder is used during cooperation registration to bind an agent to its working thread. A dispatcher binder can be specified for the whole cooperation:

auto coop = env.create_coop( "ping_pong",
    // All agents will be bound to dispatcher with 'active_obj' name.
    so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
// Will be an active object because there is default dispatcher binder for the cooperation.
coop->add_agent( new a_pinger_t(...) );
// Will be an active object too.
coop->add_agent( new a_ponger_t(...) );

Alternatively, a dispatcher binder can be passed to the add_agent method as an additional argument:

auto coop = env.create_coop( "ping_pong",
    // Agents will be bound to dispatcher with 'active_obj' name by default.
    so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
// Will be an active object because there is a default dispatcher binder for the cooperation.
coop->add_agent( new a_pinger_t(...) );
// Will be an active object too.
coop->add_agent( new a_ponger_t(...) );
// Will be an active object but on a different dispatcher.
coop->add_agent( new a_shutdowner_t(...),
    so_5::disp::active_obj::create_disp_binder( "shutdowner" ) );
// Will be part of active group 'g1' on dispatcher 'groups'.
coop->add_agent( new a_group_member_t(...),
    so_5::disp::active_group::create_disp_binder( "groups", "g1" ) );
// Will be part of active group 'g2' on dispatcher 'groups'.
coop->add_agent( new a_group_member_t(...),
    so_5::disp::active_group::create_disp_binder( "groups", "g2" ) );

Delayed and Periodic Messages/Signals

Often a message/signal must be delivered after some delay. The simplest way to do that is to use the so_environment_t::single_timer() method. For example:

void a_transaction_handler_t::evt_initiate_request( const msg_request_data & msg )
    {
        // Store the request info for further actions...
        save_request_info( msg );
        // ...initiate a request to remote site...
        send_request( msg );
        // ...and wait for some time.
        so_environment().single_timer( new msg_request_timedout( msg ),
            m_self_mbox,
            // Message is delayed for 500 milliseconds.
            500 );
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        // Some actions dealing with absence of response from remote site...
    }

There is a variant of single_timer() method for signals:

struct msg_watch_dog : public so_5::rt::signal_t {};

so_environment().single_timer< msg_watch_dog >( mbox, delay_in_millisecs );

There are also schedule_timer() methods, which return timer_id_ref_t objects. Such an object is a kind of smart pointer that refers to the delayed message instance. The timer ID can be used to cancel delivery of the delayed message:

void a_transaction_handler_t::evt_initiate_request( const msg_request_data & msg )
    {
        // Store the request info for further actions...
        save_request_info( msg );
        // ...initiate a request to remote site...
        send_request( msg );
        // ...and wait for some time.
        m_timeout_id = so_environment().schedule_timer( new msg_request_timedout( msg ),
            m_self_mbox,
            // Message is delayed for 500 milliseconds.
            500 );
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        // Some actions dealing with absence of response from remote site...
    }
void a_transaction_handler_t::evt_response( const msg_response_data & msg )
    {
        // Response received. So, there is no need to deliver timeout message.
        m_timeout_id->release();

        // Other response processing actions here...
    }

As usual there is a variant of schedule_timer() for signals:

m_watchdog_id = so_environment().schedule_timer< msg_watch_dog >( mbox, delay_in_millisec );

schedule_timer() can also schedule periodic delivery of messages/signals. The last argument of schedule_timer(), which defaults to 0, is used for that. It defines the period of message delivery.

// Scheduling watch_dog signal.
m_watchdog_id = so_environment().schedule_timer< msg_watch_dog >(
    // Mbox for signal to be delivered to.
    mbox,
    // Delay for the first appearance of the signal. In milliseconds.
    150,
    // A period for signal repetitions. In milliseconds.
    300 );
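
The delay and period arguments can be read as follows: the first delivery happens after the delay, and each later delivery follows one period after the previous one. The helper below only computes these nominal moments for illustration; delivery_times is a hypothetical function, not part of SObjectizer, and real delivery moments depend on the timer thread and the load on the receiver.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper: nominal moments (in milliseconds since scheduling)
// of the first `count` deliveries.
std::vector< unsigned > delivery_times(
    unsigned delay_ms,
    unsigned period_ms,
    unsigned count )
{
    std::vector< unsigned > times;
    if( count == 0 )
        return times;

    // The first delivery happens after the initial delay.
    times.push_back( delay_ms );

    // A period of 0 (the default) means a single delayed delivery.
    if( period_ms == 0 )
        return times;

    for( unsigned i = 1; i < count; ++i )
        times.push_back( delay_ms + i * period_ms );
    return times;
}
```

For the watch_dog example above (delay 150, period 300), the first four nominal moments are 150, 450, 750 and 1050 milliseconds.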

Note. When schedule_timer() is used for periodic delivery of a message with actual data, the same message object is delivered every time.

struct msg_hello : public so_5::rt::message_t {};

void a_demo_t::so_define_agent()
    {
        so_subscribe( m_self_mbox ).event(
            []( const msg_hello & msg ) {
                // The same pointer value will be printed for every message appearance.
                std::cout << "msg pointer: " << &msg << std::endl;
            } );
    }

void a_demo_t::so_evt_start()
    {
        m_hello_id = so_environment().schedule_timer(
            new msg_hello(),
            m_self_mbox,
            100,
            100 );
    }

Agent's States

Agents are often used to implement some kinds of finite automata. Based on that, SObjectizer has a facility to define states of agents explicitly. State restricts the set of messages which could be handled by agent in that state.

To define a state for an agent it is necessary to declare and initialize so_5::rt::state_t object as an agent attribute:

class a_transaction_handler_t : public so_5::rt::agent_t
  {
    /*
     *--------------------------------
     * Declarations of agent's states.
     *--------------------------------
     */
    // Agent is waiting for an initial request.
    so_5::rt::state_t st_wait_for_request = so_make_state( "waiting for request" );
    // Agent is waiting for a response from remote site.
    so_5::rt::state_t st_wait_for_response = so_make_state( "waiting for response" );
    // Agent doesn't receive any response.
    so_5::rt::state_t st_response_timedout = so_make_state( "no response received" );
    // The response is received and agent is handling it.
    so_5::rt::state_t st_response_processing = so_make_state( "processing" );
  ...
  };

Every agent in SObjectizer has the default state. It is defined by SObjectizer and can be accessed through agent_t::so_default_state() method.

Every agent starts its work in the default state. If an agent needs to start in another state, the switch must be performed in the so_define_agent() method:

void a_transaction_handler_t::so_define_agent()
    {
        so_change_state( st_wait_for_request );

        /* Other initialization actions here... */
    }

The so_subscribe().event() method chain shown above subscribes an event handler in the default state. If an agent has several states, it should specify the state in the so_subscribe().event() chain with the in() method:

void a_transaction_handler_t::so_define_agent()
    {
        so_change_state( st_wait_for_request );

        // Message msg_request_data will be processed only in st_wait_for_request state.
        so_subscribe( m_self_mbox ).in( st_wait_for_request )
            .event( &a_transaction_handler_t::evt_request );

        // Response will be processed only in st_wait_for_response state.
        so_subscribe( m_self_mbox ).in( st_wait_for_response )
            .event( &a_transaction_handler_t::evt_response );

        // Timeout must be processed in two states.
        so_subscribe( m_self_mbox )
            .in( st_wait_for_request )
            .in( st_wait_for_response )
            .event( &a_transaction_handler_t::evt_timeout );
    }

When a message/signal is sent to an mbox, it is stored in the agent's message queue regardless of the agent's current state. The state is checked when the message/signal is extracted from the queue and is ready to be processed: if the message is subscribed in the current state, it is passed to the agent's event handler. Otherwise, the message is discarded and the next message is extracted from the queue.
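
The rule in the previous paragraph (always enqueue, check the state only on extraction) can be modelled in a few lines of standard C++. This is a toy model for illustration, not SObjectizer code: toy_agent is hypothetical, and states and message types are represented by plain strings.

```cpp
#include <cassert>
#include <deque>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy agent; not SObjectizer code.
class toy_agent
{
public:
    explicit toy_agent( std::string initial_state )
        : m_state( std::move( initial_state ) )
    {}

    void subscribe( const std::string & state, const std::string & msg_type )
    {
        m_subscriptions.insert( std::make_pair( state, msg_type ) );
    }

    // Sending never looks at the state: the message is always queued.
    void send( const std::string & msg_type ) { m_queue.push_back( msg_type ); }

    void change_state( const std::string & state ) { m_state = state; }

    // Extracts one message. Returns true if it was handled, false if it
    // was discarded or the queue was empty.
    bool process_one()
    {
        if( m_queue.empty() )
            return false;
        const std::string msg_type = m_queue.front();
        m_queue.pop_front();

        // The state is checked only now, at extraction time.
        if( m_subscriptions.count( std::make_pair( m_state, msg_type ) ) == 0 )
            return false; // Not subscribed in the current state: discarded.

        m_handled.push_back( msg_type );
        return true;
    }

    const std::vector< std::string > & handled() const { return m_handled; }

private:
    std::string m_state;
    std::set< std::pair< std::string, std::string > > m_subscriptions;
    std::deque< std::string > m_queue;
    std::vector< std::string > m_handled;
};
```

In this model a "response" that is extracted while the agent is still in its "waiting for request" state is dropped and is not redelivered after a later state change.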

To switch agent from one state to another agent_t::so_change_state() method must be used:

void a_transaction_handler_t::evt_request( const msg_request_data & msg )
    {
        so_change_state( st_wait_for_response );
        ...
    }
void a_transaction_handler_t::evt_response( const msg_response_data & msg )
    {
        so_change_state( st_response_processing );
        ...
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        so_change_state( st_response_timedout );
        ...
    }

Note. Only ordinary agents can have states. Ad-hoc agents cannot work with agent states.

Asynchronous and Synchronous Agent's Interaction

The primary form of agent interaction in SObjectizer is asynchronous message passing. This means that if an agent requires a response from another agent, the application logic of the first agent must be split into several event handling methods. For example:

// Messages for agent's interaction.
struct msg_get_status : public so_5::rt::signal_t {};
struct msg_current_status : public so_5::rt::message_t
    {
        std::string m_status;
        msg_current_status( const std::string & s ) : m_status( s )
            {}
    };

// The first agent. It needs the status from another agent.
void a_status_requester_t::so_evt_start()
    {
        /* Do something by itself... */

        // Here we ask another agent for its status.
        m_other_agent_mbox->deliver_signal< msg_get_status >();
        // Response will be received and processed in different event handler.
    }
void a_status_requester_t::evt_status_response( const msg_current_status & msg )
    {
        if( "ok" == msg.m_status )
            // One scenario...
        else
            // The other scenario...
    }

// The second agent. It provides the current status.
void a_status_provider_t::evt_get_status()
    {
        m_self_mbox->deliver_message( new msg_current_status( "ok" ) );
    }

That approach requires a lot of coding but has some advantages. It is scalable. It is hard to create a deadlock. The request-response handling could be more flexible. A requester could interact with several different agents at once, and so on.

SObjectizer also supports a form of synchronous interaction. An agent can send a service request to another agent and then wait synchronously for the response on a std::future object. The sample above can be rewritten as follows:

// Messages for agent's interaction.
struct msg_get_status : public so_5::rt::signal_t {};

// The first agent. It needs the status from another agent.
void a_status_requester_t::so_evt_start()
    {
        /* Do something by itself... */

        // Request the status of another agent.
        std::future< std::string > s = m_other_agent_mbox->get_one< std::string >()
            .async< msg_get_status >();
        if( "ok" == s.get() )
            // One scenario...
        else
            // The other scenario...
    }

// The second agent. It provides the current status.
std::string a_status_provider_t::evt_get_status()
    {
        return "ok";
    }

Or:

// Request the status of another agent.
std::string s = m_other_agent_mbox->get_one< std::string >()
    .wait_forever().sync_get< msg_get_status >();
if( "ok" == s )
    // One scenario...
else
    // The other scenario...

However, the synchronous interaction could lead to deadlocks and requires more attention from the programmer. So, it should be used very carefully.
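
The deadlock risk can be illustrated with plain std::promise and std::future, without SObjectizer. In the sketch below, request_status_sync() succeeds because the "provider" runs on another thread. would_deadlock_on_same_thread() models a provider whose handler is queued behind the requester on the same working thread: nobody can fulfil the promise while the requester waits. Both function names and the 50 ms timeout are hypothetical choices for this illustration.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <string>
#include <thread>

// The provider runs on its own thread, so the requester's wait ends.
std::string request_status_sync()
{
    std::promise< std::string > reply;
    std::future< std::string > result = reply.get_future();

    // Provider side: fulfils the promise from another thread.
    std::thread provider( [&reply] { reply.set_value( "ok" ); } );

    // Requester side: get() blocks until the value is set.
    const std::string status = result.get();
    provider.join();
    return status;
}

// The provider's handler is only queued on the requester's own thread,
// so it cannot run while the requester is waiting.
bool would_deadlock_on_same_thread()
{
    std::promise< std::string > reply;
    std::future< std::string > result = reply.get_future();

    // Queued, but not running: it would start after the requester returns.
    std::function< void() > queued_provider =
        [&reply] { reply.set_value( "ok" ); };

    // A bounded wait is used here so that the demo terminates;
    // an unbounded get() at this point would never return.
    const bool timed_out =
        result.wait_for( std::chrono::milliseconds( 50 ) )
            == std::future_status::timeout;

    queued_provider(); // Only now does the "handler" get its turn.
    return timed_out;
}
```

The second function returns true, which suggests one practical precaution: make sure the requester and the service provider do not share a working thread.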

For more information about synchronous interaction see [so-5.3.0 Synchronous interaction with agents].

Non-handled Exceptions

SObjectizer actively uses C++ exceptions; they are the only way SObjectizer reports errors. Because of that, an agent's event handler can receive an exception initiated by SObjectizer. If an exception is not handled by the agent and leaves the event handling method (or lambda function), it is intercepted and handled by SObjectizer.

By default SObjectizer calls std::abort() for non-handled exceptions, but this reaction can be customized by the user. For details on dealing with non-handled exceptions, see [so-5.3.0 Exception reaction inheritance].
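
The interception described above can be pictured as a try/catch around the handler call with a pluggable reaction. The sketch below is a generic illustration in standard C++ and not SObjectizer's API: toy_invoke_handler, toy_exception_reaction and toy_abort_reaction are hypothetical names, and the real customization mechanism is described on the linked page.

```cpp
#include <cassert>
#include <cstdlib>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical reaction hook; not SObjectizer's API.
using toy_exception_reaction = std::function< void( const std::exception & ) >;

// Mirrors the default behaviour described in the text: terminate the program.
inline void toy_abort_reaction( const std::exception & )
{
    std::abort();
}

// Calls an event handler the way a framework might. Returns true if the
// handler completed normally, false if an exception escaped from it and
// was passed to the reaction.
bool toy_invoke_handler(
    const std::function< void() > & handler,
    const toy_exception_reaction & reaction )
{
    try
    {
        handler();
        return true;
    }
    catch( const std::exception & x )
    {
        reaction( x );
        return false;
    }
}
```

Passing toy_abort_reaction reproduces the "abort by default" policy, while a logging reaction lets the surrounding code continue after a failed handler.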


Related

Wiki: Basics
Wiki: so-5.2.3 Parent-child cooperation relationship
Wiki: so-5.3.0 Exception reaction inheritance
Wiki: so-5.3.0 Synchronous interaction with agents
Wiki: so-5.4 By Example Coop User Resources
Wiki: so-5.4 By Example Cooperation Notifications
Wiki: so-5.4 By Example Minimal Ping-Pong
Wiki: so-5.4 By Example Periodic Hello
Wiki: so-5.4 By Example Synchronous Services and Exceptions
Wiki: so-5.4.0 New type of mbox - multi-producer and single-consumer