Erki 66461af1e4
Some checks failed
CI & Unit Tests / Unit-Tests (push) Failing after 13s
CI & Unit Tests / Docs (push) Successful in 10s
WIP3
2025-02-06 21:27:51 +02:00

234 lines
4.7 KiB
C++

//
// Created by erki on 30/01/25.
//
#include <catch2/catch.hpp>
#include "skullc/coro/scheduler.hpp"
#include "skullc/coro/semaphore.hpp"
#include "skullc/coro/sleep.hpp"
#include "skullc/coro/task.hpp"
#include "skullc/coro/this_coro.hpp"
#include <semaphore>
namespace
{
int test_coro_called = 0;
using namespace std::chrono_literals;
skullc::coro::Task<> test_coro(const int expected)
{
REQUIRE(expected == test_coro_called);
test_coro_called++;
co_return;
}
skullc::coro::Task<> test_sleepy_coro(const int expected, const std::chrono::duration<uint32_t, std::milli>& duration = 10ms)
{
co_await skullc::coro::sleep(0ms, duration);
REQUIRE(expected == test_coro_called);
test_coro_called++;
co_return;
}
template<typename T>
skullc::coro::Task<> test_semaphore_coro(T* semaphore, const int expected)
{
co_await skullc::coro::await_semaphore(*semaphore);
REQUIRE(expected == test_coro_called);
test_coro_called++;
co_return;
}
}// namespace
TEST_CASE("Scheduler runs coroutines.", "[coro]")
{
using namespace skullc::coro;
Scheduler<std::vector> scheduler;
skullc::this_coro::scheduler.register_scheduler(scheduler);
test_coro_called = 0;
SECTION("With a single coroutine.")
{
scheduler.start_tasks(test_coro(0));
scheduler.loop(0);
REQUIRE(test_coro_called == 1);
}
SECTION("With two coroutines.")
{
scheduler.start_tasks(test_coro(0), test_coro(1));
scheduler.loop(0);
REQUIRE(test_coro_called == 2);
}
SECTION("With a single sleepy coroutine.")
{
using namespace std::chrono_literals;
scheduler.start_tasks(test_sleepy_coro(0));
scheduler.loop(0);
REQUIRE(test_coro_called == 0);
const std::chrono::duration<uint32_t, std::milli> half_sleep = 5ms;
scheduler.loop(half_sleep.count());
REQUIRE(test_coro_called == 0);
const std::chrono::duration<uint32_t, std::milli> full_sleep = 11ms;
scheduler.loop(full_sleep.count());
REQUIRE(test_coro_called == 1);
}
SECTION("With a semaphore awaiter.")
{
std::binary_semaphore semaphore{0};
std::counting_semaphore<10> counting_semaphore{0};
scheduler.start_tasks(
test_semaphore_coro(&semaphore, 0),
test_semaphore_coro(&counting_semaphore, 1));
scheduler.loop(0);
REQUIRE(test_coro_called == 0);
semaphore.release(1);
scheduler.loop(1);
scheduler.loop(2);
REQUIRE(test_coro_called == 1);
counting_semaphore.release(1);
scheduler.loop(3);
scheduler.loop(4);
REQUIRE(test_coro_called == 2);
}
SECTION("Coros scheduled for the same millisecond get executed with the same time.")
{
scheduler.start_tasks(
test_sleepy_coro(0, std::chrono::milliseconds{2}),
test_sleepy_coro(1, std::chrono::milliseconds{2}));
scheduler.loop(0);
scheduler.loop(0);
scheduler.loop(0);
REQUIRE(test_coro_called == 0);
scheduler.loop(2);
scheduler.loop(2);
REQUIRE(test_coro_called == 2);
}
}
#include "skullc/coro/peripheral_awaiters.hpp"
namespace
{
struct TestGpio
{
bool is_set = false;
void set(const bool value)
{
is_set = value;
}
void toggle()
{
is_set = !is_set;
}
bool read() const
{
return is_set;
}
};
skullc::coro::Task<> await_pin_edge(TestGpio* pin, const Peripherals::Hal::Edge edge)
{
co_await skullc::coro::PinEdgeAwaiter(pin, edge);
test_coro_called = 1;
co_return;
}
}// namespace
TEST_CASE("Peripheral awaiters work.", "[coro],[awaiters]")
{
using namespace skullc::coro;
Scheduler<std::vector> scheduler;
skullc::this_coro::scheduler.register_scheduler(scheduler);
test_coro_called = 0;
SECTION("Pin edge awaiter works with rising edge.")
{
TestGpio gpio{false};
scheduler.start_tasks(await_pin_edge(&gpio, Peripherals::Hal::Edge::Rising));
scheduler.loop(0);
REQUIRE(test_coro_called == 0);
SECTION("Keeping low generates no edge.")
{
scheduler.loop(1);
scheduler.loop(2);
scheduler.loop(3);
REQUIRE(test_coro_called == 0);
}
SECTION("Raising generated a rising edge.")
{
gpio.is_set = true;
scheduler.loop(1);
scheduler.loop(2);
REQUIRE(test_coro_called == 1);
}
}
SECTION("Pin edge awaiter works with falling edge.")
{
TestGpio gpio{true};
scheduler.start_tasks(await_pin_edge(&gpio, Peripherals::Hal::Edge::Falling));
scheduler.loop(0);
REQUIRE(test_coro_called == 0);
SECTION("Keeping high generates no edge.")
{
scheduler.loop(1);
scheduler.loop(2);
scheduler.loop(3);
REQUIRE(test_coro_called == 0);
}
SECTION("Falling generated a falling edge.")
{
gpio.is_set = false;
scheduler.loop(1);
scheduler.loop(2);
REQUIRE(test_coro_called == 1);
}
}
}