Threads: Actors are now composable over threads
This commit is contained in:
parent
fb319fd21f
commit
43bc8d8265
@ -15,15 +15,15 @@
|
||||
#include <type_traits>
|
||||
#include <variant>
|
||||
|
||||
#include "threads_primitivethread.hpp"
|
||||
#include "threads_signal.hpp"
|
||||
#include "threads_timer.hpp"
|
||||
#include <threads_actor_thread.hpp>
|
||||
#include <threads_iactor.hpp>
|
||||
#include <threads_signal.hpp>
|
||||
|
||||
namespace Threads
|
||||
{
|
||||
|
||||
template<typename CRTP, typename... Ts>
|
||||
class Actor
|
||||
class Actor : public IActor
|
||||
{
|
||||
public:
|
||||
using value_type = std::variant<Ts...>;
|
||||
@ -31,14 +31,11 @@ public:
|
||||
static_assert(std::is_default_constructible_v<value_type>, "Ts must be default constructible.");
|
||||
static_assert(std::is_trivially_copyable_v<value_type>, "Ts must be trivially copyable.");
|
||||
|
||||
Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size)
|
||||
: thread_(&Actor<CRTP, Ts...>::threadHandler_, this, name, priority, stack_size),
|
||||
msg_queue_(xQueueCreate(queue_size, sizeof(value_type))), signal_(this)
|
||||
{
|
||||
assert(msg_queue_);
|
||||
}
|
||||
Actor() : IActor(sizeof(value_type)),
|
||||
signal_(this)
|
||||
{}
|
||||
|
||||
virtual ~Actor() {}
|
||||
~Actor() override {}
|
||||
|
||||
template<typename T>
|
||||
struct Signal : Signallable<T>
|
||||
@ -59,17 +56,13 @@ public:
|
||||
void emit(const T& data) override
|
||||
{
|
||||
parent::value_type to_send = data;
|
||||
xQueueSend(q_->msg_queue_, &to_send, 0);
|
||||
q_->dispatchEvent(to_send);
|
||||
}
|
||||
|
||||
BaseType_t emitFromIsr(const T& data) override
|
||||
{
|
||||
parent::value_type to_send = data;
|
||||
BaseType_t was_awoken = pdFALSE;
|
||||
|
||||
xQueueSendFromISR(q_->msg_queue_, &to_send, &was_awoken);
|
||||
|
||||
return was_awoken;
|
||||
return q_->dispatchEventFromIsr(to_send);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -97,13 +90,12 @@ public:
|
||||
return &signal_;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void init() = 0;
|
||||
void dispatchSignal(const char* data) final
|
||||
{
|
||||
dispatchImpl_(data);
|
||||
}
|
||||
|
||||
private:
|
||||
PrimitiveThread thread_;
|
||||
QueueHandle_t msg_queue_;
|
||||
|
||||
struct Visitor_
|
||||
{
|
||||
using parent = Actor<CRTP, Ts...>;
|
||||
@ -121,113 +113,15 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
void operator()()
|
||||
void dispatchImpl_(const char* data)
|
||||
{
|
||||
init();
|
||||
|
||||
value_type data{};
|
||||
|
||||
while (true)
|
||||
{
|
||||
const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY);
|
||||
|
||||
if (got == pdTRUE)
|
||||
{
|
||||
std::visit(Visitor_{this}, data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void threadHandler_(void* d)
|
||||
{
|
||||
auto* dd = static_cast<Actor<CRTP, Ts...>*>(d);
|
||||
(*dd)();
|
||||
const value_type* signal_data = reinterpret_cast<const value_type*>(data);
|
||||
std::visit(Visitor_{this}, *signal_data);
|
||||
}
|
||||
|
||||
Signal<value_type> signal_;
|
||||
};
|
||||
|
||||
template<typename CRTP>
|
||||
class Actor<CRTP, void>
|
||||
{
|
||||
public:
|
||||
using value_type = void;
|
||||
|
||||
Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size)
|
||||
: thread_([](void* d) {
|
||||
auto* dd = static_cast<Actor<CRTP, value_type>*>(d);
|
||||
(*dd)();
|
||||
},
|
||||
this, name, priority, stack_size),
|
||||
msg_queue_(xQueueCreate(queue_size, sizeof(int))), signal_(*this)
|
||||
{
|
||||
assert(msg_queue_);
|
||||
}
|
||||
|
||||
virtual ~Actor() {}
|
||||
|
||||
Signallable<value_type>* getSignal()
|
||||
{
|
||||
return &signal_;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void init() = 0;
|
||||
|
||||
private:
|
||||
PrimitiveThread thread_;
|
||||
QueueHandle_t msg_queue_;
|
||||
|
||||
void operator()()
|
||||
{
|
||||
init();
|
||||
|
||||
int data;
|
||||
|
||||
while (true)
|
||||
{
|
||||
const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY);
|
||||
|
||||
if (got == pdTRUE)
|
||||
{
|
||||
auto* crtp = static_cast<CRTP*>(this);
|
||||
crtp->onSignal();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
friend struct Signal_;
|
||||
|
||||
struct Signal_ : Signallable<value_type>
|
||||
{
|
||||
using parent = Actor<CRTP, void>;
|
||||
parent& q;
|
||||
|
||||
explicit Signal_(parent& q)
|
||||
: q(q)
|
||||
{}
|
||||
|
||||
void emit() override
|
||||
{
|
||||
const int data = 0;
|
||||
|
||||
xQueueSend(q.msg_queue_, &data, 0);
|
||||
}
|
||||
|
||||
BaseType_t emitFromIsr() override
|
||||
{
|
||||
const int data = 0;
|
||||
|
||||
BaseType_t was_awoken = pdFALSE;
|
||||
xQueueSendFromISR(q.msg_queue_, &data, &was_awoken);
|
||||
|
||||
return was_awoken;
|
||||
}
|
||||
};
|
||||
|
||||
Signal_ signal_;
|
||||
};
|
||||
|
||||
}// namespace Threads
|
||||
|
||||
|
||||
|
||||
50
Threads/Inc/threads_actor_thread.hpp
Normal file
50
Threads/Inc/threads_actor_thread.hpp
Normal file
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* threads_actor_thread.hpp
|
||||
*
|
||||
* Created on: Nov 13, 2021
|
||||
* Author: erki
|
||||
*/
|
||||
|
||||
#ifndef SKULLC_THREADS_ACTOR_THREAD_HPP_
|
||||
#define SKULLC_THREADS_ACTOR_THREAD_HPP_
|
||||
|
||||
#include <array>
|
||||
#include <cstdint>
|
||||
|
||||
#include <freertos_os2.h>
|
||||
#include <queue.h>
|
||||
|
||||
#include <threads_thread.hpp>
|
||||
|
||||
namespace Threads
|
||||
{
|
||||
|
||||
class IActor;
|
||||
|
||||
class ActorThread : public Thread<ActorThread>
|
||||
{
|
||||
public:
|
||||
ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority, const std::uint32_t stack_size);
|
||||
|
||||
void operator()();
|
||||
|
||||
private:
|
||||
void init_();
|
||||
void doMessageLoop_();
|
||||
|
||||
friend class IActor;
|
||||
std::size_t acceptActor_(IActor* actor);
|
||||
|
||||
std::size_t queue_length_;
|
||||
std::size_t queue_item_length_ = 0;
|
||||
|
||||
QueueHandle_t queue_ = nullptr;
|
||||
char* rx_buffer_ = nullptr;
|
||||
char* tx_buffer_ = nullptr;
|
||||
std::array<IActor*, 32> actors_;
|
||||
};
|
||||
|
||||
}// namespace Threads
|
||||
|
||||
|
||||
#endif /* SKULLC_THREADS_ACTOR_THREAD_HPP_ */
|
||||
68
Threads/Inc/threads_iactor.hpp
Normal file
68
Threads/Inc/threads_iactor.hpp
Normal file
@ -0,0 +1,68 @@
|
||||
/*
|
||||
* threads_iactor.hpp
|
||||
*
|
||||
* Created on: Nov 14, 2021
|
||||
* Author: erki
|
||||
*/
|
||||
|
||||
#ifndef SKULLC_THREADS_IACTOR_HPP_
|
||||
#define SKULLC_THREADS_IACTOR_HPP_
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include <freertos_os2.h>
|
||||
|
||||
namespace Threads
|
||||
{
|
||||
|
||||
class ActorThread;
|
||||
|
||||
class IActor
|
||||
{
|
||||
public:
|
||||
IActor(const std::size_t signal_size)
|
||||
: signal_size_(signal_size)
|
||||
{}
|
||||
|
||||
virtual ~IActor() {}
|
||||
|
||||
virtual void init() = 0;
|
||||
virtual void dispatchSignal(const char* data) = 0;
|
||||
|
||||
void moveToThread(ActorThread* thread);
|
||||
|
||||
std::size_t actorIndex() const
|
||||
{
|
||||
return actor_index_;
|
||||
}
|
||||
|
||||
std::size_t signalSize() const
|
||||
{
|
||||
return signal_size_;
|
||||
}
|
||||
|
||||
protected:
|
||||
template<typename T>
|
||||
void dispatchEvent(const T& data)
|
||||
{
|
||||
dispatchEventImpl_(&data, sizeof(T));
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
BaseType_t dispatchEventFromIsr(const T& data)
|
||||
{
|
||||
return dispatchEventFromIsrImpl_(&data, sizeof(T));
|
||||
}
|
||||
|
||||
private:
|
||||
ActorThread* parent_thread_ = nullptr;
|
||||
std::size_t actor_index_ = 0;
|
||||
std::size_t signal_size_;
|
||||
|
||||
void dispatchEventImpl_(const void* data, const std::size_t data_size);
|
||||
BaseType_t dispatchEventFromIsrImpl_(const void* data, const std::size_t data_size);
|
||||
};
|
||||
|
||||
}// namespace Threads
|
||||
|
||||
#endif /* SKULLC_THREADS_IACTOR_HPP_ */
|
||||
110
Threads/Src/threads_actor_thread.cpp
Normal file
110
Threads/Src/threads_actor_thread.cpp
Normal file
@ -0,0 +1,110 @@
|
||||
/*
|
||||
* threads_actor_thread.cpp
|
||||
*
|
||||
* Created on: Nov 13, 2021
|
||||
* Author: erki
|
||||
*/
|
||||
|
||||
|
||||
#include "threads_actor_thread.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
|
||||
#include <threads_iactor.hpp>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
struct SignalMessage
|
||||
{
|
||||
std::size_t actor_index;
|
||||
const char* data;
|
||||
|
||||
static SignalMessage fromData(char* data)
|
||||
{
|
||||
SignalMessage message;
|
||||
std::memcpy(&message.actor_index, data, sizeof(message.actor_index));
|
||||
message.data = data + sizeof(message.actor_index);
|
||||
|
||||
return message;
|
||||
}
|
||||
};
|
||||
|
||||
}// namespace
|
||||
|
||||
namespace Threads
|
||||
{
|
||||
|
||||
ActorThread::ActorThread(const std::size_t queue_length, const char* name, const osPriority_t priority, const std::uint32_t stack_size)
|
||||
: Thread<ActorThread>(name, priority, stack_size), queue_length_(queue_length), queue_(nullptr)
|
||||
{
|
||||
actors_.fill(nullptr);
|
||||
}
|
||||
|
||||
void ActorThread::operator()()
|
||||
{
|
||||
init_();
|
||||
|
||||
osThreadYield();
|
||||
|
||||
while (true)
|
||||
{
|
||||
doMessageLoop_();
|
||||
|
||||
osThreadYield();
|
||||
}
|
||||
}
|
||||
|
||||
void ActorThread::init_()
|
||||
{
|
||||
for (IActor* actor : actors_)
|
||||
{
|
||||
if (actor != nullptr)
|
||||
{
|
||||
queue_item_length_ = std::max(queue_item_length_, actor->signalSize());
|
||||
|
||||
actor->init();
|
||||
}
|
||||
}
|
||||
|
||||
queue_ = xQueueCreate(queue_length_, queue_item_length_ + sizeof(std::size_t));
|
||||
assert(queue_ != nullptr);
|
||||
|
||||
rx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)];
|
||||
tx_buffer_ = new char[queue_item_length_ + sizeof(std::size_t)];
|
||||
}
|
||||
|
||||
void ActorThread::doMessageLoop_()
|
||||
{
|
||||
const BaseType_t has_data = xQueueReceive(queue_, rx_buffer_, portMAX_DELAY);
|
||||
|
||||
if (has_data == pdTRUE)
|
||||
{
|
||||
const SignalMessage message = SignalMessage::fromData(rx_buffer_);
|
||||
assert(message.actor_index <= actors_.size());
|
||||
|
||||
IActor* actor = actors_[message.actor_index];
|
||||
assert(actor != nullptr);
|
||||
|
||||
actor->dispatchSignal(message.data);
|
||||
}
|
||||
}
|
||||
|
||||
std::size_t ActorThread::acceptActor_(IActor* actor)
|
||||
{
|
||||
for (auto it = actors_.begin(); it != actors_.end(); ++it)
|
||||
{
|
||||
if (*it == nullptr)
|
||||
{
|
||||
*it = actor;
|
||||
return std::distance(actors_.begin(), it);
|
||||
}
|
||||
}
|
||||
|
||||
assert(false);
|
||||
return -1;
|
||||
}
|
||||
|
||||
}// namespace Threads
|
||||
55
Threads/Src/threads_iactor.cpp
Normal file
55
Threads/Src/threads_iactor.cpp
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* threads_iactor.cpp
|
||||
*
|
||||
* Created on: Nov 14, 2021
|
||||
* Author: erki
|
||||
*/
|
||||
|
||||
|
||||
#include "threads_iactor.hpp"
|
||||
|
||||
#include <cstring>
|
||||
|
||||
#include <queue.h>
|
||||
|
||||
#include "threads_actor_thread.hpp"
|
||||
|
||||
namespace Threads
|
||||
{
|
||||
|
||||
void IActor::moveToThread(ActorThread* thread)
|
||||
{
|
||||
actor_index_ = thread->acceptActor_(this);
|
||||
parent_thread_ = thread;
|
||||
}
|
||||
|
||||
void IActor::dispatchEventImpl_(const void* data, const std::size_t data_size)
|
||||
{
|
||||
taskENTER_CRITICAL();
|
||||
|
||||
std::memcpy(parent_thread_->tx_buffer_, &actor_index_, sizeof(std::size_t));
|
||||
char* data_dest = parent_thread_->tx_buffer_ + sizeof(std::size_t);
|
||||
std::memcpy(data_dest, data, data_size);
|
||||
|
||||
xQueueSend(parent_thread_->queue_, parent_thread_->tx_buffer_, 0);
|
||||
|
||||
taskEXIT_CRITICAL();
|
||||
}
|
||||
|
||||
BaseType_t IActor::dispatchEventFromIsrImpl_(const void* data, const std::size_t data_size)
|
||||
{
|
||||
const auto isr_data = taskENTER_CRITICAL_FROM_ISR();
|
||||
|
||||
std::memcpy(parent_thread_->tx_buffer_, &actor_index_, sizeof(std::size_t));
|
||||
char* data_dest = parent_thread_->tx_buffer_ + sizeof(std::size_t);
|
||||
std::memcpy(data_dest, data, data_size);
|
||||
|
||||
BaseType_t task_awoken = pdFALSE;
|
||||
xQueueSendFromISR(parent_thread_->queue_, parent_thread_->tx_buffer_, &task_awoken);
|
||||
|
||||
taskEXIT_CRITICAL_FROM_ISR(isr_data);
|
||||
|
||||
return task_awoken;
|
||||
}
|
||||
|
||||
}// namespace Threads
|
||||
Loading…
x
Reference in New Issue
Block a user