Compare commits

...

1 Commits

Author SHA1 Message Date
Erki
dc04cd8dce WIP commit
Some checks failed
continuous-integration/drone/push Build is failing
2021-10-24 22:48:57 +03:00
3 changed files with 187 additions and 59 deletions

View File

@ -18,12 +18,46 @@
#include "threads_primitivethread.hpp" #include "threads_primitivethread.hpp"
#include "threads_signal.hpp" #include "threads_signal.hpp"
#include "threads_timer.hpp" #include "threads_timer.hpp"
#include "threads_actor_thread.hpp"
namespace Threads namespace Threads
{ {
class IActor
{
public:
IActor() = delete;
IActor(ActorThread* thread)
{
moveToThread(thread);
}
virtual ~IActor()
{ }
virtual void init()
{ }
virtual void doWork() = 0;
virtual bool hasWork() = 0;
void moveToThread(ActorThread* thread)
{
assert(thread);
if (thread_)
thread_->removeActor_(this);
thread->acceptActor_(this);
thread_ = thread;
}
protected:
ActorThread* thread_ = nullptr;
};
template<typename CRTP, typename... Ts> template<typename CRTP, typename... Ts>
class Actor class Actor : public IActor
{ {
public: public:
using value_type = std::variant<Ts...>; using value_type = std::variant<Ts...>;
@ -31,8 +65,8 @@ public:
static_assert(std::is_default_constructible_v<value_type>, "Ts must be default constructible."); static_assert(std::is_default_constructible_v<value_type>, "Ts must be default constructible.");
static_assert(std::is_trivially_copyable_v<value_type>, "Ts must be trivially copyable."); static_assert(std::is_trivially_copyable_v<value_type>, "Ts must be trivially copyable.");
Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size) Actor(const std::uint32_t queue_size, ActorThread* thread)
: thread_(&Actor<CRTP, Ts...>::threadHandler_, this, name, priority, stack_size), : IActor(thread),
msg_queue_(xQueueCreate(queue_size, sizeof(value_type))), signal_(this) msg_queue_(xQueueCreate(queue_size, sizeof(value_type))), signal_(this)
{ {
assert(msg_queue_); assert(msg_queue_);
@ -40,6 +74,23 @@ public:
virtual ~Actor() {} virtual ~Actor() {}
void doWork() override
{
value_type data{};
const BaseType_t got = xQueueReceive(msg_queue_, &data, 0);
if (got == pdTRUE)
{
std::visit(Visitor_{this}, data);
}
}
bool hasWork() override
{
return uxQueueMessagesWaiting(msg_queue_) > 0;
}
template<typename T> template<typename T>
struct Signal : Signallable<T> struct Signal : Signallable<T>
{ {
@ -84,11 +135,7 @@ public:
return &signal_; return &signal_;
} }
protected:
virtual void init() = 0;
private: private:
PrimitiveThread thread_;
QueueHandle_t msg_queue_; QueueHandle_t msg_queue_;
struct Visitor_ struct Visitor_
@ -108,44 +155,17 @@ private:
} }
}; };
void operator()()
{
init();
value_type data{};
while (true)
{
const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY);
if (got == pdTRUE)
{
std::visit(Visitor_{this}, data);
}
}
}
static void threadHandler_(void* d)
{
auto* dd = static_cast<Actor<CRTP, Ts...>*>(d);
(*dd)();
}
Signal<value_type> signal_; Signal<value_type> signal_;
}; };
template<typename CRTP> template<typename CRTP>
class Actor<CRTP, void> class Actor<CRTP, void> : public IActor
{ {
public: public:
using value_type = void; using value_type = void;
Actor(const std::uint32_t queue_size, const char* name, const osPriority_t priority, const std::uint32_t stack_size) Actor(const std::uint32_t queue_size, ActorThread* thread)
: thread_([](void* d) { : IActor(thread),
auto* dd = static_cast<Actor<CRTP, value_type>*>(d);
(*dd)();
},
this, name, priority, stack_size),
msg_queue_(xQueueCreate(queue_size, sizeof(int))), signal_(*this) msg_queue_(xQueueCreate(queue_size, sizeof(int))), signal_(*this)
{ {
assert(msg_queue_); assert(msg_queue_);
@ -153,36 +173,32 @@ public:
virtual ~Actor() {} virtual ~Actor() {}
void doWork() override
{
int data{};
const BaseType_t got = xQueueReceive(msg_queue_, &data, 0);
if (got == pdTRUE)
{
auto* crtp = static_cast<CRTP*>(this);
crtp->onSignal();
}
}
bool hasWork() override
{
return uxQueueMessagesWaiting(msg_queue_) > 0;
}
Signallable<value_type>* getSignal() Signallable<value_type>* getSignal()
{ {
return &signal_; return &signal_;
} }
protected:
virtual void init() = 0;
private: private:
PrimitiveThread thread_;
QueueHandle_t msg_queue_; QueueHandle_t msg_queue_;
void operator()()
{
init();
int data;
while (true)
{
const BaseType_t got = xQueueReceive(msg_queue_, &data, portMAX_DELAY);
if (got == pdTRUE)
{
auto* crtp = static_cast<CRTP*>(this);
crtp->onSignal();
}
}
}
friend struct Signal_; friend struct Signal_;
struct Signal_ : Signallable<value_type> struct Signal_ : Signallable<value_type>

View File

@ -0,0 +1,43 @@
/*
* threads_actor_thread.hpp
*
* Created on: Sep 19, 2021
* Author: erki
*/
#ifndef THREADS_ACTOR_THREAD_HPP_
#define THREADS_ACTOR_THREAD_HPP_
#include <threads_primitivethread.hpp>
namespace Threads
{
class IActor;
class ActorThread : PrimitiveThread
{
public:
ActorThread() = delete;
ActorThread(const ActorThread&) = delete;
ActorThread(ActorThread&&) = delete;
ActorThread(const char* name, const osPriority_t priority, const std::uint32_t stack_size)
: PrimitiveThread(&ActorThread::run_, this, name, priority, stack_size)
{ }
private:
friend class IActor;
static void run_(void* data);
void acceptActor_(IActor* actor);
void removeActor_(IActor* actor);
std::array<IActor*, 32> actors_;
};
}
#endif /* THREADS_ACTOR_THREAD_HPP_ */

View File

@ -0,0 +1,69 @@
/*
* threads_actor_thread.cpp
*
* Created on: Sep 25, 2021
* Author: erki
*/
#include "threads_actor_thread.hpp"
#include <algorithm>
#include "threads_actor.hpp"
namespace Threads
{
void ActorThread::run_(void* data)
{
auto* t = reinterpret_cast<ActorThread*>(data);
for (IActor* p : t->actors_)
{
if (p)
p->init();
}
while (true)
{
bool remainingWork = false;
for (IActor* p : t->actors_)
{
if (p && p->hasWork())
{
p->doWork();
remainingWork = remainingWork || p->hasWork();
t->yield();
}
}
if (!remainingWork)
{
const bool unused = t->notifyWait(portMAX_DELAY);
(void)unused;
}
}
}
void ActorThread::acceptActor_(IActor* actor)
{
auto it = std::find(actors_.begin(), actors_.end(), nullptr);
if (it == actors_.end())
return;
*it = actor;
}
void ActorThread::removeActor_(IActor* actor)
{
auto it = std::find(actors_.begin(), actors_.end(), actor);
if (it == actors_.end())
return;
*it = nullptr;
}
}