Compare commits

...

2 Commits

Author SHA1 Message Date
Erki
0d1ea1c1c2 Peripherals: format ICM file
All checks were successful
continuous-integration/drone/push Build is passing
2021-11-14 01:31:12 +02:00
Erki
43bc8d8265 Threads: Actors are now composable over threads 2021-11-14 01:31:01 +02:00
6 changed files with 307 additions and 130 deletions

View File

@ -15,15 +15,15 @@
#include <type_traits> #include <type_traits>
#include <variant> #include <variant>
#include "threads_primitivethread.hpp" #include <threads_actor_thread.hpp>
#include "threads_signal.hpp" #include <threads_iactor.hpp>
#include "threads_timer.hpp" #include <threads_signal.hpp>
namespace Threads namespace Threads
{ {
template<typename CRTP, typename... Ts> template<typename CRTP, typename... Ts>
class Actor class Actor : public IActor
{ {
public: public:
using value_type = std::variant<Ts...>; using value_type = std::variant<Ts...>;
@ -31,14 +31,11 @@ public:
static_assert(std::is_default_constructible_v<value_type>, "Ts must be default constructible."); static_assert(std::is_default_constructible_v<value_type>, "Ts must be default constructible.");
static_assert(std::is_trivially_copyable_v<value_type>, "Ts must be trivially copyable."); static_assert(std::is_trivially_copyable_v<value_type>, "Ts must be trivially copyable.");
Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size) Actor() : IActor(sizeof(value_type)),
: thread_(&Actor<CRTP, Ts...>::threadHandler_, this, name, priority, stack_size), signal_(this)
msg_queue_(xQueueCreate(queue_size, sizeof(value_type))), signal_(this) {}
{
assert(msg_queue_);
}
virtual ~Actor() {} ~Actor() override {}
template<typename T> template<typename T>
struct Signal : Signallable<T> struct Signal : Signallable<T>
@ -59,17 +56,13 @@ public:
void emit(const T& data) override void emit(const T& data) override
{ {
parent::value_type to_send = data; parent::value_type to_send = data;
xQueueSend(q_->msg_queue_, &to_send, 0); q_->dispatchEvent(to_send);
} }
BaseType_t emitFromIsr(const T& data) override BaseType_t emitFromIsr(const T& data) override
{ {
parent::value_type to_send = data; parent::value_type to_send = data;
BaseType_t was_awoken = pdFALSE; return q_->dispatchEventFromIsr(to_send);
xQueueSendFromISR(q_->msg_queue_, &to_send, &was_awoken);
return was_awoken;
} }
private: private:
@ -97,13 +90,12 @@ public:
return &signal_; return &signal_;
} }
protected: void dispatchSignal(const char* data) final
virtual void init() = 0; {
dispatchImpl_(data);
}
private: private:
PrimitiveThread thread_;
QueueHandle_t msg_queue_;
struct Visitor_ struct Visitor_
{ {
using parent = Actor<CRTP, Ts...>; using parent = Actor<CRTP, Ts...>;
@ -121,113 +113,15 @@ private:
} }
}; };
void operator()() void dispatchImpl_(const char* data)
{ {
init(); const value_type* signal_data = reinterpret_cast<const value_type*>(data);
std::visit(Visitor_{this}, *signal_data);
value_type data{};
while (true)
{
const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY);
if (got == pdTRUE)
{
std::visit(Visitor_{this}, data);
}
}
}
static void threadHandler_(void* d)
{
auto* dd = static_cast<Actor<CRTP, Ts...>*>(d);
(*dd)();
} }
Signal<value_type> signal_; Signal<value_type> signal_;
}; };
template<typename CRTP>
class Actor<CRTP, void>
{
public:
using value_type = void;
Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size)
: thread_([](void* d) {
auto* dd = static_cast<Actor<CRTP, value_type>*>(d);
(*dd)();
},
this, name, priority, stack_size),
msg_queue_(xQueueCreate(queue_size, sizeof(int))), signal_(*this)
{
assert(msg_queue_);
}
virtual ~Actor() {}
Signallable<value_type>* getSignal()
{
return &signal_;
}
protected:
virtual void init() = 0;
private:
PrimitiveThread thread_;
QueueHandle_t msg_queue_;
void operator()()
{
init();
int data;
while (true)
{
const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY);
if (got == pdTRUE)
{
auto* crtp = static_cast<CRTP*>(this);
crtp->onSignal();
}
}
}
friend struct Signal_;
struct Signal_ : Signallable<value_type>
{
using parent = Actor<CRTP, void>;
parent& q;
explicit Signal_(parent& q)
: q(q)
{}
void emit() override
{
const int data = 0;
xQueueSend(q.msg_queue_, &data, 0);
}
BaseType_t emitFromIsr() override
{
const int data = 0;
BaseType_t was_awoken = pdFALSE;
xQueueSendFromISR(q.msg_queue_, &data, &was_awoken);
return was_awoken;
}
};
Signal_ signal_;
};
}// namespace Threads }// namespace Threads

View File

@ -0,0 +1,50 @@
/*
* threads_actor_thread.hpp
*
* Created on: Nov 13, 2021
* Author: erki
*/
#ifndef SKULLC_THREADS_ACTOR_THREAD_HPP_
#define SKULLC_THREADS_ACTOR_THREAD_HPP_
#include <array>
#include <cstdint>
#include <freertos_os2.h>
#include <queue.h>
#include <threads_thread.hpp>
namespace Threads
{
class IActor;
class ActorThread : public Thread<ActorThread>
{
public:
ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority, const std::uint32_t stack_size);
void operator()();
private:
void init_();
void doMessageLoop_();
friend class IActor;
std::size_t acceptActor_(IActor* actor);
std::size_t queue_length_;
std::size_t queue_item_length_ = 0;
QueueHandle_t queue_ = nullptr;
char* rx_buffer_ = nullptr;
char* tx_buffer_ = nullptr;
std::array<IActor*, 32> actors_;
};
}// namespace Threads
#endif /* SKULLC_THREADS_ACTOR_THREAD_HPP_ */

