so-5.4 By Example Periodic Hello

Yauheni Akhotnikau

Introduction

This sample shows how to work with named mboxes and timers. It is strongly recommended to read [so-5.4 Basics] and [so-5.4 By Example Minimal Ping-Pong] first, because this sample builds on the concepts introduced there.

There are two agents in this sample. The first, and the simpler one, is the shutdowner agent. It receives msg_stop_signal and finishes SObjectizer's work.

The second agent, a_hello, uses periodic and delayed messages/signals. The periodic message msg_hello_periodic shows the ability of SObjectizer to repeat a message with a specified period. The delayed signal msg_stop_signal demonstrates the ability of SObjectizer to cancel delayed messages/signals before they are sent.

In the so_evt_start() method a_hello agent initiates two messages:

  • a periodic msg_hello_periodic with greeting text inside;
  • a delayed signal msg_stop_signal.

The trick is that msg_hello_periodic always arrives before the pending msg_stop_signal is due. Each time it arrives, a_hello cancels the pending msg_stop_signal instance and schedules a new one. The next msg_hello_periodic again arrives before the new msg_stop_signal, so the signal is canceled and rescheduled in a loop. After several iterations of that loop a_hello cancels msg_hello_periodic, but not msg_stop_signal. The signal is then delivered and finishes the sample.
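
The timing described above can be checked with plain arithmetic. The following is a minimal sketch that does not use SObjectizer at all: it only replays the delays used in the sample (first greeting after 1000 ms, period of 1250 ms, initial stop delay of 2000 ms, rescheduled stop delay of 1300 ms, 5 greetings). The names timeline_t and simulate_timeline are hypothetical and exist only for this illustration, and real timers will show some jitter around these ideal values.

```cpp
#include <cassert>
#include <vector>

// Result of replaying the sample's timers. All times are in milliseconds
// counted from so_evt_start(). Hypothetical type, not part of SObjectizer.
struct timeline_t
{
    // Moments when each greeting arrives.
    std::vector< unsigned int > m_hello_times;
    // Moment when the stop signal is finally delivered.
    unsigned int m_stop_time;
};

timeline_t
simulate_timeline(
    unsigned int first_delay,   // Delay before the first greeting.
    unsigned int period,        // Period of the greeting.
    unsigned int initial_stop,  // Delay of the first stop signal.
    unsigned int stop_delay,    // Delay of every rescheduled stop signal.
    unsigned int max_hellos )   // Greeting number at which the periodic timer is canceled.
{
    assert( max_hellos > 0 );

    timeline_t result;
    result.m_stop_time = initial_stop;

    unsigned int now = first_delay;
    for( unsigned int count = 1; count <= max_hellos; ++count, now += period )
    {
        // If the stop signal is due first, the sample ends here.
        if( now >= result.m_stop_time )
            break;

        result.m_hello_times.push_back( now );

        // Every greeting except the last one replaces the pending stop signal.
        if( count < max_hellos )
            result.m_stop_time = now + stop_delay;
    }

    return result;
}
```

With the sample's values, simulate_timeline( 1000, 1250, 2000, 1300, 5 ) gives greetings at 1000, 2250, 3500, 4750 and 6000 ms and the stop signal at 6050 ms. The margin between each greeting and the pending stop signal is only 50 ms, which is why the order of the two delays matters.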

Sample Code

/*
 * A sample of a simple agent which sends a periodic message to itself.
 */

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <time.h>

#include <ace/OS.h>

// Main SObjectizer header files.
#include <so_5/all.hpp>

// Hello message.
struct msg_hello_periodic : public so_5::rt::message_t
{
    // Greeting.
    std::string m_message;
};

// Stop signal.
class msg_stop_signal : public so_5::rt::signal_t {};

// An agent class definition.
class a_hello_t : public so_5::rt::agent_t
{
    public:
        a_hello_t( so_5::rt::so_environment_t & env )
            :   so_5::rt::agent_t( env )
            ,   m_evt_count( 0 )
            ,   m_shutdowner_mbox(
                    so_environment().create_local_mbox( "shutdown" ) )
        {}
        virtual ~a_hello_t()
        {}

        // Definition of an agent for SObjectizer.
        virtual void
        so_define_agent();

        // A reaction to start of work in SObjectizer.
        virtual void
        so_evt_start();

        // Hello message handler.
        void
        evt_hello_periodic( const msg_hello_periodic & msg );

    private:
        // Named mbox of the shutdowner agent.
        const so_5::rt::mbox_ref_t m_shutdowner_mbox;

        // Timer events' identifiers.
        so_5::timer_thread::timer_id_ref_t m_hello_timer_id;
        so_5::timer_thread::timer_id_ref_t m_stop_timer_id;

        // How many timer events have been processed.
        unsigned int m_evt_count;
};

void
a_hello_t::so_define_agent()
{
    // Message subscription.
    so_subscribe( so_direct_mbox() )
        .event( &a_hello_t::evt_hello_periodic );
}

void
a_hello_t::so_evt_start()
{
    time_t t = time( 0 );
    std::cout << asctime( localtime( &t ) )
        << "a_hello_t::so_evt_start()" << std::endl;

    std::unique_ptr< msg_hello_periodic > msg( new msg_hello_periodic );
    msg->m_message = "Hello, periodic!";

    // Sending a greeting.
    m_hello_timer_id =
        so_environment()
            .schedule_timer(
                std::move( msg ),
                so_direct_mbox(),
                // Delay for a second.
                so_5::chrono_helpers::to_ms( std::chrono::seconds(1) ),
                // Repeat every 1.25 seconds.
                so_5::chrono_helpers::to_ms( std::chrono::milliseconds(1250) ) );

    // Sending a stop signal.
    m_stop_timer_id =
        so_environment()
            .schedule_timer< msg_stop_signal >(
                m_shutdowner_mbox,
                // Delay for two seconds.
                so_5::chrono_helpers::to_ms( std::chrono::seconds(2) ),
                // Not a periodic.
                0 );
}

void
a_hello_t::evt_hello_periodic( const msg_hello_periodic & msg )
{
    time_t t = time( 0 );
    std::cout << asctime( localtime( &t ) )
        << msg.m_message << std::endl;

    if( 5 == ++m_evt_count )
    {
        // Stops hello message.
        m_hello_timer_id.release();
    }
    else
    {
        // Reschedule a stop signal.
        // The previous stop signal is canceled by the assignment below.
        m_stop_timer_id =
            so_environment()
                .schedule_timer< msg_stop_signal >(
                    m_shutdowner_mbox,
                    // 1300ms but specified in microsecs just for demonstration.
                    so_5::chrono_helpers::to_ms( std::chrono::microseconds(1300000) ),
                    0 );
    }
}

// Creation of 'hello' cooperation.
void
create_hello_coop( so_5::rt::so_environment_t & env )
{
    // A single agent can be registered as a whole cooperation.
    env.register_agent_as_coop( "hello", new a_hello_t( env ) );
}

// Creation of 'shutdowner' cooperation.
void
create_shutdowner_coop( so_5::rt::so_environment_t & env )
{
    // Mbox for shutdowner agent.
    auto mbox = env.create_local_mbox( "shutdown" );

    // Cooperation for shutdowner.
    auto coop = env.create_coop( "shutdowner" );

    // Shutdowner agent.
    coop->define_agent()
        .event( mbox, so_5::signal< msg_stop_signal >,
            [&env, mbox]() {
                time_t t = time( 0 );
                std::cout << asctime( localtime( &t ) )
                    << "Stop SObjectizer..." << std::endl;

                // Shutting down SObjectizer.
                env.stop();
            } );

    env.register_coop( std::move( coop ) );
}

// The SObjectizer Environment initialization.
void
init( so_5::rt::so_environment_t & env )
{
    create_hello_coop( env );
    create_shutdowner_coop( env );
}

int
main( int, char ** )
{
    try
    {
        so_5::api::run_so_environment( &init );
    }
    catch( const std::exception & ex )
    {
        std::cerr << "Error: " << ex.what() << std::endl;
        return 1;
    }

    return 0;
}

Sample In Details

Message msg_hello_periodic Declaration

Every message which carries actual data must be derived from so_5::rt::message_t class:

// Hello message.
struct msg_hello_periodic : public so_5::rt::message_t
{
    // Greeting.
    std::string m_message;
};

Creation of Mbox in a_hello_t Constructor

a_hello_t( so_5::rt::so_environment_t & env )
    :   so_5::rt::agent_t( env )
    ,   m_evt_count( 0 )
    ,   m_shutdowner_mbox(
            so_environment().create_local_mbox( "shutdown" ) )
{}

The agent a_hello uses two mboxes. The first one is used for sending and handling the msg_hello_periodic message. An MPSC mbox, which is created automatically for every agent, is enough for that purpose. There is no need to create this mbox explicitly: it is accessible via the so_direct_mbox() method.

The second mbox is used for sending msg_stop_signal. This mbox is also used by the shutdowner agent. But the a_hello and shutdowner agents are created separately, so they cannot share a reference to an anonymous mbox. A named mbox is used here as a solution.

A named mbox is created only once, and any subsequent call to create_local_mbox() with the same name returns a reference to the already created object. So a reference to the same mbox can be obtained from anywhere in the code.
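
The "create once, then return the same object" behavior can be illustrated with a small registry keyed by name. This is only a sketch of the idea, written against the standard library. The types toy_mbox_t and toy_mbox_registry_t are hypothetical, they say nothing about how SObjectizer implements named mboxes internally, and unlike a real environment the sketch is not thread safe.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// A stand-in for a mbox. In the sample the real type is so_5::rt::mbox_ref_t.
struct toy_mbox_t
{
    std::string m_name;
};

// Hypothetical registry that hands out one shared object per name.
class toy_mbox_registry_t
{
    public:
        std::shared_ptr< toy_mbox_t >
        create_local_mbox( const std::string & name )
        {
            // This sketch handles named mboxes only.
            assert( !name.empty() );

            auto it = m_named.find( name );
            if( it == m_named.end() )
            {
                // First request for this name: create and remember the mbox.
                it = m_named.emplace(
                        name,
                        std::make_shared< toy_mbox_t >( toy_mbox_t{ name } ) ).first;
            }

            // Every caller gets a smart reference to the same object.
            return it->second;
        }

    private:
        std::map< std::string, std::shared_ptr< toy_mbox_t > > m_named;
};
```

Two calls with the name "shutdown" return smart references to the same object, which is what lets a_hello and the shutdowner meet on one mbox without passing a reference to each other.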

Initiation of Periodic and Delayed Messages

The lines where periodic and delayed messages are initiated might seem too complicated at first glance. But in reality they are pretty straightforward:

std::unique_ptr< msg_hello_periodic > msg( new msg_hello_periodic );
msg->m_message = "Hello, periodic!";

// Sending a greeting.
m_hello_timer_id =
    so_environment()
        .schedule_timer(
            std::move( msg ),
            so_direct_mbox(),
            // Delay for a second.
            so_5::chrono_helpers::to_ms( std::chrono::seconds(1) ),
            // Repeat every 1.25 seconds.
            so_5::chrono_helpers::to_ms( std::chrono::milliseconds(1250) ) );

// Sending a stop signal.
m_stop_timer_id =
    so_environment()
        .schedule_timer< msg_stop_signal >(
            m_shutdowner_mbox,
            // Delay for two seconds.
            so_5::chrono_helpers::to_ms( std::chrono::seconds(2) ),
            // Not a periodic.
            0 );

First of all, an instance of msg_hello_periodic is created and the greeting text is set. std::unique_ptr is used to prevent a resource leak if an exception is thrown.

Then a periodic message is initiated. The previously created message instance first appears after a delay of one second and is then repeated every 1250 milliseconds. The message is sent to the agent's direct mbox, so_direct_mbox().

A helper function so_5::chrono_helpers::to_ms is used here. It converts durations specified in C++11 std::chrono types to milliseconds (schedule_timer/single_timer use milliseconds for historical reasons).
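
The conversion itself can be pictured with std::chrono::duration_cast. The function to_ms_sketch below is a hypothetical stand-in written for this page, not the actual so_5::chrono_helpers::to_ms. In particular, the unsigned int return type and the truncation of sub-millisecond remainders are assumptions of the sketch.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical stand-in for so_5::chrono_helpers::to_ms.
// Accepts any std::chrono duration and returns whole milliseconds.
template< class Rep, class Period >
unsigned int
to_ms_sketch( const std::chrono::duration< Rep, Period > & d )
{
    // duration_cast truncates toward zero when the source is finer
    // than a millisecond.
    const auto ms =
        std::chrono::duration_cast< std::chrono::milliseconds >( d ).count();

    // A negative delay makes no sense for a timer.
    assert( ms >= 0 );

    return static_cast< unsigned int >( ms );
}
```

For example, to_ms_sketch( std::chrono::seconds(1) ) is 1000 and to_ms_sketch( std::chrono::microseconds(1300000) ) is 1300, which matches the three durations used in the sample.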

Next, a delayed signal is initiated. It will be sent to m_shutdowner_mbox after two seconds. The third argument to schedule_timer, the period, is 0 here. It specifies that the signal is delivered once after the delay and is not repeated.

Note. Storing the timer ID returned from schedule_timer is absolutely necessary. The timer stays alive only while at least one reference to its timer ID exists. If all references to the timer ID are destroyed, the timer is canceled automatically.
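
The lifetime rule from the note can be modeled with std::shared_ptr and std::weak_ptr. This is a sketch under the assumption that a timer ID behaves like a reference-counted handle, as the note describes. The names toy_timer_t, toy_timer_id_t and toy_timer_watch_t are hypothetical and are not SObjectizer types.

```cpp
#include <cassert>
#include <memory>

// Hypothetical timer record. It exists only while some ID refers to it.
struct toy_timer_t {};

// Hypothetical timer ID: a reference-counted handle to the timer.
using toy_timer_id_t = std::shared_ptr< toy_timer_t >;

// The observer side keeps only a weak reference, so it does not
// prolong the life of the timer by itself.
class toy_timer_watch_t
{
    public:
        explicit toy_timer_watch_t( const toy_timer_id_t & id )
            :   m_timer( id )
        {
            assert( id != nullptr );
        }

        // The timer is active while at least one ID is still alive.
        bool
        is_active() const
        {
            return !m_timer.expired();
        }

    private:
        std::weak_ptr< toy_timer_t > m_timer;
};
```

In this model, copying an ID keeps the timer active until the last copy is reset or destroyed. If the returned ID is not stored at all (for example, when it is passed around only as a temporary), the timer is gone at the end of that expression, which is the situation the note warns about.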

Canceling and Rescheduling Messages

The code in a_hello_t::evt_hello_periodic shows how timer-related messages can be canceled and rescheduled:

void
a_hello_t::evt_hello_periodic( const msg_hello_periodic & msg )
{
    time_t t = time( 0 );
    std::cout << asctime( localtime( &t ) )
        << msg.m_message << std::endl;

    if( 5 == ++m_evt_count )
    {
        // Stops hello message.
        m_hello_timer_id.release();
    }
    else
    {
        // Reschedule a stop signal.
        // The previous stop signal is canceled by the assignment below.
        m_stop_timer_id =
            so_environment()
                .schedule_timer< msg_stop_signal >(
                    m_shutdowner_mbox,
                    // 1300ms but specified in microsecs just for demonstration.
                    so_5::chrono_helpers::to_ms( std::chrono::microseconds(1300000) ),
                    0 );
    }
}

The call m_hello_timer_id.release() cancels the delivery of the periodic message msg_hello_periodic.

Assigning a new value to m_stop_timer_id has the same effect on the previous stop timer. It is equivalent to:

// Initiate a new delayed message and receive ID of a new timer.
so_5::timer_thread::timer_id_ref_t new_id = so_environment().schedule_timer...;
// Cancel the previous timer.
m_stop_timer_id.release();
// Store a new timer.
m_stop_timer_id = new_id;
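
The order of those steps (schedule the new timer first, then cancel the old one) can be observed with a reference-counted handle whose deleter writes to a log. This is a sketch only: toy_schedule and event_log_t are hypothetical names, and the handle is a plain std::shared_ptr standing in for a timer ID. In this toy, "cancel" is logged whenever the last handle goes away, regardless of whether a real timer would already have fired.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Chronological list of what happened to the toy timers.
using event_log_t = std::vector< std::string >;

// Hypothetical replacement for schedule_timer: records the scheduling
// and returns a handle that records the cancellation when the last
// reference to it is gone. The log must outlive every handle.
std::shared_ptr< std::string >
toy_schedule( event_log_t & log, const std::string & name )
{
    assert( !name.empty() );

    log.push_back( "schedule " + name );

    return std::shared_ptr< std::string >(
        new std::string( name ),
        [&log]( std::string * p ) {
            log.push_back( "cancel " + *p );
            delete p;
        } );
}
```

If a handle that holds "stop#1" is assigned the result of toy_schedule( log, "stop#2" ), the log reads "schedule stop#1", "schedule stop#2", "cancel stop#1". The right-hand side of the assignment is evaluated before the old handle is released, so there is never a moment without a pending stop timer.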

Creation of Cooperation with a Single Agent Inside

It is not necessary to create a so_5::rt::agent_coop_t object and fill it when the cooperation contains only one agent. A special method of so_environment_t can be used in that situation:

// Creation of 'hello' cooperation.
void
create_hello_coop( so_5::rt::so_environment_t & env )
{
    // A single agent can be registered as a whole cooperation.
    env.register_agent_as_coop( "hello", new a_hello_t( env ) );
}

Note. An agent_coop_t object is created for the shutdowner cooperation in this sample because the single agent in that cooperation is an ad-hoc agent. An ad-hoc agent can be defined only by the define_agent() method of an agent_coop_t object.

Capturing a State for Ad-Hoc Agent Event

There is a subtle trick in how state is captured for the event handler of the ad-hoc shutdowner agent:

// Shutdowner agent.
coop->define_agent()
    .event( mbox, so_5::signal< msg_stop_signal >,
        [&env, mbox]() {
            time_t t = time( 0 );
            std::cout << asctime( localtime( &t ) )
                << "Stop SObjectizer..." << std::endl;

            // Shutting down SObjectizer.
            env.stop();
        } );

The Environment, env, is captured by reference because so_environment_t is an abstract class, so env cannot be captured by value.

The mbox, however, is captured by value. This is very important: capturing by value stores another copy of the smart reference to the mbox inside the lambda object. The lambda therefore never ends up holding a reference to the local variable mbox, which is destroyed when create_shutdowner_coop() returns.
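
The effect of capturing a smart reference by value can be shown without SObjectizer. In the sketch below std::shared_ptr< std::string > stands in for the mbox smart reference and make_handler is a hypothetical function that mimics the shape of create_shutdowner_coop(): the handler is built inside a function and called only after that function has returned. Unlike the sample's lambda, the sketch dereferences the captured value so that the effect is observable.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Builds a handler that is invoked after this function has returned,
// i.e. after the local variable 'mbox' has been destroyed.
std::function< std::string() >
make_handler()
{
    // Stand-in for the smart reference to the named mbox.
    auto mbox = std::make_shared< std::string >( "shutdown" );
    assert( mbox.use_count() == 1 );

    // Capture by value: the lambda owns its own copy of the smart
    // reference, so the object stays alive as long as the handler does.
    // Capturing [&mbox] instead would leave the lambda with a dangling
    // reference to a destroyed local variable.
    return [mbox]() { return *mbox; };
}
```

Calling the handler returned by make_handler() yields "shutdown" even though the local variable no longer exists, because the copy inside the lambda keeps the object alive.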

Creation of Several Cooperations in Initialization Function

Please note that the SObjectizer Environment's initialization function, init, creates two cooperations, not one as in the previous sample (see [so-5.4 By Example Minimal Ping-Pong]). An initialization function may create any number of cooperations, which is useful in complex applications.


Related

Wiki: Basics
Wiki: so-5.4 Basics
Wiki: so-5.4 By Example Minimal Ping-Pong
Wiki: so-5.4 By Example Synchronous Services and Exceptions