Program Listing for File CommandResponder.hpp
Return to documentation for file (src/mimir/control/CommandResponder.hpp)
#pragma once
#include <algorithm>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include <boost/log/trivial.hpp>

#include "mimir/StateMachineFwd.hpp"
#include "mimir/FkinDds.hpp"
// Note: This functionality is heavily inspired/copied from odss::reqrep
namespace mimir
{
namespace detail
{
/// Builds the QoS used for the request (command) reader.
///
/// Starts from the subscriber's default data reader QoS and sets the
/// durability policy to TransientLocal and the reliability policy to Reliable.
///
/// @param s Subscriber whose default data reader QoS is used as the baseline.
/// @return The adjusted data reader QoS.
inline dds::sub::qos::DataReaderQos ReqReaderQos(const dds::sub::Subscriber& s)
{
    dds::sub::qos::DataReaderQos readerQos = s.default_datareader_qos();
    readerQos << dds::core::policy::Durability::TransientLocal();
    readerQos << dds::core::policy::Reliability::Reliable();
    return readerQos;
}
/// Builds the QoS used for the reply (command response) writer.
///
/// Starts from the publisher's default data writer QoS and sets the
/// durability policy to TransientLocal; every other policy keeps the
/// publisher's default.
///
/// @param p Publisher whose default data writer QoS is used as the baseline.
/// @return The adjusted data writer QoS.
inline dds::pub::qos::DataWriterQos RepWriterQos(const dds::pub::Publisher& p)
{
    dds::pub::qos::DataWriterQos writerQos = p.default_datawriter_qos();
    // Open question carried over from the original author:
    // should TransientLocal be removed?
    writerQos << dds::core::policy::Durability::TransientLocal();
    return writerQos;
}
}
namespace control
{
/// Data reader listener that forwards data-available notifications to a
/// user-supplied callable.
///
/// All other listener callbacks are inherited unchanged from
/// dds::sub::NoOpDataReaderListener<T>.
///
/// @tparam T Topic data type of the reader being listened to.
template <typename T>
class CommandListener : public dds::sub::NoOpDataReaderListener<T>
{
    /// Signature of the callable invoked when data is available.
    using Handler = std::function<void(dds::sub::DataReader<T>&)>;

public:
    /// @param doHandle Callable invoked with the notifying reader on each
    ///        data-available notification. An empty callable is accepted and
    ///        makes the notification a no-op.
    CommandListener(Handler doHandle) :
        m_callback(std::move(doHandle))
    {
    }

    /// Invokes the stored callable, if any, with the reader that has data.
    virtual void on_data_available(dds::sub::DataReader<T>& dataReader)
    {
        if (!m_callback)
            return; // nothing registered

        m_callback(dataReader);
    }

private:
    Handler m_callback;
};
class CommandResponder
{
public:
CommandResponder(
const std::string& requestTopicName,
const std::string& replyTopicName,
const std::string& recipient,
dds::pub::Publisher publisher,
dds::sub::Subscriber subscriber,
boost::statechart::fifo_scheduler<> & scheduler,
boost::statechart::fifo_scheduler<>::processor_handle machine) :
m_scheduler(scheduler),
m_stateMachine(machine),
m_requestReader(dds::core::null),
m_replyWriter(dds::core::null)
{
try
{
dds::topic::Topic<fkin::Command> requestTopic(
subscriber.participant(),
requestTopicName);
dds::topic::Filter filter("header.recipient = %0", {recipient});
dds::topic::ContentFilteredTopic<fkin::Command> filteredTopic(
requestTopic, recipient + requestTopicName, filter);
m_requestReader = dds::sub::DataReader<fkin::Command>(
subscriber,
filteredTopic,
detail::ReqReaderQos(subscriber));
m_replyWriter = dds::pub::DataWriter<fkin::CommandResponse>(
publisher,
dds::topic::Topic<fkin::CommandResponse>(
publisher.participant(),
replyTopicName),
detail::RepWriterQos(publisher));
// clear reader queue of any old samples
m_requestReader.wait_for_historical_data(dds::core::Duration::infinite());
m_requestReader.take();
m_listener = std::make_unique<CommandListener<fkin::Command>>(
[=](dds::sub::DataReader<fkin::Command>&){ handleResponse(); });
m_requestReader.listener(
m_listener.get(),
dds::core::status::StatusMask::data_available());
}
catch (const dds::core::Exception& e) {
BOOST_LOG_TRIVIAL(fatal) << "DDS exception: " + std::string(e.what());
throw std::runtime_error("CommandResponder failed to contruct");
}
}
CommandResponder(const CommandResponder&) = delete;
CommandResponder& operator=(const CommandResponder&) = delete;
CommandResponder(CommandResponder&&) = default;
CommandResponder& operator=(CommandResponder&&) = default;
~CommandResponder()
{
m_requestReader.listener(nullptr, dds::core::status::StatusMask::none());
}
private:
void handleResponse()
{
const auto samples = m_requestReader.select()
.state(dds::sub::status::DataState::new_data())
.take();
std::for_each(
samples.begin(),
samples.end(),
[=](const dds::sub::Sample<fkin::Command>& sample)
{
std::string cmd("Noop");
if (sample.info().valid())
{
switch(sample.data().command())
{
case fkin::CommandType::START_PROCESS:
cmd = "Started " + sample.data().header().recipient();
m_scheduler.queue_event(
m_stateMachine,
make_intrusive(new mimir::EvStart()));
break;
case fkin::CommandType::STOP_PROCESS:
cmd = "Stopped " + sample.data().header().recipient();
m_scheduler.queue_event(
m_stateMachine,
make_intrusive(new mimir::EvStop()));
break;
case fkin::CommandType::TERMINATE_PROCESS:
cmd = "Terminated " + sample.data().header().recipient();
m_scheduler.queue_event(
m_stateMachine,
make_intrusive(new mimir::EvKill()));
break;
default:
;
}
}
// Send response
auto response = fkin::CommandResponse(
fkin::ReplyHeader(sample.data().header().requestID()),
true,
cmd);
m_replyWriter << response;
});
}
CommandResponder() = delete;
boost::statechart::fifo_scheduler<> & m_scheduler;
boost::statechart::fifo_scheduler<>::processor_handle m_stateMachine;
dds::sub::DataReader<fkin::Command> m_requestReader;
dds::pub::DataWriter<fkin::CommandResponse> m_replyWriter;
std::unique_ptr<CommandListener<fkin::Command>> m_listener;
};
}
}