View File

@ -0,0 +1,68 @@
/*
* threads_iactor.hpp
*
* Created on: Nov 14, 2021
* Author: erki
*/
#ifndef SKULLC_THREADS_IACTOR_HPP_
#define SKULLC_THREADS_IACTOR_HPP_
#include <cstdint>
#include <freertos_os2.h>
namespace Threads
{
class ActorThread;
class IActor
{
public:
IActor(const std::size_t signal_size)
: signal_size_(signal_size)
{}
virtual ~IActor() {}
virtual void init() = 0;
virtual void dispatchSignal(const char* data) = 0;
void moveToThread(ActorThread* thread);
std::size_t actorIndex() const
{
return actor_index_;
}
std::size_t signalSize() const
{
return signal_size_;
}
protected:
template<typename T>
void dispatchEvent(const T& data)
{
dispatchEventImpl_(&data, sizeof(T));
}
template<typename T>
BaseType_t dispatchEventFromIsr(const T& data)
{
return dispatchEventFromIsrImpl_(&data, sizeof(T));
}
private:
ActorThread* parent_thread_ = nullptr;
std::size_t actor_index_ = 0;
std::size_t signal_size_;
void dispatchEventImpl_(const void* data, const std::size_t data_size);
BaseType_t dispatchEventFromIsrImpl_(const void* data, const std::size_t data_size);
};
}// namespace Threads
#endif /* SKULLC_THREADS_IACTOR_HPP_ */

View File

@ -0,0 +1,110 @@
/*
* threads_actor_thread.cpp
*
* Created on: Nov 13, 2021
* Author: erki
*/
#include "threads_actor_thread.hpp"
#include <algorithm>
#include <cassert>
#include <cstring>
#include <threads_iactor.hpp>
namespace
{
struct SignalMessage
{
std::size_t actor_index;
const char* data;
static SignalMessage fromData(char* data)
{
SignalMessage message;
std::memcpy(&message.actor_index, data, sizeof(message.actor_index));
message.data = data + sizeof(message.actor_index);
return message;
}
};
}// namespace
namespace Threads
{
ActorThread::ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority, const std::uint32_t stack_size)
: Thread<ActorThread>(name, priority, stack_size), queue_length_(queue_length), queue_(nullptr)
{
actors_.fill(nullptr);
}
void ActorThread::operator()()
{
init_();
osThreadYield();
while (true)
{
doMessageLoop_();
osThreadYield();
}
}
void ActorThread::init_()
{
for (IActor* actor : actors_)
{
if (actor != nullptr)
{
queue_item_length_ = std::max(queue_item_length_, actor->signalSize());
actor->init();
}
}
queue_ = xQueueCreate(queue_length_, queue_item_length_ + sizeof(std::size_t));
assert(queue_ != nullptr);
rx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)];
tx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)];
}
void ActorThread::doMessageLoop_()
{
const BaseType_t has_data = xQueueReceive(queue_, rx_buffer_, portMAX_DELAY);
if (has_data == pdTRUE)
{
const SignalMessage message = SignalMessage::fromData(rx_buffer_);
assert(message.actor_index <= actors_.size());
IActor* actor = actors_[message.actor_index];
assert(actor != nullptr);
actor->dispatchSignal(message.data);
}
}
std::size_t ActorThread::acceptActor_(IActor* actor)
{
for (auto it = actors_.begin(); it != actors_.end(); ++it)
{
if (*it == nullptr)
{
*it = actor;
return std::distance(actors_.begin(), it);
}
}
assert(false);
return -1;
}
}// namespace Threads

View File

@ -0,0 +1,55 @@
/*
* threads_iactor.cpp
*
* Created on: Nov 14, 2021
* Author: erki
*/
#include "threads_iactor.hpp"
#include <cstring>
#include <queue.h>
#include "threads_actor_thread.hpp"
namespace Threads
{
void IActor::moveToThread(ActorThread* thread)
{
actor_index_ = thread->acceptActor_(this);
parent_thread_ = thread;
}
void IActor::dispatchEventImpl_(const void* data, const std::size_t data_size)
{
taskENTER_CRITICAL();
std::memcpy(parent_thread_->tx_buffer_, &actor_index_, sizeof(std::size_t));
char* data_dest = parent_thread_->tx_buffer_ + sizeof(std::size_t);
std::memcpy(data_dest, data, data_size);
xQueueSend(parent_thread_->queue_, parent_thread_->tx_buffer_, 0);
taskEXIT_CRITICAL();
}
BaseType_t IActor::dispatchEventFromIsrImpl_(const void* data, const std::size_t data_size)
{
const auto isr_data = taskENTER_CRITICAL_FROM_ISR();
std::memcpy(parent_thread_->tx_buffer_, &actor_index_, sizeof(std::size_t));
char* data_dest = parent_thread_->tx_buffer_ + sizeof(std::size_t);
std::memcpy(data_dest, data, data_size);
BaseType_t task_awoken = pdFALSE;
xQueueSendFromISR(parent_thread_->queue_, parent_thread_->tx_buffer_, &task_awoken);
taskEXIT_CRITICAL_FROM_ISR(isr_data);
return task_awoken;
}
}// namespace Threads