skullc-peripherals/Threads/Src/threads_actor_thread.cpp

111 lines
2.1 KiB
C++

/*
* threads_actor_thread.cpp
*
* Created on: Nov 13, 2021
* Author: erki
*/
#include "threads_actor_thread.hpp"
#include <algorithm>
#include <cassert>
#include <cstring>
#include <threads_iactor.hpp>
namespace
{
/**
 * Decoded view of one raw queue item.
 *
 * Raw layout, as read by fromData(): the first sizeof(std::size_t) bytes hold
 * the index of the receiving actor, and the signal payload follows directly
 * after it.
 */
struct SignalMessage
{
    std::size_t actor_index;// Index of the receiving actor, copied out of the raw buffer.
    const char* data;       // Points into the raw buffer, just past the index; not owned.

    /**
     * Decodes a raw queue item.
     *
     * @param data Raw item; must hold at least sizeof(std::size_t) readable bytes.
     *             It is only read, never modified.
     * @return Message whose `data` member aliases the caller's buffer, so it is
     *         only valid for as long as that buffer is.
     */
    static SignalMessage fromData(const char* data)
    {
        SignalMessage message{};
        // memcpy instead of a cast: the raw buffer has no alignment guarantee for std::size_t.
        std::memcpy(&message.actor_index, data, sizeof(message.actor_index));
        message.data = data + sizeof(message.actor_index);
        return message;
    }
};
}// namespace
namespace Threads
{
/**
 * Constructs an actor thread.
 *
 * Only records the requested queue length here; the queue handle starts out
 * as nullptr and is created later by init_(). Every actor slot starts empty.
 *
 * @param queue_length Length the message queue is created with in init_().
 * @param name         Forwarded to the Thread<ActorThread> base.
 * @param priority     Forwarded to the Thread<ActorThread> base.
 * @param stack_size   Forwarded to the Thread<ActorThread> base.
 */
ActorThread::ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority, const std::uint32_t stack_size)
    : Thread<ActorThread>(name, priority, stack_size)
    , queue_length_(queue_length)
    , queue_(nullptr)
{
    // A nullptr slot is what acceptActor_() treats as free.
    for (auto& slot : actors_)
    {
        slot = nullptr;
    }
}
/**
 * Runs the actor thread: performs one-time setup via init_(), then handles
 * queued messages forever. Never returns.
 *
 * Presumably invoked by the Thread<ActorThread> base as the thread's entry
 * point - confirm against threads_thread.hpp.
 */
void ActorThread::operator()()
{
    init_();

    // Yield once after setup and again after every handled message.
    for (;;)
    {
        osThreadYield();
        doMessageLoop_();
    }
}
void ActorThread::init_()
{
for (IActor* actor : actors_)
{
if (actor != nullptr)
{
queue_item_length_ = std::max(queue_item_length_, actor->signalSize());
actor->init();
}
}
queue_ = xQueueCreate(queue_length_, queue_item_length_ + sizeof(std::size_t));
assert(queue_ != nullptr);
rx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)];
tx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)];
}
/**
 * Receives one item from the queue (waiting with a portMAX_DELAY timeout) and
 * dispatches its payload to the actor registered at the index stored in the
 * item's header. Does nothing if the receive call does not report pdTRUE.
 */
void ActorThread::doMessageLoop_()
{
    const BaseType_t has_data = xQueueReceive(queue_, rx_buffer_, portMAX_DELAY);
    if (has_data != pdTRUE)
    {
        return;
    }

    const SignalMessage message = SignalMessage::fromData(rx_buffer_);

    // Valid indices are [0, size()); an index equal to size() would already
    // read one element past the end of actors_.
    assert(message.actor_index < actors_.size());

    IActor* actor = actors_[message.actor_index];
    assert(actor != nullptr);

    actor->dispatchSignal(message.data);
}
/**
 * Registers an actor in the first free (nullptr) slot.
 *
 * @param actor Actor to store.
 * @return Index of the slot the actor was stored in. If every slot is taken
 *         the function asserts; should the assert be compiled out it returns
 *         -1 converted to std::size_t.
 */
std::size_t ActorThread::acceptActor_(IActor* actor)
{
    for (std::size_t slot = 0; slot < actors_.size(); ++slot)
    {
        if (actors_[slot] != nullptr)
        {
            continue;// Already occupied.
        }

        actors_[slot] = actor;
        return slot;
    }

    // No free slot left.
    assert(false);
    return static_cast<std::size_t>(-1);
}
}// namespace Threads