so-5.3 Basics

Yauheni Akhotnikau Boris Sivko Nicolai Grodzitski

Introduction

so_5 is a framework for organizing an application's work via message passing between agents. Agents send messages and/or signals to each other and receive them. Messages are delivered via mail boxes, and every agent has its own message queue where messages are stored before processing. Some messages can be delayed (i.e. delivered after some time) and some can be periodic (i.e. delivered again and again at a given interval).

Every agent works on some execution context, but the user has no need to manage working threads manually. All working threads are controlled by dispatchers. The user is responsible only for selecting appropriate dispatchers and distributing agents among them.

SObjectizer Environment

SObjectizer Environment (so_environment) is a container which holds everything else described below. SObjectizer Environment is presented to the user as the interface so_5::rt::so_environment_t.

It is possible to create several instances of SObjectizer Environment inside one application, although the need for it arises very rarely. In such a case they will be totally independent instances of so_environment, and agents from one instance will work independently of agents from another instance.

The interface so_5::rt::so_environment_t is an abstract class. The concrete so_environment_t implementation is created inside the run_so_environment*() family of functions. The only way to start SObjectizer Environment is to call one of the so_5::api::run_so_environment*() functions.

Every run_so_environment*() function does the same sequence of steps:

  • instantiates a concrete implementation of so_5::rt::so_environment_t;
  • calls the user supplied initialization function;
  • starts SObjectizer Environment;
  • waits for SObjectizer Environment shutdown.

It means that run_so_environment() blocks the caller thread until SObjectizer Environment finishes its work.

In a simple application run_so_environment() is called only once in the main() function, and the application finishes when run_so_environment() returns. If the user wants to do something else concurrently, or to run another SObjectizer Environment instance, run_so_environment() should be called on a dedicated thread created for that purpose.

The run_so_environment*() function has several forms, but only two of them are basic and cover almost all cases. The first one receives just one argument -- a function object (or lambda) with initialization actions:

so_5::api::run_so_environment(
  []( so_5::rt::so_environment_t & env ) {
    auto coop = env.create_coop( "hello_world" );
    coop->define_agent().on_start(
      [&env]() {
        std::cout << "Hello, world!" << std::endl;
        env.stop();
      } );
    env.register_coop( std::move( coop ) );
  } );

The second one receives two arguments. Both are function objects. The first, as in the previous case, performs the initialization actions. The second sets up SObjectizer Environment's parameters:

so_5::api::run_so_environment(
  []( so_5::rt::so_environment_t & env ) {
    auto coop = env.create_coop( "hello_world",
      so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
    coop->define_agent().on_start(
      [&env]() {
        std::cout << "Hello, world!" << std::endl;
        env.stop();
      } );
    coop->define_agent().on_start(
      []() { std::cout << "Hello from another thread" << std::endl; } );
    env.register_coop( std::move( coop ) );
  },
  []( so_5::rt::so_environment_params_t & p ) {
    p.add_named_dispatcher( "active_obj",
      so_5::disp::active_obj::create_disp() );
  } );

Messages and Signals

Interaction between agents is performed by sending messages. There are two types of messages:

  • an ordinary message with some data inside;
  • a special kind of message which carries no data at all, only the fact of its existence. Such a message is called a signal.

Every message/signal must have a unique type -- a dedicated C++ class/struct.

For ordinary messages the corresponding C++ classes must be derived from so_5::rt::message_t.

For signals the corresponding C++ classes must be derived from so_5::rt::signal_t:

// This is a message type.
struct msg_convert : public so_5::rt::message_t
  { /* some fields here */ };
// This is a signal type.
struct msg_get_status : public so_5::rt::signal_t {};

All ordinary messages must be sent as dynamically allocated objects, i.e. instances of the corresponding C++ classes created by the new operator:

mbox->deliver_message( new msg_convert( 42 ) );
// Or
std::unique_ptr< msg_convert > msg( new msg_convert( 42 ) );
mbox->deliver_message( std::move( msg ) );

All signals must be sent by special methods like:

mbox->deliver_signal< msg_get_status >();

So it is not necessary to create a signal instance.

Note. so_5 distinguishes messages/signals by their C++ type. It is necessary to define a separate type for each message even if they have the same structure. For example, these are two different message types:

struct msg_convert_to_long : public so_5::rt::message_t
  {
    int m_value;
  };
struct msg_convert_to_string : public so_5::rt::message_t
  {
    int m_value;
  };

But these are not:

struct msg_convert_int : public so_5::rt::message_t
  {
    int m_value;
  };
// IT WON'T WORK IN SO_5!
typedef msg_convert_int msg_convert_to_long;
typedef msg_convert_int msg_convert_to_string;

Mboxes

SObjectizer uses a lightweight publish-subscribe model, and the mbox (mail box) is the heart of SObjectizer's messaging system. A mbox is like a bulletin board: a producer publishes a message/signal on it, and every consumer subscribed to that message/signal type on that board receives the message/signal instance. It means that the producer must know the mbox to which messages will be sent (published), and the consumer must know the same mbox to subscribe to messages from it. Sharing that knowledge between the application's agents is the user's task.

A mbox can deliver messages of different types. It means that the user can send messages of various types to a single mbox, and it is not necessary to create different mboxes for different message types.

However, a subscription is made for some specific message type. For example, if an agent A is subscribed to messages of type msg_ping from mbox M, and an agent B is subscribed to messages of type msg_pong from the same mbox M, then the user can send msg_ping or msg_pong messages to M, but A will receive only msg_ping and B will receive only msg_pong.

All mboxes are controlled by SObjectizer Environment. The interface so_environment_t contains methods for creating mboxes. A user holds references to mboxes via so_5::rt::mbox_ref_t objects. mbox_ref_t is a kind of smart pointer. A mbox is destroyed automatically as soon as all mbox_ref_t objects referring to it are gone.

There are two types of mboxes:

  • anonymous. Those mboxes have no identification at all. They are known only through the mbox_ref_t objects which refer to them. If the user loses all mbox_ref_t objects for an anonymous mbox, the mbox itself is lost too;
  • named. Those mboxes must have unique names. SObjectizer Environment holds a dictionary of named mboxes. If the user creates a mbox with an already known name then the existing mbox is returned.

An anonymous mbox can be created by the so_environment_t::create_local_mbox() method without the name argument:

// Anonymous mbox will be created.
auto mbox = env.create_local_mbox();

A named mbox can be created (or a reference to an already created mbox can be obtained) by the so_environment_t::create_local_mbox() method with the name argument:

// Named mbox is created or obtained.
auto mbox = env.create_local_mbox( "notification_channel" );

The choice between anonymous and named mboxes mainly depends on whether a mbox reference can be passed to the agent's constructor. If it can, anonymous mboxes are usually the better choice. For example, if two agents are constructed at the same time and should pass messages to each other, then anonymous mboxes are the simplest solution:

// Pinger agent. Has its own mbox and the mbox of the ponger agent.
class a_pinger_t : public so_5::rt::agent_t
  {
  public :
    // Receives mbox-references in the constructor:
    a_pinger_t( so_5::rt::so_environment_t & env,
      // Reference to own mbox.
      const so_5::rt::mbox_ref_t & self_mbox,
      // Reference to ponger's mbox.
      const so_5::rt::mbox_ref_t & ponger_mbox )
      : so_5::rt::agent_t( env )
      , m_self_mbox( self_mbox )
      , m_ponger_mbox( ponger_mbox )
      ...
  ...
  };
// Ponger agent. Has its own mbox and the mbox of the pinger agent.
class a_ponger_t : public so_5::rt::agent_t
  {
  public :
    // Receives mbox-references in the constructor:
    a_ponger_t( so_5::rt::so_environment_t & env,
      // Reference to own mbox.
      const so_5::rt::mbox_ref_t & self_mbox,
      // Reference to pinger's mbox.
      const so_5::rt::mbox_ref_t & pinger_mbox )
      : so_5::rt::agent_t( env )
      , m_self_mbox( self_mbox )
      , m_pinger_mbox( pinger_mbox )
      ...
  ...
  };
...
// Creation of mboxes.
auto pinger_mbox = env.create_local_mbox();
auto ponger_mbox = env.create_local_mbox();
// Creation of agents.
coop->add_agent( new a_pinger_t( env, pinger_mbox, ponger_mbox ) );
coop->add_agent( new a_ponger_t( env, ponger_mbox, pinger_mbox ) );

But there could be cases when agents are created in different parts of an application (for example, when so_sysconf is used) and direct exchange of mbox references is difficult. In such cases named mboxes can help:

// Pinger agent. Has its own mbox and the mbox of the ponger agent.
class a_pinger_t : public so_5::rt::agent_t
  {
  public :
    a_pinger_t( so_5::rt::so_environment_t & env )
      : so_5::rt::agent_t( env )
      , m_self_mbox( env.create_local_mbox( "pinger" ) )
      , m_ponger_mbox( env.create_local_mbox( "ponger" ) )
      ...
  ...
  };
// Ponger agent. Has its own mbox and the mbox of the pinger agent.
class a_ponger_t : public so_5::rt::agent_t
  {
  public :
    a_ponger_t( so_5::rt::so_environment_t & env )
      : so_5::rt::agent_t( env )
      , m_self_mbox( env.create_local_mbox( "ponger" ) )
      , m_pinger_mbox( env.create_local_mbox( "pinger" ) )
      ...
  ...
  };
...
// Pinger is created in one place...
coop->add_agent( new a_pinger_t( env ) );
...
// Ponger is created somewhere else...
coop->add_agent( new a_ponger_t( env ) );

There could be more sophisticated cases when agents are created, destroyed and created again, and in this case the named mboxes could be useful. However, this is out of scope of this introduction to SObjectizer's basics.

There are two families of mbox methods for message delivery. The first one is used to deliver ordinary messages. Methods of that family are called deliver_message, and the type of message is deduced from the actual argument:

m_ponger_mbox->deliver_message(
  // Message type is deduced from the argument.
  new msg_ping( ... ) );
// Or via unique_ptr wrapper...
std::unique_ptr< msg_ping > msg( new msg_ping(...) );
m_ponger_mbox->deliver_message(
  // Message type is deduced from the argument.
  std::move( msg ) );

The second one is used to deliver signals. The type of signal must be specified explicitly:

m_pinger_mbox->deliver_signal< msg_pong >();

Cooperations

A cooperation is a kind of container for introducing (registering) agents to SObjectizer Environment and removing (deregistering) them from it. With the help of a cooperation, several agents which depend on one another can be registered at once in a transactional manner: either all of the agents are registered successfully or none of them is.

Each cooperation must have a unique name. SObjectizer Environment maintains a dictionary of registered cooperations. An attempt to register a cooperation with an already known name is an error.

A cooperation creation consists of three stages:

  1. A cooperation object must be created by so_environment_t::create_coop() method.
  2. The cooperation must be filled up with agents by agent_coop_t::add_agent() method.
  3. The cooperation must be registered by moving to so_environment_t::register_coop() method.

For example:

// Creation of cooperation.
auto coop = env.create_coop( "ping_pong_demo" );
// Creation of agents.
coop->add_agent( new a_pinger_t( env, pinger_mbox, ponger_mbox ) );
coop->add_agent( new a_ponger_t( env, ponger_mbox, pinger_mbox ) );
// Registration of cooperation.
env.register_coop( std::move( coop ) );

To deregister a cooperation the so_environment_t::deregister_coop() method must be used. It receives the name of the cooperation to be deregistered:

env.deregister_coop( "ping_pong_demo" );

Please note that the deregistration can take some time. The method deregister_coop() only starts the deregistration procedure. Deregistration is completed only when all agents of the cooperation have finished their message processing. While the deregistration procedure is in progress, the cooperation's agents cannot receive any new messages. The agents only handle the messages which had been dispatched before the deregister_coop() call.

In the case where a cooperation should contain only one agent the register_agent_as_coop() method can be used:

// Create agent and register it as a cooperation "demo".
env.register_agent_as_coop(
  // Cooperation name.
  "demo", 
  // The single agent of cooperation.
  new a_demo_t( env, ... ) );

Agents

Agents are objects which receive and handle messages and signals.

Every agent has its own message queue. When someone sends a message/signal to which an agent is subscribed a reference to that message/signal is stored in agent's message queue. Agents periodically get messages from their queues and process them. Processing of message/signal from queue is called 'event handling'.

There are two kinds of agents in SObjectizer: ordinary agents and ad-hoc agents.

Ordinary Agents

Ordinary agents which exist from the very first version of SObjectizer are the most flexible and full fledged agents.

To define an ordinary agent it is necessary to derive a new class from so_5::rt::agent_t:

class a_pinger_t : public so_5::rt::agent_t
  { /* class definition here */ };

Subsequently, instances of that class may be used for some cooperation.

There are several virtual methods of so_5::rt::agent_t class which could be used for agent tuning.

The first one is so_define_agent(). It is called inside register_coop() before an agent starts its work. The main purpose of so_define_agent() is to provide a possibility to make subscriptions. Usually agents make their subscription in so_define_agent():

void a_pinger_t::so_define_agent()
  {
    so_subscribe( m_self_mbox ).event( &a_pinger_t::evt_pong );
    ...
  }

The second one is so_evt_start(). It is called as the very first agent event after the successful registration of agent's cooperation. It could be used for doing something at the start of agent's work:

void a_pinger_t::so_evt_start()
  {
    m_ponger_mbox->deliver_message( new msg_ping( "Start!" ) );
  }

The main difference between so_define_agent() and so_evt_start() is that so_define_agent() is called on the context of the thread where register_coop() is called, whereas so_evt_start() is called on the context of the agent's working thread (see the Dispatchers section for more details).

The next important virtual method is so_evt_finish(). It is the opposite to so_evt_start(). This method is the last method called for agent on the context of agent's working thread.

The pair of so_evt_start()/so_evt_finish() could be regarded as analogs of constructor/destructor. The method so_evt_start() could be used for allocating some resource and so_evt_finish() for deallocating it:

void a_db_manager_t::so_evt_start()
  {
    // Create a connection to DB.
    m_connection = create_database_connection( ... );
    ...
  } 
void a_db_manager_t::so_evt_finish()
  {
    // Close the connection gracefully.
    m_connection.close();
  }

The usual way of handling messages in ordinary agents is to define a dedicated non-static method for each message type. For messages (i.e. messages with data inside) there are two forms of such methods:

// Long form:
result_type event_handler( const so_5::rt::event_data_t< MSG > & evt );

// Short form:
result_type event_handler( const MSG & evt );

There is no need to declare message type during event subscription -- it is deduced from the method's prototype:

void a_ponger_t::so_define_agent()
  {
    so_subscribe( m_self_mbox ).event( &a_ponger_t::evt_ping );
  }
void a_ponger_t::evt_ping( const msg_ping & msg )
  { ... }

The difference between the long and short forms is that the long form allows calling the event_data_t::make_reference() method for resending the same message object. If this is not necessary then the short form is preferable.

For signals (i.e. messages without actual data) there are two forms of signal handlers. The first one is the same as the long form of message handlers:

// Long form.
result_type event_handler( const so_5::rt::event_data_t< SIGNAL > & evt );

There is also no need to declare any signal type during the signal subscription:

void a_pinger_t::so_define_agent()
  {
    so_subscribe( m_self_mbox ).event( &a_pinger_t::evt_pong );
  }
void a_pinger_t::evt_pong( const so_5::rt::event_data_t< msg_pong > & msg )
  { ... }

This long form for signal handlers is supported for historical reasons, and also because it simplifies agents' code which makes heavy use of C++ templates (in such a case there is no difference between message and signal subscription and handler declaration).

The second form of signal handlers is the short form:

// Short form for signal handler.
result_type event_handler();

In this case the type of signal must be explicitly specified during signal subscription:

struct msg_shutdown : public so_5::rt::signal_t {};
struct msg_get_status : public so_5::rt::signal_t {};

void a_watch_dog_t::so_define_agent()
  {
    so_subscribe( m_self_mbox ).event(
      // Type of the signal to handle.
      so_5::signal< msg_shutdown >,
      // Handler.
      &a_watch_dog_t::evt_shutdown );

    so_subscribe( m_self_mbox ).event(
      // Type of the signal to handle.
      so_5::signal< msg_get_status >,
      // Handler.
      &a_watch_dog_t::evt_get_status );
  }
void a_watch_dog_t::evt_shutdown()
  {
    so_environment().stop();
  }
std::string a_watch_dog_t::evt_get_status()
  {
    return m_everything_fine ? "ok" : "failure";
  }

It is also possible to use C++11 lambda functions as event handlers. They should be passed as arguments to the event() method in the subscription chain:

void a_watch_dog_t::so_define_agent()
  {
    so_subscribe( m_self_mbox ).event(
      // Subscription for a message.
      // Type of the message will be deduced.
      [this]( const msg_failure_detected & msg ) {
        m_everything_fine = false;
        m_failure_description = msg.reason();
      } );

    so_subscribe( m_self_mbox ).event(
      // Subscription for a signal.
      // Type of the signal must be specified.
      so_5::signal< msg_shutdown >,
      [this]() { so_environment().stop(); } );

    so_subscribe( m_self_mbox ).event(
      // Subscription for a signal.
      // Type of the signal must be specified.
      so_5::signal< msg_get_status >,
      [this]() -> std::string {
        return m_everything_fine ? "ok" : "failure";
      } );
  }

Note. A message handler defined via a lambda function cannot receive an argument of type const so_5::rt::event_data_t<MSG>&, only const MSG&.

Ad-hoc Agents

Ad-hoc agents are agents which can be constructed and registered without defining a dedicated C++ class.

To construct an ad-hoc agent it is necessary to call the define_agent() method of a cooperation object. This method returns a special proxy object which can be used to define the behavior of the new agent. For example, this method chain creates an agent which handles a message and a signal:

auto coop = env.create_coop( "sample" );
// Definition of ad-hoc agent.
coop->define_agent()
   // First event for that agent.
   .event(
      // A mbox as message source.
      some_mbox,
      // Event handling code. Type of message is deduced automatically.
      []( const msg_convert & msg ) -> int {
         return std::atoi( msg.m_value );
      } )
   // Second event for that agent.
   .event(
      // A mbox as message source.
      some_mbox,
      // Type of signal. Must be specified explicitly.
      so_5::signal< msg_get_status >,
      // Event handling code.
      []() -> std::string {
         return "ready";
      } );

There is no C++ class for ad-hoc agent, and that's why there are no virtual methods like so_define_agent(), so_evt_start(), so_evt_finish() and so on.

Subscriptions for ad-hoc agent should be done in define_agent().event() chains as shown above.

To define the start action for ad-hoc agent the define_agent().on_start() chain must be used:

coop->define_agent()
  // The start action for the agent.
  .on_start( []() { std::cout << "Hello, world" << std::endl; } )
  ... /* other agent definitions... */;

To define the finish action for an ad-hoc agent the define_agent().on_finish() chain must be used:

coop->define_agent()
  // The finish action for the agent.
  .on_finish( []() { std::cout << "Bye, world" << std::endl; } )
  ... /* other agent definitions... */;

Dispatchers

Every agent should work on some execution context: some working thread must be assigned to an agent (or, in SObjectizer's terms, the agent must be bound to some working thread). A special object, the dispatcher, is responsible for that.

Dispatchers can be seen as managers of working threads. SObjectizer's user is free from thread management. All threads are started and stopped by dispatchers under SObjectizer Environment control. All that the user should do is to specify which dispatchers should be used by SObjectizer Environment and bind agents to the appropriate dispatchers.

There are three dispatcher types available to the user "out of the box":

  • the simplest one which binds every agent to the same working thread. It is called one_thread dispatcher;
  • the dispatcher which binds an agent to a dedicated working thread. The agent becomes an active object -- it works on its own thread and doesn't share this thread with the other agents. That dispatcher is called active_obj dispatcher;
  • the dispatcher which binds a group of agents to one dedicated working thread. Every agent within the group works on the same working thread, while agents outside the group work on other threads. That dispatcher is called active_group dispatcher.

The one_thread dispatcher is available to the user as the default dispatcher. Nothing needs to be done to launch that kind of dispatcher. By default all agents are handled by the default dispatcher.

The other types of dispatchers require some user actions. First of all, a dispatcher instance must be created and passed to SObjectizer Environment. Each dispatcher instance must have a unique dispatcher name. The name will be used later to bind agents to the dispatcher.

A dispatcher is passed to SObjectizer Environment via so_environment_params_t object which could be tuned in the run_so_environment function:

so_5::api::run_so_environment(
    []( so_5::rt::so_environment_t & env ) { /* some initial actions */ },
    // SObjectizer's parameters tuning.
    []( so_5::rt::so_environment_params_t & p ) {
        // Add an instance of active_obj dispatcher with name 'active_obj'.
        p.add_named_dispatcher( "active_obj",
            so_5::disp::active_obj::create_disp() );
        // Add another instance of active_obj dispatcher with name 'shutdowner'.
        p.add_named_dispatcher( "shutdowner",
            so_5::disp::active_obj::create_disp() );

        // Add an instance of active_group dispatcher with name 'groups'.
        p.add_named_dispatcher( "groups",
            so_5::disp::active_group::create_disp() );
        ...
    } );

Then the user should create dispatcher binder objects for binding agents to the dispatcher. A binder is used during the cooperation registration to bind an agent to its working thread. The dispatcher binder can be specified for the whole cooperation:

auto coop = env.create_coop( "ping_pong",
    // Agents all will be bound to dispatcher with 'active_obj' name.
    so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
// Will be an active object because there is default dispatcher binder for the cooperation.
coop->add_agent( new a_pinger_t(...) );
// Will be an active object too.
coop->add_agent( new a_ponger_t(...) );

Or the dispatcher binder can be specified in the add_agent() method as an additional argument:

auto coop = env.create_coop( "ping_pong",
    // Agents will be bound to dispatcher with 'active_obj' name by default.
    so_5::disp::active_obj::create_disp_binder( "active_obj" ) );
// Will be an active object because there is a default dispatcher binder for the cooperation.
coop->add_agent( new a_pinger_t(...) );
// Will be an active object too.
coop->add_agent( new a_ponger_t(...) );
// Will be an active object but on a different dispatcher.
coop->add_agent( new a_shutdowner_t(...),
    so_5::disp::active_obj::create_disp_binder( "shutdowner" ) );
// Will be part of active group 'g1' on dispatcher 'groups'.
coop->add_agent( new a_group_member_t(...),
    so_5::disp::active_group::create_disp_binder( "groups", "g1" ) );
// Will be part of active group 'g2' on dispatcher 'groups'.
coop->add_agent( new a_group_member_t(...),
    so_5::disp::active_group::create_disp_binder( "groups", "g2" ) );

Delayed and Periodic Messages/Signals

Often it is necessary to send a message/signal whose delivery must be delayed for some time. The simplest way to do that is to use the so_environment_t::single_timer() method. For example:

void a_transaction_handler_t::evt_initiate_request( const msg_request_data & msg )
    {
        // Store the request info for further actions...
        save_request_info( msg );
        // ...initiate a request to remote site...
        send_request( msg );
        // ...and wait for some time.
        so_environment().single_timer( new msg_request_timedout( msg ),
            m_self_mbox,
            // Message delayed for 500 milliseconds.
            500 );
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        // Some actions dealing with absence of response from remote site...
    }

There is a variant of single_timer() method for signals:

struct msg_watch_dog : public so_5::rt::signal_t {};

so_environment().single_timer< msg_watch_dog >( mbox, delay_in_millisecs );

There are also schedule_timer() methods which return timer_id_ref_t objects. Such an object is a kind of smart pointer which refers to the delayed message instance. The timer ID can be used for canceling delivery of the delayed message:

void a_transaction_handler_t::evt_initiate_request( const msg_request_data & msg )
    {
        // Store the request info for further actions...
        save_request_info( msg );
        // ...initiate a request to remote site...
        send_request( msg );
        // ...and wait for some time.
        m_timeout_id = so_environment().schedule_timer( new msg_request_timedout( msg ),
            m_self_mbox,
            // Message delayed for 500 milliseconds.
            500 );
    }
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
    {
        // Some actions dealing with absence of response from remote site...
    }
void a_transaction_handler_t::evt_response( const msg_response_data & msg )
    {
        // Response received. So there is no need to deliver timeout message.
        m_timeout_id->release();

        // Other response processing actions here...
    }

As usual there is a variant of schedule_timer() for signals:

m_watchdog_id = so_environment().schedule_timer< msg_watch_dog >( mbox, delay_in_millisecs );

The schedule_timer() method can also schedule delivery of periodic messages/signals. The last argument of schedule_timer(), which has the default value 0, is used for that. It defines the period of message delivery.

// Scheduling watch_dog signal.
m_watchdog_id = so_environment().schedule_timer< msg_watch_dog >(
    // Mbox for signal to be delivered to.
    mbox,
    // Delay for the first appearance of the signal. In milliseconds.
    150,
    // A period for signal repetitions. In milliseconds.
    300 );

Note. When schedule_timer() is used for periodic delivery of a message with actual data, the same message object is delivered every time.

struct msg_hello : public so_5::rt::message_t {};

void a_demo_t::so_define_agent()
    {
        so_subscribe( m_self_mbox ).event(
            []( const msg_hello & msg ) {
                // The same pointer value will be printed for every message appearance.
                std::cout << "msg pointer: " << &msg << std::endl;
            } );
    }

void a_demo_t::so_evt_start()
    {
        m_hello_id = so_environment().schedule_timer(
            new msg_hello(),
            m_self_mbox,
            100,
            100 );
    }

Agent's States

Agents are often used to implement some kind of finite automaton. For that reason SObjectizer has a facility to define states of agents explicitly. A state restricts the set of messages which can be handled by the agent while it is in that state.

To define a state for an agent it is necessary to declare and initialize a so_5::rt::state_t object as an agent attribute:

class a_transaction_handler_t : public so_5::rt::agent_t
    {
        /*
         *--------------------------------
         * Declarations of agent's states.
         *--------------------------------
         */
        // Agent is waiting for an initial request.
        so_5::rt::state_t st_wait_for_request;
        // Agent is waiting for a response from the remote site.
        so_5::rt::state_t st_wait_for_response;
        // Agent hasn't received any response.
        so_5::rt::state_t st_response_timeout;
        // The response is received and agent is handling it.
        so_5::rt::state_t st_response_processing;

    public :
        a_transaction_handler_t( so_5::rt::so_environment_t & env )
            : so_5::rt::agent_t( env )
            /*
             *-----------------------------------
             * Initialization of agent's states.
             *-----------------------------------
             */
            , st_wait_for_request( so_self_ptr(), "waiting for request" )
            , st_wait_for_response( so_self_ptr(), "waiting for response" )
            , st_response_timeout( so_self_ptr(), "no response received" )
            , st_response_processing( so_self_ptr(), "processing" )
            {}
    ...
    };

Every agent in SObjectizer has a default state. It is defined by SObjectizer and can be accessed through the agent_t::so_default_state() method.

Every agent starts its work in the default state. Because of that, if an agent needs to start in another state, the switch must be performed in the so_define_agent() method:

void a_transaction_handler_t::so_define_agent()
    {
        so_change_state( st_wait_for_request );

        /* Other initialization actions here... */
    }

The so_subscribe().event() method chain shown above subscribes an event handler in the default state. If an agent has several states, the state must be specified in the so_subscribe().event() chain via the in() method:

void a_transaction_handler_t::so_define_agent()
    {
        so_change_state( st_wait_for_request );

        // Message msg_request_data will be processed only in st_wait_for_request state.
        so_subscribe( m_self_mbox ).in( st_wait_for_request )
            .event( &a_transaction_handler_t::evt_request );

        // Response will be processed only in st_wait_for_remote_site_response state.
        so_subscribe( m_self_mbox ).in( st_wait_for_remote_site_response )
            .event( &a_transaction_handler_t::evt_response );

        // Timeout must be processed in two states.
        so_subscribe( m_self_mbox )
            .in( st_wait_for_request )
            .in( st_wait_for_remote_site_response )
            .event( &a_transaction_handler_t::evt_timeout );
    }

When a message/signal is sent to an mbox it will be stored in the agent's message queue regardless of the current agent state. The state of agent is checked when a message/signal is extracted from the queue and it's ready to be processed. SObjectizer checks the current agent state and if the message is subscribed in the current state it will be passed to agent's event handler. Otherwise the message is discarded and the next message is extracted from the queue.
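
The delivery rule described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: toy_agent, its members and run_demo() are hypothetical names invented for the illustration, states and message types are represented by plain strings, and nothing here reflects how SObjectizer is actually implemented.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified model of state-dependent delivery.
// None of these names belong to SObjectizer.
struct toy_agent
{
    using handler = std::function<void(toy_agent&)>;

    std::string state = "default";
    std::deque<std::string> queue;       // message type names in arrival order
    std::map<std::pair<std::string, std::string>, handler> subscriptions;
    std::vector<std::string> handled;    // messages passed to a handler
    std::vector<std::string> discarded;  // messages with no subscription

    void subscribe(const std::string& st, const std::string& msg, handler h)
    {
        subscriptions[std::make_pair(st, msg)] = std::move(h);
    }

    // Storing a message never looks at the current state.
    void push(const std::string& msg) { queue.push_back(msg); }

    // The state is checked only when a message is extracted from the queue.
    void process_all()
    {
        while (!queue.empty())
        {
            const std::string msg = queue.front();
            queue.pop_front();
            const auto it = subscriptions.find(std::make_pair(state, msg));
            if (it == subscriptions.end())
            {
                discarded.push_back(msg);  // not subscribed in this state
                continue;
            }
            assert(it->second && "subscription must hold a callable");
            handled.push_back(msg);
            it->second(*this);             // the handler may change the state
        }
    }
};

// Feeds three messages to an agent that starts in "wait_for_request".
toy_agent run_demo()
{
    toy_agent a;
    a.state = "wait_for_request";
    a.subscribe("wait_for_request", "request",
                [](toy_agent& self) { self.state = "wait_for_response"; });
    a.subscribe("wait_for_response", "response",
                [](toy_agent& self) { self.state = "response_processing"; });
    a.push("response");  // arrives too early
    a.push("request");
    a.push("response");
    a.process_all();
    return a;
}
```

In run_demo() the first "response" is extracted while the agent is still waiting for a request, so it is discarded. The "request" switches the state, and only the second "response" reaches its handler.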

To switch an agent from one state to another, the agent_t::so_change_state() method must be used:

void a_transaction_handler_t::evt_request( const msg_request_data & msg )
    {
        so_change_state( st_wait_for_remote_site_response );
        ...
    }
void a_transaction_handler_t::evt_response( const msg_response_data & msg )
    {
        so_change_state( st_response_processing );
        ...
    }
void a_transaction_handler_t::evt_timeout( const msg_timedout & msg )
    {
        so_change_state( st_response_timeout );
        ...
    }

Note. Only ordinary agents can have states. Ad-hoc agents cannot work with agent states.

Asynchronous and Synchronous Agent's Interaction

The primary form of agent interaction in SObjectizer is asynchronous message passing. It means that if an agent requires a response from another agent, the application logic of the first agent must be split into several event handling methods. For example:

// Messages for agent's interaction.
struct msg_get_status : public so_5::rt::signal_t {};
struct msg_current_status : public so_5::rt::message_t
    {
        std::string m_status;
        msg_current_status( const std::string & s ) : m_status( s )
            {}
    };

// The first agent. It needs the status from another agent.
void a_status_requester_t::so_evt_start()
    {
        /* Do something by itself... */

        // Here we ask for the status of another agent.
        m_other_agent_mbox->deliver_signal< msg_get_status >();
        // The response will be received and processed in a different event handler.
    }
void a_status_requester_t::evt_status_response( const msg_current_status & msg )
    {
        if( "ok" == msg.m_status )
            {
                // One scenario...
            }
        else
            {
                // The other scenario...
            }
    }

// The second agent. It provides the current status.
void a_status_provider_t::evt_get_status()
    {
        m_self_mbox->deliver_message( new msg_current_status( "ok" ) );
    }

That approach requires a lot of coding but has some advantages. It is scalable. It is hard to create a deadlock. The request-response handling could be more flexible. A requester could interact with several different agents at once and so on.

SObjectizer also supports a form of synchronous interaction. An agent can send a service request to another agent and then synchronously wait for the response on a std::future object. The sample above could be rewritten in the following way:

// Messages for agent's interaction.
struct msg_get_status : public so_5::rt::signal_t {};

// The first agent. It needs the status from another agent.
void a_status_requester_t::so_evt_start()
    {
        /* Do something by itself... */

        // Request the status of another agent.
        std::future< std::string > s = m_other_agent_mbox->get_one< std::string >()
            .async< msg_get_status >();
        if( "ok" == s.get() )
            {
                // One scenario...
            }
        else
            {
                // The other scenario...
            }
    }

// The second agent. It provides the current status.
std::string a_status_provider_t::evt_get_status()
    {
        return "ok";
    }

Or:

// Request the status of another agent.
std::string s = m_other_agent_mbox->get_one< std::string >()
    .wait_forever().sync_get< msg_get_status >();
if( "ok" == s )
    {
        // One scenario...
    }
else
    {
        // The other scenario...
    }

However, the synchronous interaction could lead to deadlocks and requires more attention from the programmer. So it should be used very carefully.
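
One common precaution with any std::future-based wait is to bound the waiting time, so that a response that never arrives shows up as a timeout instead of a hang. The sketch below uses only the C++11 standard library and no SObjectizer calls; provide_status() and request_status() are hypothetical helpers made up for the illustration, and std::async merely stands in for a provider that works on another thread.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <thread>

// Hypothetical stand-in for a status provider working on another thread.
std::string provide_status(std::chrono::milliseconds work_time)
{
    std::this_thread::sleep_for(work_time);
    return "ok";
}

// Asks for the status and waits for the answer for at most `limit`.
// Returns the answer, or "timeout" if it was not ready in time.
std::string request_status(std::chrono::milliseconds work_time,
                           std::chrono::milliseconds limit)
{
    assert(limit.count() >= 0 && "a negative limit makes no sense");

    std::future<std::string> f =
        std::async(std::launch::async, provide_status, work_time);

    if (f.wait_for(limit) != std::future_status::ready)
    {
        // Note: a future obtained from std::async joins its task in the
        // destructor, so this function still returns only after
        // provide_status() has finished.
        return "timeout";
    }
    return f.get();
}
```

For example, request_status with a work time of 0 ms and a limit of 2000 ms returns "ok", while a work time of 300 ms with a limit of 10 ms returns "timeout".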

For more information about synchronous interaction see [so-5.3.0 Synchronous interaction with agents].

Non-handled Exceptions

C++ exceptions are actively used by SObjectizer. They are the only way SObjectizer reports errors. Because of that, an agent's event handler can receive an exception raised by SObjectizer. If an exception is not handled by the agent and leaves the event handling method (or lambda function), it is intercepted and handled by SObjectizer.

By default SObjectizer calls std::abort() for non-handled exceptions, but this reaction can be customized by the user. For the details of dealing with non-handled exceptions see [so-5.3.0 Exception reaction inheritance].
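
The general idea of intercepting an exception that leaves an event handler and then applying a configurable reaction can be sketched as follows. This is an illustration only: the reaction enum, call_result and invoke_handler() are hypothetical names, and the real reactions and their names are described on the page referenced above.

```cpp
#include <cassert>
#include <cstdlib>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical reactions; these names are illustrative, not SObjectizer's.
enum class reaction { abort_application, ignore };

struct call_result
{
    bool handler_threw;  // true if an exception left the handler
    std::string what;    // description of the intercepted exception
};

// Calls the handler and intercepts any exception that leaves it.
call_result invoke_handler(const std::function<void()>& handler, reaction r)
{
    assert(handler && "handler must be callable");
    try
    {
        handler();
        return {false, ""};
    }
    catch (const std::exception& x)
    {
        if (r == reaction::abort_application)
            std::abort();  // mirrors the default behavior described above
        return {true, x.what()};
    }
    catch (...)
    {
        if (r == reaction::abort_application)
            std::abort();
        return {true, "unknown exception"};
    }
}
```

With reaction::ignore, a handler that throws std::runtime_error("boom") produces a result with handler_threw set to true and what equal to "boom". With reaction::abort_application the same handler terminates the process.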


Related

Wiki: Basics
Wiki: so-5.3 By Example Minimal Ping-Pong
Wiki: so-5.3 By Example Periodic Hello
Wiki: so-5.3 By Example Synchronous Services and Exceptions
Wiki: so-5.3.0 Exception reaction inheritance
Wiki: so-5.3.0 Synchronous interaction with agents