WIP8: compositional awaiters are done
we are. Almost done
This commit is contained in:
parent
0107d3e6e7
commit
5e5aee38dc
@ -10,6 +10,7 @@
|
||||
#include "skullc/coro/sleep.hpp"
|
||||
#include "skullc/coro/task.hpp"
|
||||
#include "skullc/coro/this_coro.hpp"
|
||||
#include "skullc/coro/composition.hpp"
|
||||
|
||||
#include <semaphore>
|
||||
|
||||
@ -317,3 +318,89 @@ TEST_CASE("Signal awaiters work.", "[coro],[signal]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
skullc::coro::Task<> test_wait_all()
|
||||
{
|
||||
co_await skullc::coro::wait_all(
|
||||
test_sleepy_coro(0, 10ms),
|
||||
test_sleepy_coro(1, 20ms)
|
||||
);
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
skullc::coro::Task<int> test_sleepy_coro_return(const int expected, const std::chrono::duration<uint32_t, std::milli>& duration = 10ms)
|
||||
{
|
||||
co_await skullc::coro::sleep(0ms, duration);
|
||||
REQUIRE(expected == test_coro_called);
|
||||
test_coro_called++;
|
||||
co_return expected;
|
||||
}
|
||||
|
||||
skullc::coro::Task<> testwait_first()
|
||||
{
|
||||
auto val = co_await skullc::coro::wait_first(
|
||||
test_sleepy_coro_return(0, 10ms),
|
||||
test_sleepy_coro_return(1, 20ms)
|
||||
);
|
||||
|
||||
REQUIRE(std::get<0>(val) == 0);
|
||||
REQUIRE(std::get<1>(val) == std::nullopt);
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_CASE("Wait all awaiter works.", "[coro],[wait_all]")
|
||||
{
|
||||
using namespace skullc::coro;
|
||||
Scheduler<std::vector> scheduler;
|
||||
skullc::this_coro::scheduler.register_scheduler(scheduler);
|
||||
|
||||
test_coro_called = 0;
|
||||
|
||||
scheduler.start_tasks(test_wait_all());
|
||||
scheduler.loop(0);
|
||||
scheduler.loop(0);
|
||||
scheduler.loop(0);
|
||||
|
||||
REQUIRE(test_coro_called == 0);
|
||||
scheduler.loop(10);
|
||||
scheduler.loop(10);
|
||||
|
||||
REQUIRE(test_coro_called == 1);
|
||||
scheduler.loop(20);
|
||||
scheduler.loop(20);
|
||||
scheduler.loop(20);
|
||||
|
||||
REQUIRE(test_coro_called == 2);
|
||||
}
|
||||
|
||||
TEST_CASE("Wait one awaiter works.", "[coro],[wait_first]")
|
||||
{
|
||||
using namespace skullc::coro;
|
||||
Scheduler<std::vector> scheduler;
|
||||
skullc::this_coro::scheduler.register_scheduler(scheduler);
|
||||
|
||||
test_coro_called = 0;
|
||||
|
||||
scheduler.start_tasks(testwait_first());
|
||||
scheduler.loop(0);
|
||||
scheduler.loop(0);
|
||||
scheduler.loop(0);
|
||||
|
||||
REQUIRE(test_coro_called == 0);
|
||||
scheduler.loop(10);
|
||||
scheduler.loop(10);
|
||||
|
||||
REQUIRE(test_coro_called == 1);
|
||||
scheduler.loop(20);
|
||||
scheduler.loop(20);
|
||||
scheduler.loop(20);
|
||||
|
||||
REQUIRE(test_coro_called == 1);
|
||||
}
|
||||
|
||||
197
coro/inc/skullc/coro/composition.hpp
Normal file
197
coro/inc/skullc/coro/composition.hpp
Normal file
@ -0,0 +1,197 @@
|
||||
//
|
||||
// Created by erki on 25/02/25.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "skullc/coro/this_coro.hpp"
|
||||
#include "skullc/coro/task.hpp"
|
||||
|
||||
#include <coroutine>
|
||||
#include <variant>
|
||||
|
||||
namespace skullc::coro
|
||||
{
|
||||
|
||||
namespace detail
|
||||
{
|
||||
|
||||
struct WaitAllAwaitable
|
||||
{
|
||||
WaitAllAwaitable() = delete;
|
||||
|
||||
template<typename... Awaitables>
|
||||
WaitAllAwaitable(Awaitables&&... args)
|
||||
{
|
||||
(register_awaitable(std::forward<Awaitables>(args)), ...);
|
||||
}
|
||||
|
||||
~WaitAllAwaitable()
|
||||
{
|
||||
if (continuation)
|
||||
this_coro::scheduler().remove(continuation);
|
||||
}
|
||||
|
||||
template<typename Awaitable>
|
||||
void register_awaitable(Awaitable&& task)
|
||||
{
|
||||
auto t = run_awaitable(task);
|
||||
this_coro::scheduler().schedule(t.get_handle(), 0);
|
||||
t.detach();
|
||||
}
|
||||
|
||||
template<typename Awaitable>
|
||||
Task<> run_awaitable(Awaitable&& awaitable)
|
||||
{
|
||||
start_awaitable();
|
||||
co_await awaitable;
|
||||
awaitable_completed();
|
||||
co_return;
|
||||
}
|
||||
|
||||
void start_awaitable()
|
||||
{
|
||||
pending++;
|
||||
}
|
||||
|
||||
void awaitable_completed()
|
||||
{
|
||||
pending--;
|
||||
if (pending == 0 && continuation)
|
||||
this_coro::scheduler().schedule(continuation, 0);
|
||||
}
|
||||
|
||||
auto operator co_await() noexcept
|
||||
{
|
||||
struct Awaitable
|
||||
{
|
||||
WaitAllAwaitable* wait_all;
|
||||
|
||||
bool await_ready() { return false; }
|
||||
|
||||
void await_suspend(std::coroutine_handle<> h)
|
||||
{
|
||||
wait_all->continuation = h;
|
||||
}
|
||||
|
||||
void await_resume()
|
||||
{ }
|
||||
};
|
||||
|
||||
return Awaitable{this};
|
||||
}
|
||||
|
||||
std::coroutine_handle<> continuation{};
|
||||
int pending = 0;
|
||||
};
|
||||
|
||||
template<typename Awaitable>
|
||||
struct AwaitableResumeType
|
||||
{
|
||||
using value_type = std::optional<typename Awaitable::value_type>;
|
||||
};
|
||||
|
||||
template<typename... Awaitables>
|
||||
struct WaitFirstAwaitable
|
||||
{
|
||||
WaitFirstAwaitable() = delete;
|
||||
|
||||
WaitFirstAwaitable(Awaitables&&... args)
|
||||
: WaitFirstAwaitable(std::index_sequence_for<Awaitables...>(), std::forward<Awaitables>(args)...)
|
||||
{}
|
||||
|
||||
template<std::size_t... Is>
|
||||
WaitFirstAwaitable(std::index_sequence<Is...>, Awaitables&&... args)
|
||||
{
|
||||
((std::get<Is>(result) = std::nullopt), ...);
|
||||
pending = true;
|
||||
(register_awaitable<Is>(std::forward<Awaitables>(args)), ...);
|
||||
}
|
||||
|
||||
~WaitFirstAwaitable()
|
||||
{
|
||||
if (continuation)
|
||||
this_coro::scheduler().remove(continuation);
|
||||
}
|
||||
|
||||
template<std::size_t I, typename Awaitable>
|
||||
void register_awaitable(Awaitable&& task)
|
||||
{
|
||||
auto t = run_awaitable<I>(task);
|
||||
this_coro::scheduler().schedule(t.get_handle(), 0);
|
||||
coroutines[I] = t.get_handle();
|
||||
t.detach();
|
||||
}
|
||||
|
||||
template<std::size_t I, typename Awaitable>
|
||||
Task<> run_awaitable(Awaitable&& awaitable)
|
||||
{
|
||||
auto val = co_await awaitable;
|
||||
awaitable_completed<I>(std::move(val));
|
||||
co_return;
|
||||
}
|
||||
|
||||
template<std::size_t I>
|
||||
void awaitable_completed(auto&& val)
|
||||
{
|
||||
if (pending)
|
||||
{
|
||||
pending = false;
|
||||
for (auto j = 0; j < coroutines.size(); j++)
|
||||
{
|
||||
if (j != I)
|
||||
{
|
||||
this_coro::scheduler().remove(coroutines[j]);
|
||||
// @todo: also clean up a related poller, if necessary.
|
||||
coroutines[j].destroy();
|
||||
}
|
||||
}
|
||||
|
||||
std::get<I>(result) = val;
|
||||
this_coro::scheduler().schedule(continuation, 0);
|
||||
}
|
||||
}
|
||||
|
||||
auto operator co_await() noexcept
|
||||
{
|
||||
struct Awaitable
|
||||
{
|
||||
WaitFirstAwaitable* wait_first;
|
||||
|
||||
bool await_ready() { return false; }
|
||||
|
||||
void await_suspend(std::coroutine_handle<> h)
|
||||
{
|
||||
wait_first->continuation = h;
|
||||
}
|
||||
|
||||
auto await_resume()
|
||||
{
|
||||
return std::move(wait_first->result);
|
||||
}
|
||||
};
|
||||
|
||||
return Awaitable{this};
|
||||
}
|
||||
|
||||
std::array<std::coroutine_handle<>, sizeof...(Awaitables)> coroutines;
|
||||
std::tuple<typename AwaitableResumeType<Awaitables>::value_type...> result;
|
||||
std::coroutine_handle<> continuation{};
|
||||
int pending = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
template<typename... Awaitables>
|
||||
auto wait_all(Awaitables&&... args)
|
||||
{
|
||||
return detail::WaitAllAwaitable(std::forward<Awaitables>(args)...);
|
||||
}
|
||||
|
||||
template<typename... Awaitables>
|
||||
auto wait_first(Awaitables&&... args)
|
||||
{
|
||||
return detail::WaitFirstAwaitable<Awaitables...>(std::forward<Awaitables>(args)...);
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user