Menu

so-5.4 By Example Synchronous Services and Exceptions

Yauheni Akhotnikau Boris Sivko

Introduction

This sample demonstrates working synchronous services. It is strongly recommended to read [so-5.4 Basics], [so-5.4 By Example Minimal Ping-Pong] and [so-5.4 By Example Periodic Hello] before for a better understanding of what is happening here in the code.

There are two agents which are working on different execution context. One of them provides service for converting strings to integers and the other uses this service by synchronous service requests.

This sample also demonstrates handling of exceptions: one of them is thrown by the first agent if conversion is impossible, and the other is thrown by SObjectizer if conversion takes too much time (this situation is provoked by the first agent just for demonstration).

Sample Code

/*
 * A sample for an exception from service handler demonstration.
 */

#include <iostream>
#include <stdexcept>
#include <sstream>
#include <chrono>

#include <so_5/rt/h/rt.hpp>
#include <so_5/api/h/api.hpp>
#include <so_5/h/types.hpp>

#include <so_5/disp/active_obj/h/pub.hpp>

struct msg_convert : public so_5::rt::message_t
    {
        const std::string m_value;

        msg_convert( const std::string & value ) : m_value( value )
            {}
    };

class a_convert_service_t
    :   public so_5::rt::agent_t
    {
    public :
        a_convert_service_t(
            so_5::rt::so_environment_t & env,
            const so_5::rt::mbox_ref_t & self_mbox )
            :   so_5::rt::agent_t( env )
            ,   m_self_mbox( self_mbox )
            {}

        virtual void
        so_define_agent()
            {
                so_subscribe( m_self_mbox )
                        .event( []( const msg_convert & msg ) -> int
                        {
                            std::istringstream s( msg.m_value );
                            int result;
                            s >> result;
                            if( s.fail() )
                                throw std::invalid_argument( "unable to convert to int: '" +
                                        msg.m_value + "'" );

                            // A special case for timeout imitation.
                            if( 42 == result )
                                std::this_thread::sleep_for( std::chrono::milliseconds(150) );

                            return result;
                        } );
            }

    private :
        const so_5::rt::mbox_ref_t m_self_mbox;
    };

class a_client_t
    :   public so_5::rt::agent_t
    {
    public :
        a_client_t(
            so_5::rt::so_environment_t & env,
            const so_5::rt::mbox_ref_t & svc_mbox )
            :   so_5::rt::agent_t( env )
            ,   m_svc_mbox( svc_mbox )
            {}

        virtual void
        so_evt_start()
            {
                std::string values_to_convert[] = {
                        "1", "2", "a1", "a2", "3", "a3", "41", "42", "43" };

                auto svc = m_svc_mbox->get_one< int >().wait_for(
                        std::chrono::milliseconds( 100 ) );

                for( const auto & s : values_to_convert )
                    {
                        try
                            {
                                std::cout << "converting '" << s << "'" << std::flush;

                                auto r = svc.sync_get( new msg_convert( s ) );

                                std::cout << "' -> " << r << std::endl;
                            }
                        catch( const std::exception & x )
                            {
                                std::cerr << "\n*** an exception during converting "
                                        "value '" << s << "': " << x.what()
                                        << std::endl;
                            }
                    }

                so_environment().stop();
            }

    private :
        const so_5::rt::mbox_ref_t m_svc_mbox;
    };

void
init(
    so_5::rt::so_environment_t & env )
    {
        auto coop = env.create_coop(
                "test_coop",
                so_5::disp::active_obj::create_disp_binder( "active_obj" ) );

        auto svc_mbox = env.create_local_mbox();

        coop->add_agent( new a_convert_service_t( env, svc_mbox ) );
        coop->add_agent( new a_client_t( env, svc_mbox ) );

        env.register_coop( std::move( coop ) );
    }

int
main( int, char ** )
    {
        try
            {
                so_5::api::run_so_environment(
                    &init,
                    []( so_5::rt::so_environment_params_t & p ) {
                        p.add_named_dispatcher(
                            "active_obj",
                            so_5::disp::active_obj::create_disp() );
                    } );
            }
        catch( const std::exception & ex )
            {
                std::cerr << "Error: " << ex.what() << std::endl;
                return 1;
            }

        return 0;
    }

Sample in Details

Conversion Service Request Handler

SObjectizer supports some form of synchronous interaction between agents and uses a slightly different terminology for it. Sending a message to an agent and waiting for a message processing result is called service request. Handling a message on the processing result of it is waiting someone is called service request handling. And the event handler which is used for service request handling is called service request handler.

But when we are speaking about service request handlers the different terminology is just for convenience. Technically, there is no difference between ordinary event handler and service request handler: they have the same format and they use the same forms of message/signal subscription. Thus, the terms service request handler and event handler are interchangeable. Even more -- there is no possibility for event handler to distinguish the fact, that is it called for ordinary asynchronous message processing or for a serving service request. It means that service request handlers always work asynchronously. Only the service requester knows about synchronous part of that interaction.

