diff --git a/include/condy/channel.hpp b/include/condy/channel.hpp index 9d441f7..79af5f9 100644 --- a/include/condy/channel.hpp +++ b/include/condy/channel.hpp @@ -363,7 +363,8 @@ class Channel::PushFinishHandle PushFinishHandle(T item) : item_(std::move(item)) {} - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_push_(this)) { // Successfully canceled assert(result_ == -ENOTRECOVERABLE); @@ -422,7 +423,8 @@ class Channel::PopFinishHandle public: using ReturnType = std::pair; - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_pop_(this)) { // Successfully canceled assert(result_.first == -ENOTRECOVERABLE); diff --git a/include/condy/channel_legacy.hpp b/include/condy/channel_legacy.hpp index 9edf713..a573bcc 100644 --- a/include/condy/channel_legacy.hpp +++ b/include/condy/channel_legacy.hpp @@ -353,7 +353,8 @@ class Channel::PushFinishHandle PushFinishHandle(T item) : item_(std::move(item)) {} - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_push_(this)) { // Successfully canceled canceled_ = true; @@ -418,7 +419,8 @@ class Channel::PopFinishHandle public: using ReturnType = T; - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_pop_(this)) { // Successfully canceled runtime_->resume_work(); diff --git a/include/condy/concepts.hpp b/include/condy/concepts.hpp index f56b1a5..d1eff2d 100644 --- a/include/condy/concepts.hpp +++ b/include/condy/concepts.hpp @@ -19,6 +19,7 @@ namespace condy { class Ring; class Invoker; class BufferBase; +class Runtime; namespace detail { @@ -27,13 +28,13 @@ struct FixedFd; } // namespace detail template -concept HandleLike = requires(T handle, Invoker *invoker) { +concept HandleLike = requires(T handle, Invoker *invoker, Runtime *runtime) { typename std::decay_t::ReturnType; { handle.set_invoker(invoker) } -> std::same_as; { handle.extract_result() } -> std::same_as::ReturnType>; - { handle.cancel() } -> std::same_as; + { handle.cancel(runtime) } -> std::same_as; }; template diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index 832bd1c..21461c7 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -9,12 +9,10 @@ #pragma once #include "condy/concepts.hpp" -#include "condy/condy_uring.hpp" -#include "condy/context.hpp" #include "condy/invoker.hpp" -#include "condy/ring.hpp" -#include "condy/work_type.hpp" +#include "condy/runtime.hpp" #include +#include #include #include #include @@ -24,35 +22,6 @@ namespace condy { -class Ring; - -class OpFinishHandleBase { -public: - using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept; - - void cancel() noexcept { - auto *ring = detail::Context::current().ring(); - io_uring_sqe *sqe = ring->get_sqe(); - io_uring_prep_cancel(sqe, this, 0); - io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore)); - io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); - } - - bool handle(io_uring_cqe *cqe) noexcept { - assert(handle_func_ != nullptr); - return handle_func_(this, cqe); - } - - void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; } - -protected: - OpFinishHandleBase() = default; - -protected: - HandleFunc handle_func_ = nullptr; - Invoker *invoker_ = nullptr; -}; - template class OpFinishHandle : public OpFinishHandleBase { public: @@ -67,6 +36,11 @@ class OpFinishHandle : public OpFinishHandleBase { return cqe_handler_.extract_result(); } + void cancel(Runtime *runtime) noexcept { + assert(runtime != nullptr); + runtime->cancel(this); + } + private: static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept { auto *self = static_cast(data); @@ -190,11 +164,11 @@ template class RangedParallelFinishHandle { order_.resize(handles_.size()); } - void cancel() noexcept { + void cancel(Runtime *runtime) noexcept { if (!canceled_) { canceled_ = true; for (auto &handle : handles_) { - handle->cancel(); + handle->cancel(runtime); } } } @@ -218,9 +192,10 @@ template class RangedParallelFinishHandle { if constexpr (Cancel) { if (!canceled_) { canceled_ = true; + auto *runtime = detail::Context::current().runtime(); for (size_t i = 0; i < handles_.size(); i++) { if (i != idx) { - handles_[i]->cancel(); + handles_[i]->cancel(runtime); } } } @@ -294,11 +269,14 @@ template class ParallelFinishHandle { foreach_set_invoker_(); } - void cancel() noexcept { + void cancel(Runtime *runtime) noexcept { if (!canceled_) { canceled_ = true; - std::apply([](auto *...handles) { (handles->cancel(), ...); }, - handles_); + std::apply( + [runtime](auto *...handles) { + (handles->cancel(runtime), ...); + }, + handles_); } } @@ -326,13 +304,13 @@ template class ParallelFinishHandle { } template - void foreach_call_cancel_() noexcept { + void foreach_call_cancel_(Runtime *runtime) noexcept { if constexpr (I < sizeof...(Handles)) { auto handle = std::get(handles_); if constexpr (I != SkipIdx) { - handle->cancel(); + handle->cancel(runtime); } - foreach_call_cancel_(); + foreach_call_cancel_(runtime); } } @@ -343,7 +321,8 @@ template class ParallelFinishHandle { if constexpr (Cancel) { if (!canceled_) { canceled_ = true; - foreach_call_cancel_(); + auto *runtime = detail::Context::current().runtime(); + foreach_call_cancel_(runtime); } } diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 940b6cb..cee9652 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -7,7 +7,6 @@ #include "condy/condy_uring.hpp" #include "condy/context.hpp" -#include "condy/finish_handles.hpp" #include "condy/intrusive.hpp" #include "condy/invoker.hpp" #include "condy/ring.hpp" @@ -64,6 +63,25 @@ inline int sync_msg_ring(io_uring_sqe *sqe_data) noexcept { } // namespace detail +class OpFinishHandleBase { +public: + using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept; + + bool handle(io_uring_cqe *cqe) noexcept { + assert(handle_func_ != nullptr); + return handle_func_(this, cqe); + } + + void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; } + +protected: + OpFinishHandleBase() = default; + +protected: + HandleFunc handle_func_ = nullptr; + Invoker *invoker_ = nullptr; +}; + /** * @brief The event loop runtime for executing asynchronous * @details This class provides a single-threaded runtime for executing diff --git a/tests/test_awaiter_operations.cpp b/tests/test_awaiter_operations.cpp index 4ee7348..c57df09 100644 --- a/tests/test_awaiter_operations.cpp +++ b/tests/test_awaiter_operations.cpp @@ -1,5 +1,4 @@ -#include "condy/finish_handles.hpp" -#include "condy/work_type.hpp" +#include "condy/sync_wait.hpp" #include #include #include @@ -11,36 +10,7 @@ using namespace condy::operators; -namespace { - -void event_loop(size_t &unfinished) { - auto *ring = condy::detail::Context::current().ring(); - while (unfinished > 0) { - ring->submit(); - ring->reap_completions([&](io_uring_cqe *cqe) { - auto [data, type] = condy::decode_work(io_uring_cqe_get_data(cqe)); - if (type == condy::WorkType::Ignore) { - return; - } - auto handle_ptr = static_cast(data); - handle_ptr->handle(cqe); - }); - } -} - -// Just placeholder -condy::Runtime runtime; - -} // namespace - TEST_CASE("test awaiter_operations - test make_op_awaiter") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { co_await condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -50,23 +20,11 @@ TEST_CASE("test awaiter_operations - test make_op_awaiter") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -82,23 +40,11 @@ TEST_CASE("test awaiter_operations - test when_all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_any") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -118,23 +64,11 @@ TEST_CASE("test awaiter_operations - test when_any") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test ranged when_all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -153,23 +87,11 @@ TEST_CASE("test awaiter_operations - test ranged when_all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_all with empty range") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { using OpAwaiter = @@ -183,20 +105,11 @@ TEST_CASE("test awaiter_operations - test when_all with empty range") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test ranged when_any") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts1{ @@ -225,23 +138,11 @@ TEST_CASE("test awaiter_operations - test ranged when_any") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_any with empty range") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { using OpAwaiter = @@ -255,20 +156,11 @@ TEST_CASE("test awaiter_operations - test when_any with empty range") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test &&") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -284,23 +176,11 @@ TEST_CASE("test awaiter_operations - test &&") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test ||") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -320,23 +200,11 @@ TEST_CASE("test awaiter_operations - test ||") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - mixed && and ||") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -358,23 +226,11 @@ TEST_CASE("test awaiter_operations - mixed && and ||") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - ranged awaiter push") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -393,23 +249,11 @@ TEST_CASE("test awaiter_operations - ranged awaiter push") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test link") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -430,23 +274,11 @@ TEST_CASE("test awaiter_operations - test link") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test >>") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -467,23 +299,11 @@ TEST_CASE("test awaiter_operations - test >>") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test drain") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -494,23 +314,11 @@ TEST_CASE("test awaiter_operations - test drain") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test drain with when_all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -523,23 +331,11 @@ TEST_CASE("test awaiter_operations - test drain with when_all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test parallel all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -557,11 +353,6 @@ TEST_CASE("test awaiter_operations - test parallel all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } \ No newline at end of file diff --git a/tests/test_op_awaiter.cpp b/tests/test_op_awaiter.cpp index 3479f3f..495c0ba 100644 --- a/tests/test_op_awaiter.cpp +++ b/tests/test_op_awaiter.cpp @@ -1,5 +1,6 @@ #include "condy/finish_handles.hpp" #include "condy/provided_buffers.hpp" +#include "condy/sync_wait.hpp" #include #include #include @@ -7,36 +8,7 @@ #include #include -namespace { - -void event_loop(size_t &unfinished) { - auto *ring = condy::detail::Context::current().ring(); - while (unfinished > 0) { - ring->submit(); - ring->reap_completions([&](io_uring_cqe *cqe) { - auto [data, type] = condy::decode_work(io_uring_cqe_get_data(cqe)); - if (type == condy::WorkType::Ignore) { - return; - } - auto handle_ptr = static_cast(data); - handle_ptr->handle(cqe); - }); - } -} - -// Just placeholder -condy::Runtime runtime; - -} // namespace - TEST_CASE("test op_awaiter - basic routine") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { co_await condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -46,23 +18,11 @@ TEST_CASE("test op_awaiter - basic routine") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test op_awaiter - multiple ops") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { co_await condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -73,23 +33,11 @@ TEST_CASE("test op_awaiter - multiple ops") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test op_awaiter - concurrent op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto awaiter1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -105,23 +53,11 @@ TEST_CASE("test op_awaiter - concurrent op") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test op_awaiter - cancel op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -143,115 +79,34 @@ TEST_CASE("test op_awaiter - cancel op") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } -namespace { - -void mock_multishot_event_loop(size_t &unfinished) { - auto *ring = condy::detail::Context::current().ring(); - while (unfinished > 0) { - ring->submit(); - ring->reap_completions([&](io_uring_cqe *cqe) { - auto [data, type] = condy::decode_work(io_uring_cqe_get_data(cqe)); - if (type == condy::WorkType::Ignore) { - return; - } - auto handle_ptr = static_cast(data); - // Mock Multishot - io_uring_cqe mock_cqe = *cqe; - mock_cqe.res = 42; - mock_cqe.flags |= IORING_CQE_F_MORE; - handle_ptr->handle(&mock_cqe); - handle_ptr->handle(cqe); - }); - } -} - -} // namespace - -TEST_CASE("test op_awaiter - multishot op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); +TEST_CASE("test op_awaiter - select buffer op") { + int pipefd[2]; + REQUIRE(pipe(pipefd) == 0); - bool handle_called = false; - auto handle_multishot = [&](int res) -> condy::Coro { - REQUIRE(res == 42); - handle_called = true; - co_return; - }; + ssize_t r = ::write(pipefd[1], "test", 4); + REQUIRE(r == 4); size_t unfinished = 1; auto func = [&]() -> condy::Coro { - co_await condy::detail::make_multishot_op_awaiter( - [&](int res) { - auto coro = handle_multishot(res); - coro.release().resume(); - }, - io_uring_prep_nop); + condy::ProvidedBufferPool pool(16, 32); + auto [res, buf] = co_await condy::detail::make_select_buffer_op_awaiter( + &pool, io_uring_prep_read, pipefd[0], nullptr, 0, 0); + REQUIRE(res >= 0); + REQUIRE(buf.size() == 32); + REQUIRE(std::memcmp(buf.data(), "test", 4) == 0); --unfinished; }; auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - mock_multishot_event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - REQUIRE(handle_called == true); - - context.reset(); -} - -TEST_CASE("test op_awaiter - select buffer op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - - { - condy::ProvidedBufferPool pool(16, 32); - - int pipefd[2]; - REQUIRE(pipe(pipefd) == 0); - - ssize_t r = ::write(pipefd[1], "test", 4); - REQUIRE(r == 4); - - size_t unfinished = 1; - auto func = [&]() -> condy::Coro { - auto [res, buf] = - co_await condy::detail::make_select_buffer_op_awaiter( - &pool, io_uring_prep_read, pipefd[0], nullptr, 0, 0); - REQUIRE(res >= 0); - REQUIRE(buf.size() == 32); - REQUIRE(std::memcmp(buf.data(), "test", 4) == 0); - --unfinished; - }; - - auto coro = func(); - REQUIRE(unfinished == 1); - - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); - REQUIRE(unfinished == 0); - } - context.reset(); + close(pipefd[0]); + close(pipefd[1]); } \ No newline at end of file diff --git a/tests/test_op_finish_handle.cpp b/tests/test_op_finish_handle.cpp index 821c13a..9c69b64 100644 --- a/tests/test_op_finish_handle.cpp +++ b/tests/test_op_finish_handle.cpp @@ -103,50 +103,6 @@ TEST_CASE("test op_finish_handle - concurrent ops") { context.reset(); } -TEST_CASE("test op_finish_handle - cancel op") { - size_t unfinished = 1; - SetUnfinishedInvoker invoker{unfinished}; - - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - - condy::OpFinishHandle handle1, handle2; - condy::ParallelFinishHandle, - condy::OpFinishHandle> - finish_handle; - finish_handle.init(&handle1, &handle2); - finish_handle.set_invoker(&invoker); - - auto *sqe = ring.get_sqe(); - __kernel_timespec ts{ - .tv_sec = 60ll * 60ll, - .tv_nsec = 0, - }; - io_uring_prep_timeout(sqe, &ts, 0, 0); - io_uring_sqe_set_data(sqe, &handle1); - - sqe = ring.get_sqe(); - io_uring_prep_nop(sqe); - io_uring_sqe_set_data(sqe, &handle2); - - event_loop(unfinished); - - REQUIRE(unfinished == 0); - - auto r = finish_handle.extract_result(); - auto &[order, results] = r; - REQUIRE(order[0] == 1); - REQUIRE(std::get<0>(results) == -ECANCELED); - REQUIRE(std::get<1>(results) == 0); - - context.reset(); -} - namespace { struct SetFinishWorkInvoker diff --git a/tests/test_parallel_awaiter.cpp b/tests/test_parallel_awaiter.cpp index a5f693d..443f001 100644 --- a/tests/test_parallel_awaiter.cpp +++ b/tests/test_parallel_awaiter.cpp @@ -1,4 +1,5 @@ #include "condy/invoker.hpp" +#include "condy/runtime.hpp" #include #include #include @@ -9,7 +10,7 @@ namespace { struct SimpleFinishHandle { using ReturnType = int; - void cancel() { cancelled_++; } + void cancel(condy::Runtime *) { cancelled_++; } void invoke(int res) { res_ = res; diff --git a/tests/test_parallel_finish_handle.cpp b/tests/test_parallel_finish_handle.cpp index baf3554..03836f3 100644 --- a/tests/test_parallel_finish_handle.cpp +++ b/tests/test_parallel_finish_handle.cpp @@ -11,7 +11,7 @@ struct SetFinishInvoker : public condy::InvokerAdapter { struct SimpleFinishHandle { using ReturnType = int; - void cancel() { cancelled_++; } + void cancel(condy::Runtime *) { cancelled_++; } void invoke(int res) { res_ = res; @@ -66,7 +66,7 @@ TEST_CASE("test parallel_finish_handle - RangedWhenAllFinishHandle cancel") { h2.invoke(2); REQUIRE(!invoker.finished); - handle.cancel(); + handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h1.cancelled_ == 1); REQUIRE(h2.cancelled_ == 1); @@ -152,7 +152,7 @@ TEST_CASE("test parallel_finish_handle - RangedWhenAnyFinishHandle " REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase - handle.cancel(); + handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase @@ -260,7 +260,7 @@ TEST_CASE("test parallel_finish_handle - WhenAllFinishHandle cancel") { h2.invoke(2); REQUIRE(!invoker.finished); - finish_handle.cancel(); + finish_handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h1.cancelled_ == 1); REQUIRE(h2.cancelled_ == 1); @@ -347,7 +347,7 @@ TEST_CASE("test parallel_finish_handle - WhenAnyFinishHandle multiple " REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase - finish_handle.cancel(); + finish_handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase