diff --git a/Threads/Inc/threads_actor.hpp b/Threads/Inc/threads_actor.hpp index 41037a1..fb81899 100644 --- a/Threads/Inc/threads_actor.hpp +++ b/Threads/Inc/threads_actor.hpp @@ -15,15 +15,15 @@ #include #include -#include "threads_primitivethread.hpp" -#include "threads_signal.hpp" -#include "threads_timer.hpp" +#include +#include +#include namespace Threads { template -class Actor +class Actor : public IActor { public: using value_type = std::variant; @@ -31,14 +31,11 @@ public: static_assert(std::is_default_constructible_v, "Ts must be default constructible."); static_assert(std::is_trivially_copyable_v, "Ts must be trivially copyable."); - Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size) - : thread_(&Actor::threadHandler_, this, name, priority, stack_size), - msg_queue_(xQueueCreate(queue_size, sizeof(value_type))), signal_(this) - { - assert(msg_queue_); - } + Actor() : IActor(sizeof(value_type)), + signal_(this) + {} - virtual ~Actor() {} + ~Actor() override {} template struct Signal : Signallable @@ -59,17 +56,13 @@ public: void emit(const T& data) override { parent::value_type to_send = data; - xQueueSend(q_->msg_queue_, &to_send, 0); + q_->dispatchEvent(to_send); } BaseType_t emitFromIsr(const T& data) override { parent::value_type to_send = data; - BaseType_t was_awoken = pdFALSE; - - xQueueSendFromISR(q_->msg_queue_, &to_send, &was_awoken); - - return was_awoken; + return q_->dispatchEventFromIsr(to_send); } private: @@ -97,13 +90,12 @@ public: return &signal_; } -protected: - virtual void init() = 0; + void dispatchSignal(const char* data) final + { + dispatchImpl_(data); + } private: - PrimitiveThread thread_; - QueueHandle_t msg_queue_; - struct Visitor_ { using parent = Actor; @@ -121,113 +113,15 @@ private: } }; - void operator()() + void dispatchImpl_(const char* data) { - init(); - - value_type data{}; - - while (true) - { - const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY); - - if (got == pdTRUE) - { - std::visit(Visitor_{this}, data); - } - } - } - - static void threadHandler_(void* d) - { - auto* dd = static_cast*>(d); - (*dd)(); + const value_type* signal_data = reinterpret_cast(data); + std::visit(Visitor_{this}, *signal_data); } Signal signal_; }; -template -class Actor -{ -public: - using value_type = void; - - Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size) - : thread_([](void* d) { - auto* dd = static_cast*>(d); - (*dd)(); - }, - this, name, priority, stack_size), - msg_queue_(xQueueCreate(queue_size, sizeof(int))), signal_(*this) - { - assert(msg_queue_); - } - - virtual ~Actor() {} - - Signallable* getSignal() - { - return &signal_; - } - -protected: - virtual void init() = 0; - -private: - PrimitiveThread thread_; - QueueHandle_t msg_queue_; - - void operator()() - { - init(); - - int data; - - while (true) - { - const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY); - - if (got == pdTRUE) - { - auto* crtp = static_cast(this); - crtp->onSignal(); - } - } - } - - friend struct Signal_; - - struct Signal_ : Signallable - { - using parent = Actor; - parent& q; - - explicit Signal_(parent& q) - : q(q) - {} - - void emit() override - { - const int data = 0; - - xQueueSend(q.msg_queue_, &data, 0); - } - - BaseType_t emitFromIsr() override - { - const int data = 0; - - BaseType_t was_awoken = pdFALSE; - xQueueSendFromISR(q.msg_queue_, &data, &was_awoken); - - return was_awoken; - } - }; - - Signal_ signal_; -}; - }// namespace Threads diff --git a/Threads/Inc/threads_actor_thread.hpp b/Threads/Inc/threads_actor_thread.hpp new file mode 100644 index 0000000..4929b66 --- /dev/null +++ b/Threads/Inc/threads_actor_thread.hpp @@ -0,0 +1,50 @@ +/* + * threads_actor_thread.hpp + * + * Created on: Nov 13, 2021 + * Author: erki + */ + +#ifndef SKULLC_THREADS_ACTOR_THREAD_HPP_ +#define SKULLC_THREADS_ACTOR_THREAD_HPP_ + +#include +#include + +#include +#include + +#include + +namespace Threads +{ + +class IActor; + +class ActorThread : public Thread +{ +public: + ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority, const std::uint32_t stack_size); + + void operator()(); + +private: + void init_(); + void doMessageLoop_(); + + friend class IActor; + std::size_t acceptActor_(IActor* actor); + + std::size_t queue_length_; + std::size_t queue_item_length_ = 0; + + QueueHandle_t queue_ = nullptr; + char* rx_buffer_ = nullptr; + char* tx_buffer_ = nullptr; + std::array actors_; +}; + +}// namespace Threads + + +#endif /* SKULLC_THREADS_ACTOR_THREAD_HPP_ */ diff --git a/Threads/Inc/threads_iactor.hpp b/Threads/Inc/threads_iactor.hpp new file mode 100644 index 0000000..011f448 --- /dev/null +++ b/Threads/Inc/threads_iactor.hpp @@ -0,0 +1,68 @@ +/* + * threads_iactor.hpp + * + * Created on: Nov 14, 2021 + * Author: erki + */ + +#ifndef SKULLC_THREADS_IACTOR_HPP_ +#define SKULLC_THREADS_IACTOR_HPP_ + +#include + +#include + +namespace Threads +{ + +class ActorThread; + +class IActor +{ +public: + IActor(const std::size_t signal_size) + : signal_size_(signal_size) + {} + + virtual ~IActor() {} + + virtual void init() = 0; + virtual void dispatchSignal(const char* data) = 0; + + void moveToThread(ActorThread* thread); + + std::size_t actorIndex() const + { + return actor_index_; + } + + std::size_t signalSize() const + { + return signal_size_; + } + +protected: + template + void dispatchEvent(const T& data) + { + dispatchEventImpl_(&data, sizeof(T)); + } + + template + BaseType_t dispatchEventFromIsr(const T& data) + { + return dispatchEventFromIsrImpl_(&data, sizeof(T)); + } + +private: + ActorThread* parent_thread_ = nullptr; + std::size_t actor_index_ = 0; + std::size_t signal_size_; + + void dispatchEventImpl_(const void* data, const std::size_t data_size); + BaseType_t dispatchEventFromIsrImpl_(const void* data, const std::size_t data_size); +}; + +}// namespace Threads + +#endif /* SKULLC_THREADS_IACTOR_HPP_ */ diff --git a/Threads/Src/threads_actor_thread.cpp b/Threads/Src/threads_actor_thread.cpp new file mode 100644 index 0000000..c68812d --- /dev/null +++ b/Threads/Src/threads_actor_thread.cpp @@ -0,0 +1,110 @@ +/* + * threads_actor_thread.cpp + * + * Created on: Nov 13, 2021 + * Author: erki + */ + + +#include "threads_actor_thread.hpp" + +#include +#include +#include + +#include + +namespace +{ + +struct SignalMessage +{ + std::size_t actor_index; + const char* data; + + static SignalMessage fromData(char* data) + { + SignalMessage message; + std::memcpy(&message.actor_index, data, sizeof(message.actor_index)); + message.data = data + sizeof(message.actor_index); + + return message; + } +}; + +}// namespace + +namespace Threads +{ + +ActorThread::ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority, const std::uint32_t stack_size) + : Thread(name, priority, stack_size), queue_length_(queue_length), queue_(nullptr) +{ + actors_.fill(nullptr); +} + +void ActorThread::operator()() +{ + init_(); + + osThreadYield(); + + while (true) + { + doMessageLoop_(); + + osThreadYield(); + } +} + +void ActorThread::init_() +{ + for (IActor* actor : actors_) + { + if (actor != nullptr) + { + queue_item_length_ = std::max(queue_item_length_, actor->signalSize()); + + actor->init(); + } + } + + queue_ = xQueueCreate(queue_length_, queue_item_length_ + sizeof(std::size_t)); + assert(queue_ != nullptr); + + rx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)]; + tx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)]; +} + +void ActorThread::doMessageLoop_() +{ + const BaseType_t has_data = xQueueReceive(queue_, rx_buffer_, portMAX_DELAY); + + if (has_data == pdTRUE) + { + const SignalMessage message = SignalMessage::fromData(rx_buffer_); + assert(message.actor_index <= actors_.size()); + + IActor* actor = actors_[message.actor_index]; + assert(actor != nullptr); + + actor->dispatchSignal(message.data); + } +} + +std::size_t ActorThread::acceptActor_(IActor* actor) +{ + for (auto it = actors_.begin(); it != actors_.end(); ++it) + { + if (*it == nullptr) + { + *it = actor; + return std::distance(actors_.begin(), it); + } + } + + assert(false); + return -1; +} + +}// namespace Threads diff --git a/Threads/Src/threads_iactor.cpp b/Threads/Src/threads_iactor.cpp new file mode 100644 index 0000000..3131010 --- /dev/null +++ b/Threads/Src/threads_iactor.cpp @@ -0,0 +1,55 @@ +/* + * threads_iactor.cpp + * + * Created on: Nov 14, 2021 + * Author: erki + */ + + +#include "threads_iactor.hpp" + +#include + +#include + +#include "threads_actor_thread.hpp" + +namespace Threads +{ + +void IActor::moveToThread(ActorThread* thread) +{ + actor_index_ = thread->acceptActor_(this); + parent_thread_ = thread; +} + +void IActor::dispatchEventImpl_(const void* data, const std::size_t data_size) +{ + taskENTER_CRITICAL(); + + std::memcpy(parent_thread_->tx_buffer_, &actor_index_, sizeof(std::size_t)); + char* data_dest = parent_thread_->tx_buffer_ + sizeof(std::size_t); + std::memcpy(data_dest, data, data_size); + + xQueueSend(parent_thread_->queue_, parent_thread_->tx_buffer_, 0); + + taskEXIT_CRITICAL(); +} + +BaseType_t IActor::dispatchEventFromIsrImpl_(const void* data, const std::size_t data_size) +{ + const auto isr_data = taskENTER_CRITICAL_FROM_ISR(); + + std::memcpy(parent_thread_->tx_buffer_, &actor_index_, sizeof(std::size_t)); + char* data_dest = parent_thread_->tx_buffer_ + sizeof(std::size_t); + std::memcpy(data_dest, data, data_size); + + BaseType_t task_awoken = pdFALSE; + xQueueSendFromISR(parent_thread_->queue_, parent_thread_->tx_buffer_, &task_awoken); + + taskEXIT_CRITICAL_FROM_ISR(isr_data); + + return task_awoken; +} + +}// namespace Threads