/*
 * threads_actor_thread.cpp
 *
 *  Created on: Nov 13, 2021
 *      Author: erki
 */

#include "threads_actor_thread.hpp"

#include <algorithm>
#include <cassert>
#include <cstring>
#include <iterator>

namespace {

/**
 * Decoded view of one raw queue item.
 *
 * A queue item is laid out as a std::size_t actor index immediately followed
 * by the signal payload bytes.
 */
struct SignalMessage {
    std::size_t actor_index;///< Slot of the receiving actor in the thread's actor table.
    const char* data;       ///< First payload byte; points into the buffer given to fromData().

    /**
     * Splits a raw queue item into actor index and payload pointer.
     *
     * @param data Start of a raw queue item. The returned message refers into
     *             this buffer, so the buffer must outlive the message.
     * @return The decoded message.
     */
    static SignalMessage fromData(const char* data) {
        SignalMessage message;
        // The index is copied out byte-wise, so the buffer needs no particular alignment.
        std::memcpy(&message.actor_index, data, sizeof(message.actor_index));
        message.data = data + sizeof(message.actor_index);
        return message;
    }
};

}// namespace

namespace Threads {

/**
 * Creates the actor thread with an empty actor table.
 *
 * The message queue itself is not created here; that happens in init_().
 *
 * @param queue_length Number of items the message queue is created with.
 * @param name         Forwarded to the Thread base class.
 * @param priority     Forwarded to the Thread base class.
 * @param stack_size   Forwarded to the Thread base class.
 */
ActorThread::ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority,
                         const std::uint32_t stack_size)
    : Thread(name, priority, stack_size), queue_length_(queue_length), queue_(nullptr) {
    actors_.fill(nullptr);
}

/**
 * Thread body: initializes the actors and the queue once, then dispatches
 * messages forever, yielding after initialization and after every loop pass.
 */
void ActorThread::operator()() {
    init_();
    osThreadYield();

    while (true) {
        doMessageLoop_();
        osThreadYield();
    }
}

/**
 * Initializes every registered actor and creates the message queue and the
 * receive/transmit buffers.
 *
 * Each queue item holds a std::size_t actor index plus the largest
 * signalSize() reported by any registered actor.
 */
void ActorThread::init_() {
    for (IActor* actor : actors_) {
        if (actor != nullptr) {
            queue_item_length_ = std::max(queue_item_length_, actor->signalSize());
            actor->init();
        }
    }

    // One item = actor index header + largest payload.
    const std::size_t item_size = queue_item_length_ + sizeof(std::size_t);

    queue_ = xQueueCreate(queue_length_, item_size);
    assert(queue_ != nullptr);

    // NOTE(review): these buffers are never released in this file; presumably
    // the thread is expected to live for the whole program - confirm.
    rx_buffer_ = new char[item_size];
    tx_buffer_ = new char[item_size];
}

/**
 * Waits (with portMAX_DELAY) for one queue item and dispatches its payload to
 * the actor named by the item's index header.
 */
void ActorThread::doMessageLoop_() {
    const BaseType_t has_data = xQueueReceive(queue_, rx_buffer_, portMAX_DELAY);

    if (has_data == pdTRUE) {
        const SignalMessage message = SignalMessage::fromData(rx_buffer_);

        // Valid slots are [0, size()); an index equal to size() would read
        // one element past the end of actors_.
        assert(message.actor_index < actors_.size());
        IActor* actor = actors_[message.actor_index];

        assert(actor != nullptr);
        actor->dispatchSignal(message.data);
    }
}

/**
 * Stores an actor in the first free slot of the actor table.
 *
 * @param actor Actor to register.
 * @return Index of the slot the actor was stored in. If the table is full the
 *         function asserts; with assertions compiled out it returns
 *         static_cast<std::size_t>(-1), which is not a valid slot index.
 */
std::size_t ActorThread::acceptActor_(IActor* actor) {
    for (auto it = actors_.begin(); it != actors_.end(); ++it) {
        if (*it == nullptr) {
            *it = actor;
            return std::distance(actors_.begin(), it);
        }
    }

    assert(false);
    return static_cast<std::size_t>(-1);
}

}// namespace Threads