Program Listing for File CommandResponder.hpp

Return to documentation for file (src/mimir/control/CommandResponder.hpp)

#pragma once

#include <boost/log/trivial.hpp>
#include "mimir/StateMachineFwd.hpp"

#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include "mimir/FkinDds.hpp"

// Note: This functionality is heavily inspired/copied from odss::reqrep
namespace mimir
{
  namespace detail
  {
    /// Builds the QoS used for the request (command) data reader.
    ///
    /// Starts from the subscriber's default data reader QoS and applies
    /// TransientLocal durability and Reliable reliability on top of it.
    ///
    /// @param s Subscriber whose default data reader QoS is the starting point.
    /// @return The adjusted data reader QoS.
    inline dds::sub::qos::DataReaderQos ReqReaderQos(const dds::sub::Subscriber& s)
    {
      dds::sub::qos::DataReaderQos readerQos = s.default_datareader_qos();
      readerQos << dds::core::policy::Durability::TransientLocal();
      readerQos << dds::core::policy::Reliability::Reliable();
      return readerQos;
    }
    /// Builds the QoS used for the reply (command response) data writer.
    ///
    /// Starts from the publisher's default data writer QoS and applies
    /// TransientLocal durability on top of it.
    ///
    /// @param p Publisher whose default data writer QoS is the starting point.
    /// @return The adjusted data writer QoS.
    inline dds::pub::qos::DataWriterQos RepWriterQos(const dds::pub::Publisher& p)
    {
      dds::pub::qos::DataWriterQos writerQos = p.default_datawriter_qos();
      // Open question from the original author: should TransientLocal be removed?
      writerQos << dds::core::policy::Durability::TransientLocal();
      return writerQos;
    }
  }

  namespace control
  {

    /// Data reader listener that forwards data-available notifications to a
    /// user supplied callable.
    ///
    /// All other listener callbacks are inherited unchanged from
    /// dds::sub::NoOpDataReaderListener<T>.
    ///
    /// @tparam T Topic data type of the reader this listener is attached to.
    template <typename T>
    class CommandListener :
      public dds::sub::NoOpDataReaderListener<T>
    {
    public:
      /// @param doHandle Callable invoked with the reader on every
      ///        on_data_available notification. An empty std::function is
      ///        accepted; notifications are then ignored.
      CommandListener(
          std::function<void(dds::sub::DataReader<T>&)> doHandle) :
        m_doHandleFcn(std::move(doHandle))
      {
      }

      /// Forwards the notification to the stored callable, if one is set.
      ///
      /// Marked override so that a signature mismatch with the listener base
      /// class becomes a compile error instead of a silently unused function.
      void on_data_available(dds::sub::DataReader<T>& dataReader) override
      {
        if(m_doHandleFcn)
          m_doHandleFcn(dataReader);
      }

    private:
      /// Callback run on data-available; may be empty.
      std::function<void(dds::sub::DataReader<T>&)> m_doHandleFcn;
    };


    class CommandResponder
    {
    public:

      CommandResponder(
          const std::string& requestTopicName,
          const std::string& replyTopicName,
          const std::string& recipient,
          dds::pub::Publisher publisher,
          dds::sub::Subscriber subscriber,
          boost::statechart::fifo_scheduler<> & scheduler,
          boost::statechart::fifo_scheduler<>::processor_handle machine) :
        m_scheduler(scheduler),
        m_stateMachine(machine),
        m_requestReader(dds::core::null),
        m_replyWriter(dds::core::null)
      {

        try
        {
          dds::topic::Topic<fkin::Command> requestTopic(
              subscriber.participant(),
              requestTopicName);
          dds::topic::Filter filter("header.recipient = %0", {recipient});
          dds::topic::ContentFilteredTopic<fkin::Command> filteredTopic(
              requestTopic, recipient + requestTopicName, filter);

          m_requestReader = dds::sub::DataReader<fkin::Command>(
              subscriber,
              filteredTopic,
              detail::ReqReaderQos(subscriber));

          m_replyWriter = dds::pub::DataWriter<fkin::CommandResponse>(
              publisher,
              dds::topic::Topic<fkin::CommandResponse>(
                  publisher.participant(),
                  replyTopicName),
              detail::RepWriterQos(publisher));

          // clear reader queue of any old samples
          m_requestReader.wait_for_historical_data(dds::core::Duration::infinite());
          m_requestReader.take();

          m_listener = std::make_unique<CommandListener<fkin::Command>>(
              [=](dds::sub::DataReader<fkin::Command>&){ handleResponse(); });
          m_requestReader.listener(
              m_listener.get(),
              dds::core::status::StatusMask::data_available());
        }
        catch (const dds::core::Exception& e) {
          BOOST_LOG_TRIVIAL(fatal) << "DDS exception: " + std::string(e.what());
          throw std::runtime_error("CommandResponder failed to contruct");
        }

      }

      CommandResponder(const CommandResponder&) = delete;
      CommandResponder& operator=(const CommandResponder&) = delete;
      CommandResponder(CommandResponder&&) = default;
      CommandResponder& operator=(CommandResponder&&) = default;

      ~CommandResponder()
      {
        m_requestReader.listener(nullptr, dds::core::status::StatusMask::none());
      }

    private:
      void handleResponse()
      {

        const auto samples = m_requestReader.select()
         .state(dds::sub::status::DataState::new_data())
         .take();

        std::for_each(
            samples.begin(),
            samples.end(),
            [=](const dds::sub::Sample<fkin::Command>& sample)
            {
              std::string cmd("Noop");
              if (sample.info().valid())
              {
                switch(sample.data().command())
                {
                case fkin::CommandType::START_PROCESS:
                  cmd = "Started " + sample.data().header().recipient();
                  m_scheduler.queue_event(
                      m_stateMachine,
                      make_intrusive(new mimir::EvStart()));
                  break;
                case fkin::CommandType::STOP_PROCESS:
                  cmd = "Stopped " + sample.data().header().recipient();
                  m_scheduler.queue_event(
                      m_stateMachine,
                      make_intrusive(new mimir::EvStop()));
                  break;
                case fkin::CommandType::TERMINATE_PROCESS:
                  cmd = "Terminated " + sample.data().header().recipient();
                  m_scheduler.queue_event(
                      m_stateMachine,
                      make_intrusive(new mimir::EvKill()));
                  break;
                default:
                  ;
                }
              }

              // Send response
              auto response = fkin::CommandResponse(
                  fkin::ReplyHeader(sample.data().header().requestID()),
                  true,
                  cmd);

              m_replyWriter << response;
            });
      }

      CommandResponder() = delete;
      boost::statechart::fifo_scheduler<> & m_scheduler;
      boost::statechart::fifo_scheduler<>::processor_handle m_stateMachine;
      dds::sub::DataReader<fkin::Command> m_requestReader;
      dds::pub::DataWriter<fkin::CommandResponse> m_replyWriter;
      std::unique_ptr<CommandListener<fkin::Command>> m_listener;

    };


  }
}