Menu

[ru] Фильтры сообщений для реализации в версии 5.5.5

2015-04-17
2015-04-24
  • Yauheni Akhotnikau

    В версии 5.5.5 запланировано добавление такой вещи, как фильтры сообщений. Данный пост объясняет, зачем это нужно и как будет выглядеть. Цель такого описания -- дать возможность выявить возможные проблемы в процессе обсуждения, еще до воплощения в коде.

    Назначение фильтров сообщений

    Фильтры сообщений предназначены для того, чтобы воспрепятствовать постановке в очередь заявок тех сообщений, в получении которых агент не заинтересован. Например, в штатном примере queue_size_stats можно посмотреть о чем идет речь:

    void
    evt_quantity(
        const so_5::rt::stats::messages::quantity< std::size_t > & evt )
    {
        // Process only messages related to dispatcher's queue sizes.
        if( so_5::rt::stats::suffixes::work_thread_queue_size() == evt.m_suffix )
        {
            std::ostringstream ss;
    
            ss << "stats: '" << evt.m_prefix << evt.m_suffix << "': " << evt.m_value;
    
            so_5::send< log_message >( m_logger, ss.str() );
        }
    }
    

    Тут агент отказывается обрабатывать сообщения, которые его не интересуют из-за своего содержимого. Ненужные сообщения просто выбрасываются. Но ведь они должны были первоначально попасть в очередь сообщений, затем быть извлеченными оттуда, затем переданными агенту на обработку. И все это для того, чтобы быть выброшенными.

    Представляется, что гораздо эффективнее проверить актуальность сообщения для конкретного получателя еще до того, как сообщение будет помещено в очередь получателя. Что в определенных сценариях позволит избежать постановки в очереди заявок никому не интересных, подлежащих выбрасыванию сообщений.

    В каких сценариях работы будут полезны фильтры сообщений?

    Идея создания фильтров сообщений возникла при работе с мониторинговой информацией самого SObjectizer-а. Все сообщения c run-time статистикой идут в один общий mbox. И получатели, которые заинтересованы только в части из этих сообщений (например, только в сообщениях о размерах очередей отдельных диспетчеров) не имеют никакой возможности оградить себя от всех остальных сообщений. Поэтому самый первый сценарий, в котором фильтры сообщений планируется задействовать -- это run-time мониторинг состояния SObjectizer Environment.

    По аналогии можно предположить, что фильтры сообщений будут полезны при работе с широковещательной рассылкой сообщений вообще. Т.е. если какой-то агент широковещательно рассылает информацию через MPMC-mbox (не суть важно, что это за информация -- текущие показатели температуры воздуха или количество находящихся в обработке транзакций), могут быть агенты-получатели, которые хотели бы фильтровать поток сообщений. Например, получать сообщения только в случае, если температура воздуха выходит за заданные пределы или количество транзакций увеличивается свыше какого-то порога.

    Что из себя будет представлять фильтр-сообщений?

    Фильтр-сообщений -- это объект, на вход которому подается экземпляр сообщения, а на выходе получается значение true (сообщение прошло фильтр и может быть доставлено) или false (сообщение забраковано и не должно доставляться конкретному получателю).

    В программе фильтры сообщений будут создаваться специальными вспомогательными функциями, которые будут ориентированы, в первую очередь на представление фильтра в виде лямбда-функции.

    Как фильтры сообщений планируется задавать?

    Фильтр задается при подписке события агента. При этом используются следующие правила:

    • фильтр может быть задан только при самой первой подписке на сообщение;
    • если при установке фильтра выясняется, что для сообщения уже есть хотя бы одна подписка (не важно, с фильтром или без), то это ошибка (выбрасывается исключение);
    • при последующих подписках фильтр не указывается, используется тот, что был задан в первой подписке;
    • при необходимости сменить фильтр необходимо удалить все подписки на это сообщение, после чего создать их заново (новый фильтр при этом устанавливается при создании самой первой подписки).

    Пример назначения подписки:

    void trx_count_listener::so_define_agent() override
    {
      // Фильтр задается при самой первой подписке на сообщения.
      st_normal.event( mbox, &trx_count_listener::handle_trx_count,
        // Назначение фильтра. Интересуют только сообщения, которые
        // сообщают о слишком большом количестве транзакций.
        message_filter( []( const msg_trx_count & msg ) {
          return msg.m_count > 1000;
        } ) );
      // В остальных сообщениях фильтр не задается.
      st_await_shutdown.event( mbox, &trx_count_listener::log_and_ignore );
      st_shutting_down.event( mbox, &trx_count_listener::abort_execution );
    }
    

    Задание фильтров и признака thread_safety

    Для того, чтобы не создавать большого количества перегруженных функций event(), предлагается объединить задание thread_safety и фильтра сообщений в одном параметре:

    void temperature_listener::so_define_agent() override
    {
      so_subscribe( mbox ).event( temperature_listener::evt_tranform_and_resend,
        // Событие является thread_safe…
        so_5::thread_safe
        // ...плюс задается еще и фильтр.
        + message_filter( []( const msg_temperature & msg ) {
          return !is_normal_temperature( msg );
        } ) );
      ...
    }
    
     
  • Yauheni Akhotnikau

    В процессе продумывания реализации фильтров сообщений (а так же с оглядкой на необходимость поддержки приоритетов для событий в будущем), появляются опасения, что MPSC (или direct) mbox-ы в этом случае потеряют свой смысл.

    Задача direct-mbox-а была в предоставлении возможности прямой доставки сообщения в очередь агента-получателя. Поэтому реализация direct-mbox-ов оказывалась более простой и эффективной, чем реализация MPMC-mbox-а. Внутри direct-mbox-а не хранится никакой информации о типах сообщений, только ссылка на очередь агента-получателя. Это позволяет при выполнении send-а просто дернуть очередь и отдать ей заявку.

    С появлением фильтров поведение direct-mbox-а должно усложниться. Теперь в direct-mbox-е нужно хранить соответствие между типами сообщений и установленными для них фильтрами. Это соответствие (в виде std::map или std::unordered_map) должно быть защищено объектом синхронизации (mutex-ом или spinlock-ом). При доставке сообщения в direct-mbox потребуется захват объекта синхронизации, поиск фильтра по типу сообщения. Т.е. действия по доставке сообщения через direct-mbox становятся практически такими же, как и действия при доставке через MPMC-mbox. А эффективность direct_mbox-а в сравнении с MPMC-mbox падает.

    Что заставляет думать об одном из следующих выходов:

    1. Использовать в реализации direct-mbox-а тот же самый MPMC-mbox. Ведь все равно MPMC и MPSC-mbox-ы начнут работать почти с одинаковой скоростью. Зато не нужно будет сопровождать в коде SObjectizer несколько разных реализаций.

    2. Запретить установку фильтров для direct-mbox-ов. Довольно бессмысленное ограничение. Тем более, что после реализации фильров нужно будет браться за реализацию приоритетов сообщений. А там все опять придет к тому, что внутри direct-mbox-а будет карта отображения типов сообщений на приоритет их обработчиков. И эту карты все равно нужно будет защищать каким-то примитивом синхронизации.

    3. Дабы не терять эффективность работы direct-mbox-ов для тех агентов, которые не нуждаются ни в фильтрах, ни в приоритетах, заставить пользователя указывать те фичи direct-mbox-а, которые он планирует использовать, в конструкторе агента. Что-то вроде:

    // Агент, который не нуждается ни в фильтрах сообщений,
    // ни в приоритетах событий.
    class simplest_agent : public so_5::rt::agent_t
    {
    public :
        simplest_agent( context_t ctx )
            // Контекст напрямую передается базовому классу,
            // что означает, что ни фильтры, ни приоритеты
            // direct-mbox-ом поддерживаться не будут.
            :    so_5::rt::agent_t( ctx )
        {}
    
        virtual void so_define_agent() override
        {
            // Попытка назначить фильтр для сообщения из чужого
            // mbox-а разрешена.
            so_subscribe( some_foreign_mbox ).event(
                &simplest_agent::evt_temperature ),
                message_filter( []( const msg_temperature & msg ) {...} );
    
            // Попытка назначить фильтр для сообщения из своего
            // mbox-а завершиться выбрасыванием исключения.
            so_subscribe_self().event(
                &simplest_agent::evt_load_level,
                // Эта строчка спровоцирует исключение.
                message_filter( []( const msg_load_level & msg ) {...} );
        }
    };
    
    // Агент, которому нужны фильтры сообщений на его
    // direct-mbox-е.
    class complex_agent : public so_5::rt::agent_t
    {
    public :
        complex_agent( context_t ctx )
            // Указываем, что агенту нужна функциональность фильров.
            :    so_5::rt::agent_t( ctx + use_message_filters )
        {}
    
        virtual void so_define_agent() override
        {
            // Попытка назначить фильтр для сообщения из чужого
            // mbox-а разрешена.
            so_subscribe( some_foreign_mbox ).event(
                &complex_agent::evt_temperature ),
                message_filter( []( const msg_temperature & msg ) {...} );
    
            // Попытка назначить фильтр для сообщения из своего
            // mbox-а разрешена уточнением контекста в конструкторе.
            so_subscribe_self().event(
                &complex_agent::evt_load_level,
                message_filter( []( const msg_load_level & msg ) {...} );
        }
    }; 
    

    4. Нужно найти еще какой-то вариант хранения информации о фильтрах агентов, который бы не требовал изменения реализации MPSC-mbox-ов.

     
    • Yauheni Akhotnikau

      Просто в качестве дополнительной информации. Разница в производительности между MPSC и MPMC mbox-ами составляет порядка 24%. Т.е. если простую реализацию MPSC mbox-а заменить более сложной, аналогичной реализации MPMC mbox-а, то можно ожидать падения производительности MPSC-mbox-а где-то в районе 15-20%.

       
  • Yauheni Akhotnikau

    Альтернативный вариант, который не потребует переделки MPSC-mbox-ов (а только MPMC) заключается в реализации операции redirect_if для MPMC-mbox-а:

    virtual void temperature_sensor::so_define_agent() override
    {
        // Назначаем переадресацию нужных нам сообщений
        // на собственный mbox агента.
        m_temperature_sensor_mbox.redirect_if(
            *this, so_direct_mbox(),
            []( const msg_temperature & msg ) {
                return !is_normal_temperature( msg );
            } );
        // Подписываемся уже на сообщения из собственного
        // mbox-а агента.
        so_subscribe_self().event( temperature_sensor::evt_value );
    }
    
     
  • Yauheni Akhotnikau

    Еще дополнительная информация к размышлению по поводу фильтров сообщений.

    Фильтры и лимиты

    Если для сообщения заданы и фильтр, и лимит, то при отсылке сообщения получателю сначала нужно пропустить сообщение через фильтр. И только если оно прошло фильтр, обрабатывать превышение лимита. Т.е. сообщения, которые к агенту не могут прийти, не должны учитываться в лимитах (это важно, т.к. есть несколько мест, где может производится фильтрация сообщений, но при наличии лимита фильтрация должна происходить до обработки лимита).

    Двойная фильтрация

    Возможна следующая ситуация:

    • сначала агент-слушатель сообщений о текущей температуре воздуха был сконфигурирован на получение сообщений, если температура находится за пределами [0..20];
    • была создана подписка с соответствующим фильтром, в очередь к агенту упало 100500 сообщений, прошедших фильтрацию, только часть из них агент успел обработать, остальные остались в очереди;
    • конфигурация агента-слушателя изменилась, теперь его интересуют сообщения о температуре, выходящей за пределы [-5..45].

    Очевидно, что не все сообщения из уже находящихся в очереди агента будут удовлетворять новым условиям. Такие сообщения нужно отфильтровывать. Это означает, что фильтр для сообщения должен применяться дважды:

    1. При отсылке сообщения в mbox. Проверка осуществляется mbox-ом до обработки лимитов и до передачи сообщения в очередь соответствующего агента.
    2. При извлечении заявку на обработку сообщения из очереди агента до вызова обработчика события.
     
  • Yauheni Akhotnikau

    Еще одна альтернативная идея. Нет никаких фильтров сообщений. Но есть возможность запустить обработчик события агента прямо внутри send/deliver_message. В SO-4 такая штука называлась insend-event, т.е. обработчик вызывался на контексте, где происходила отсылка сообщения, минуя постановку заявки в очередь диспетчера. При этом программист должен был гарантировать, что insend-event является thread-safe обработчиком.

    При наличии insend-event-ов фильтрация может строится на основе перенапраления сообщения в другой mbox (по аналогии с описанной выше идеей redirect_if):

    void temperature_listener::so_define_agent()
    {
        // Назначаем переадресацию нужных нам сообщений
        // на собственный mbox агента.
        so_subscribe( m_temperature_sensor_mbox )
            .event( [this]( const msg_temperature & msg ) {
                // Интересует ли нас это сообщение?
                if( !is_normal_temperature( msg ) )
                    // Да, поэтому пересылаем его на собственный mbox.
                    send_to_agent< msg_temperature >( *this, msg );
            },
            // Указываем, что это событие должно быть запущено на
            // контексте отправителя сообщения.
            so_5::insend_event );
    
        // Подписываемся уже на сообщения из собственного
        // mbox-а агента.
        so_subscribe_self().event( temperature_listener::evt_value );
    }
    
     
    • Yauheni Akhotnikau

      Либо, как другой вариант того же самого. Разрешается делать не insend-event-ы у агента, а делать вот такой dispatcher_binder -- insend. Т.е. контекст на обработку события агента предоставляется прямо на контексте нити отправителя сообщения.

      В этом случае фильтр оформляется в виде отдельного, скорее всего ad-hoc, агента:

      // При создании кооперации с temperature_listener-ом...
      env.introduce_coop( ..., []( agent_coop_t & coop ) {
          // Сам listener.
          auto listener = coop.make_agent< temperature_listener >(...);
          // Агент для фильтрации сообщений.
          coop.define_agent( create_insend_binder() )
              .event( temperature_sensor_mbox,
                  [listener]( const msg_temperature & msg ) {
                      if( !is_normal_temperature( msg ) )
                          send_to_agent< msg_temperature >(
                              *listener, msg );
                  } );
          ...
      } );
      
       

      Last edit: Yauheni Akhotnikau 2015-04-22
  • Yauheni Akhotnikau

    Похоже, самым простым способом в реализации, а так же достаточно понятным в использовании, будет следующий механизм.

    Фильтр для доставки сообщений, удовлетворяющих некоторым условиям устанавливается на MPMC-mbox посредством метода so_set_delivery_filter():

    void temperature_listener::so_define_agent() {
      so_set_delivery_filter( sensor_mbox,
        []( const msg_temperature & msg ) {
          return !is_normal_temperature( msg );
        } );
      ...
    }
    

    Заменить один фильтр другим можно вызвав so_set_delivery_filter() еще раз, передав в него новый фильтр. При этом двойная фильтрация сообщений не выполняется. Т.е. если в очередь встало сообщение, удовлетворявшее первому фильтру, то оно дойдет до агента даже если второй фильтр его пропустить не должен. Такие ситуации разработчик должен обрабатывать самостоятельно.

    Удалить фильтр можно посредством метода so_drop_delivery_filter():

    so_drop_delivery_filter< msg_temperature >( sensor_mbox );
    

    При подписке никаких фильров указывать не нужно, все методы для подписки событий агентов остаются в том виде, в котором они есть сейчас.

    Фильтр можно установить в любой момент -- вне зависимости, определены уже подписки на сообщение из конкретного mbox-а или нет. Если фильтр определен, а подписок нет, то mbox не будет доставлять сообщение до агента.

    Если фильтр устанавливается уже после того, как сделана подписка, то в очереди заявок агента могут оказаться сообщения, отправленые ему еще до установки фильтра и которые фильтр бы не пропустил бы.

    Отмена фильтра никак не сказывается на подписках. Если подписки есть, то они сохраняются и после того, как фильтр будет удален.

     

Log in to post a comment.