Because of that the service request handler is defined and subscribed as an ordinary event handler out of previous samples:

so_subscribe( m_self_mbox )
        .event( []( const msg_convert & msg ) -> int
        {
            std::istringstream s( msg.m_value );
            int result;
            s >> result;
            if( s.fail() )
                throw std::invalid_argument( "unable to convert to int: '" +
                        msg.m_value + "'" );

            // A special case for timeout imitation.
            if( 42 == result )
                std::this_thread::sleep_for( std::chrono::milliseconds(150) );

            return result;
        } );

Service Invocation

There are three forms of synchronous service request invocation. All of them start from the construct mbox->get_one<RESULT_TYPE>().

The simplest form is an infinite waiting of service request processing result. For this sample it can be written as follows:

m_svc_mbox->get_one<int>().wait_forever().sync_get( new msg_convert( str ) );

or if the compiler supports variadic templates:

m_svc_mbox->get_one<int>().wait_forever().sync_get< msg_convert >( str );

It is the most dangerous form because it can easily lead to a deadlock. The second form with waiting timeout is preferable:

m_svc_mbox->get_one<int>().wait_for(timeout).sync_get( new msg_convert( str ) );

or if the compiler supports variadic templates:

m_svc_mbox->get_one<int>().wait_for(timeout).sync_get< msg_convert >( str );

This form is used in that sample. But there is the third, the most flexible form, which returns std::future object:

std::future<int> r = m_svc_mbox->get_one<int>().async( new msg_convert( str ) );

or if the compiler supports variadic templates:

std::future<int> r = m_svc_mbox->get_one<int>().async< msg_convert >( str );

User can do various forms of checking for result readiness and waiting for different timeouts when std::future object is returned by get_one().async() method chain. However, this is out of scope of this sample discussion.

In this sample the service invocation is simple and straightforward. There is a serie of string values and loop for that serie. For each value an attempt of conversion is performed. If the attempt fails with an exception that exception is logged and the next value is processed:

std::string values_to_convert[] = {
        "1", "2", "a1", "a2", "3", "a3", "41", "42", "43" };

auto svc = m_svc_mbox->get_one< int >().wait_for(
        std::chrono::milliseconds( 100 ) );

for( const auto & s : values_to_convert )
    {
        try
            {
                std::cout << "converting '" << s << "'" << std::flush;

                auto r = svc.sync_get( new msg_convert( s ) );

                std::cout << "' -> " << r << std::endl;
            }
        catch( const std::exception & x )
            {
                std::cerr << "\n*** an exception during converting "
                    "value '" << s << "': " << x.what()
                    << std::endl;
            }
    }

The only trick here is a short form of get_one().wait_for().sync_get() method chain. It is based on the fact that get_one().wait_for() returns a special proxy object. That object could be stored and used later to shorten the service invocation code.

Exceptions Propagation

Please note that an exception raised in service request handler:

if( s.fail() )
    throw std::invalid_argument( "unable to convert to int: '" +
        msg.m_value + "'" );

is then propagated to the service requester.

This is one of the principal differences between asynchronous event handlers and synchronous service request handlers. If an event handler throws out an exception then SObjectizer uses a special mechanism to determine what to do with that agent (or its cooperation or even with all Environment). See [so-5.3.0 Exception reaction inheritance] for the details.

But if a service request handler throws an exception then that exception is stored in the underlying std::promise/std::future objects and then propagated to the service requester.

Binding of Agents to Different Working Threads

It is very important to guarantee that service requester and service provider are working on different working threads. If they don't, there would be a deadlock. There isn't any help from SObjectizer to detect or prevent the deadlock (this is way get_one().wait_for() is the most prefferable service request form).

That sample solves this problem by binding agents to different working threads. The standard active object dispatcher is used for that:

// Addition of active object dispatcher to the Environment:
so_5::api::run_so_environment(
    &init,
    []( so_5::rt::so_environment_params_t & p ) {
        p.add_named_dispatcher(
            "active_obj",
            so_5::disp::active_obj::create_disp() );
    } );

and

// Setting active objects dispatcher as a default dispatcher for all of cooperation agents.
auto coop = env.create_coop(
    "test_coop",
    so_5::disp::active_obj::create_disp_binder( "active_obj" ) );

More Information

See [so-5.3.0 Synchronous interaction with agents] for more details about service requests.


Related

Wiki: Basics
Wiki: so-5.3.0 Exception reaction inheritance
Wiki: so-5.3.0 Synchronous interaction with agents
Wiki: so-5.4 Basics
Wiki: so-5.4 By Example Minimal Ping-Pong
Wiki: so-5.4 By Example Periodic Hello