From 36864b690b6daa61f945612557332fd5195bb52a Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 2 Apr 2026 15:25:36 +0800 Subject: [PATCH 1/5] extract OpFinishHandleBase --- include/condy/finish_handle_base.hpp | 28 ++++++++++++++++++ include/condy/finish_handles.hpp | 43 +++++++--------------------- include/condy/runtime.hpp | 2 +- 3 files changed, 39 insertions(+), 34 deletions(-) create mode 100644 include/condy/finish_handle_base.hpp diff --git a/include/condy/finish_handle_base.hpp b/include/condy/finish_handle_base.hpp new file mode 100644 index 0000000..aa79562 --- /dev/null +++ b/include/condy/finish_handle_base.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "condy/concepts.hpp" +#include "condy/invoker.hpp" +#include + +namespace condy { + +class OpFinishHandleBase { +public: + using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept; + + bool handle(io_uring_cqe *cqe) noexcept { + assert(handle_func_ != nullptr); + return handle_func_(this, cqe); + } + + void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; } + +protected: + OpFinishHandleBase() = default; + +protected: + HandleFunc handle_func_ = nullptr; + Invoker *invoker_ = nullptr; +}; + +} // namespace condy \ No newline at end of file diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index 832bd1c..3203b66 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -9,11 +9,9 @@ #pragma once #include "condy/concepts.hpp" -#include "condy/condy_uring.hpp" -#include "condy/context.hpp" +#include "condy/finish_handle_base.hpp" #include "condy/invoker.hpp" -#include "condy/ring.hpp" -#include "condy/work_type.hpp" +#include "condy/runtime.hpp" #include #include #include @@ -24,35 +22,6 @@ namespace condy { -class Ring; - -class OpFinishHandleBase { -public: - using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept; - - void cancel() noexcept { - auto *ring = detail::Context::current().ring(); - io_uring_sqe *sqe = ring->get_sqe(); - io_uring_prep_cancel(sqe, this, 0); - io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore)); - io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); - } - - bool handle(io_uring_cqe *cqe) noexcept { - assert(handle_func_ != nullptr); - return handle_func_(this, cqe); - } - - void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; } - -protected: - OpFinishHandleBase() = default; - -protected: - HandleFunc handle_func_ = nullptr; - Invoker *invoker_ = nullptr; -}; - template class OpFinishHandle : public OpFinishHandleBase { public: @@ -67,6 +36,14 @@ class OpFinishHandle : public OpFinishHandleBase { return cqe_handler_.extract_result(); } + void cancel() noexcept { + auto *ring = detail::Context::current().ring(); + io_uring_sqe *sqe = ring->get_sqe(); + io_uring_prep_cancel(sqe, this, 0); + io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore)); + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + } + private: static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept { auto *self = static_cast(data); diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 940b6cb..197298c 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -7,7 +7,7 @@ #include "condy/condy_uring.hpp" #include "condy/context.hpp" -#include "condy/finish_handles.hpp" +#include "condy/finish_handle_base.hpp" #include "condy/intrusive.hpp" #include "condy/invoker.hpp" #include "condy/ring.hpp" From 49fea407f531d13fbb29fe207bad5ba07c6aed59 Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 2 Apr 2026 15:38:07 +0800 Subject: [PATCH 2/5] FinishHandle use Runtime cancel --- include/condy/channel.hpp | 6 ++++-- include/condy/channel_legacy.hpp | 6 ++++-- include/condy/concepts.hpp | 5 +++-- include/condy/finish_handles.hpp | 33 ++++++++++++++++---------------- 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/include/condy/channel.hpp b/include/condy/channel.hpp index 9d441f7..79af5f9 100644 --- a/include/condy/channel.hpp +++ b/include/condy/channel.hpp @@ -363,7 +363,8 @@ class Channel::PushFinishHandle PushFinishHandle(T item) : item_(std::move(item)) {} - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_push_(this)) { // Successfully canceled assert(result_ == -ENOTRECOVERABLE); @@ -422,7 +423,8 @@ class Channel::PopFinishHandle public: using ReturnType = std::pair; - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_pop_(this)) { // Successfully canceled assert(result_.first == -ENOTRECOVERABLE); diff --git a/include/condy/channel_legacy.hpp b/include/condy/channel_legacy.hpp index 9edf713..a573bcc 100644 --- a/include/condy/channel_legacy.hpp +++ b/include/condy/channel_legacy.hpp @@ -353,7 +353,8 @@ class Channel::PushFinishHandle PushFinishHandle(T item) : item_(std::move(item)) {} - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_push_(this)) { // Successfully canceled canceled_ = true; @@ -418,7 +419,8 @@ class Channel::PopFinishHandle public: using ReturnType = T; - void cancel() noexcept { + void cancel([[maybe_unused]] Runtime *runtime) noexcept { + assert(runtime == runtime_); if (channel_->cancel_pop_(this)) { // Successfully canceled runtime_->resume_work(); diff --git a/include/condy/concepts.hpp b/include/condy/concepts.hpp index f56b1a5..d1eff2d 100644 --- a/include/condy/concepts.hpp +++ b/include/condy/concepts.hpp @@ -19,6 +19,7 @@ namespace condy { class Ring; class Invoker; class BufferBase; +class Runtime; namespace detail { @@ -27,13 +28,13 @@ struct FixedFd; } // namespace detail template -concept HandleLike = requires(T handle, Invoker *invoker) { +concept HandleLike = requires(T handle, Invoker *invoker, Runtime *runtime) { typename std::decay_t::ReturnType; { handle.set_invoker(invoker) } -> std::same_as; { handle.extract_result() } -> std::same_as::ReturnType>; - { handle.cancel() } -> std::same_as; + { handle.cancel(runtime) } -> std::same_as; }; template diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index 3203b66..96aec0e 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -36,13 +36,7 @@ class OpFinishHandle : public OpFinishHandleBase { return cqe_handler_.extract_result(); } - void cancel() noexcept { - auto *ring = detail::Context::current().ring(); - io_uring_sqe *sqe = ring->get_sqe(); - io_uring_prep_cancel(sqe, this, 0); - io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore)); - io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); - } + void cancel(Runtime *runtime) noexcept { runtime->cancel(this); } private: static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept { @@ -167,11 +161,11 @@ template class RangedParallelFinishHandle { order_.resize(handles_.size()); } - void cancel() noexcept { + void cancel(Runtime *runtime) noexcept { if (!canceled_) { canceled_ = true; for (auto &handle : handles_) { - handle->cancel(); + handle->cancel(runtime); } } } @@ -193,11 +187,12 @@ template class RangedParallelFinishHandle { order_[no] = idx; if constexpr (Cancel) { + auto *runtime = detail::Context::current().runtime(); if (!canceled_) { canceled_ = true; for (size_t i = 0; i < handles_.size(); i++) { if (i != idx) { - handles_[i]->cancel(); + handles_[i]->cancel(runtime); } } } @@ -271,11 +266,14 @@ template class ParallelFinishHandle { foreach_set_invoker_(); } - void cancel() noexcept { + void cancel(Runtime *runtime) noexcept { if (!canceled_) { canceled_ = true; - std::apply([](auto *...handles) { (handles->cancel(), ...); }, - handles_); + std::apply( + [runtime](auto *...handles) { + (handles->cancel(runtime), ...); + }, + handles_); } } @@ -303,13 +301,13 @@ template class ParallelFinishHandle { } template - void foreach_call_cancel_() noexcept { + void foreach_call_cancel_(Runtime *runtime) noexcept { if constexpr (I < sizeof...(Handles)) { auto handle = std::get(handles_); if constexpr (I != SkipIdx) { - handle->cancel(); + handle->cancel(runtime); } - foreach_call_cancel_(); + foreach_call_cancel_(runtime); } } @@ -320,7 +318,8 @@ template class ParallelFinishHandle { if constexpr (Cancel) { if (!canceled_) { canceled_ = true; - foreach_call_cancel_(); + auto *runtime = detail::Context::current().runtime(); + foreach_call_cancel_(runtime); } } From c2aa4eef89beb47ba765d7e998db5e475ffc2a87 Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 2 Apr 2026 17:44:10 +0800 Subject: [PATCH 3/5] update test case --- tests/test_awaiter_operations.cpp | 243 ++------------------------ tests/test_op_awaiter.cpp | 183 ++----------------- tests/test_op_finish_handle.cpp | 44 ----- tests/test_parallel_awaiter.cpp | 3 +- tests/test_parallel_finish_handle.cpp | 10 +- 5 files changed, 43 insertions(+), 440 deletions(-) diff --git a/tests/test_awaiter_operations.cpp b/tests/test_awaiter_operations.cpp index 4ee7348..c57df09 100644 --- a/tests/test_awaiter_operations.cpp +++ b/tests/test_awaiter_operations.cpp @@ -1,5 +1,4 @@ -#include "condy/finish_handles.hpp" -#include "condy/work_type.hpp" +#include "condy/sync_wait.hpp" #include #include #include @@ -11,36 +10,7 @@ using namespace condy::operators; -namespace { - -void event_loop(size_t &unfinished) { - auto *ring = condy::detail::Context::current().ring(); - while (unfinished > 0) { - ring->submit(); - ring->reap_completions([&](io_uring_cqe *cqe) { - auto [data, type] = condy::decode_work(io_uring_cqe_get_data(cqe)); - if (type == condy::WorkType::Ignore) { - return; - } - auto handle_ptr = static_cast(data); - handle_ptr->handle(cqe); - }); - } -} - -// Just placeholder -condy::Runtime runtime; - -} // namespace - TEST_CASE("test awaiter_operations - test make_op_awaiter") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { co_await condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -50,23 +20,11 @@ TEST_CASE("test awaiter_operations - test make_op_awaiter") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -82,23 +40,11 @@ TEST_CASE("test awaiter_operations - test when_all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_any") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -118,23 +64,11 @@ TEST_CASE("test awaiter_operations - test when_any") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test ranged when_all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -153,23 +87,11 @@ TEST_CASE("test awaiter_operations - test ranged when_all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_all with empty range") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { using OpAwaiter = @@ -183,20 +105,11 @@ TEST_CASE("test awaiter_operations - test when_all with empty range") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test ranged when_any") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts1{ @@ -225,23 +138,11 @@ TEST_CASE("test awaiter_operations - test ranged when_any") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test when_any with empty range") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { using OpAwaiter = @@ -255,20 +156,11 @@ TEST_CASE("test awaiter_operations - test when_any with empty range") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test &&") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -284,23 +176,11 @@ TEST_CASE("test awaiter_operations - test &&") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test ||") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -320,23 +200,11 @@ TEST_CASE("test awaiter_operations - test ||") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - mixed && and ||") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -358,23 +226,11 @@ TEST_CASE("test awaiter_operations - mixed && and ||") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - ranged awaiter push") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -393,23 +249,11 @@ TEST_CASE("test awaiter_operations - ranged awaiter push") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test link") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -430,23 +274,11 @@ TEST_CASE("test awaiter_operations - test link") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test >>") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -467,23 +299,11 @@ TEST_CASE("test awaiter_operations - test >>") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test drain") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -494,23 +314,11 @@ TEST_CASE("test awaiter_operations - test drain") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test drain with when_all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -523,23 +331,11 @@ TEST_CASE("test awaiter_operations - test drain with when_all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test awaiter_operations - test parallel all") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto aw1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -557,11 +353,6 @@ TEST_CASE("test awaiter_operations - test parallel all") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } \ No newline at end of file diff --git a/tests/test_op_awaiter.cpp b/tests/test_op_awaiter.cpp index 3479f3f..495c0ba 100644 --- a/tests/test_op_awaiter.cpp +++ b/tests/test_op_awaiter.cpp @@ -1,5 +1,6 @@ #include "condy/finish_handles.hpp" #include "condy/provided_buffers.hpp" +#include "condy/sync_wait.hpp" #include #include #include @@ -7,36 +8,7 @@ #include #include -namespace { - -void event_loop(size_t &unfinished) { - auto *ring = condy::detail::Context::current().ring(); - while (unfinished > 0) { - ring->submit(); - ring->reap_completions([&](io_uring_cqe *cqe) { - auto [data, type] = condy::decode_work(io_uring_cqe_get_data(cqe)); - if (type == condy::WorkType::Ignore) { - return; - } - auto handle_ptr = static_cast(data); - handle_ptr->handle(cqe); - }); - } -} - -// Just placeholder -condy::Runtime runtime; - -} // namespace - TEST_CASE("test op_awaiter - basic routine") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { co_await condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -46,23 +18,11 @@ TEST_CASE("test op_awaiter - basic routine") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test op_awaiter - multiple ops") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { co_await condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -73,23 +33,11 @@ TEST_CASE("test op_awaiter - multiple ops") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test op_awaiter - concurrent op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { auto awaiter1 = condy::detail::make_op_awaiter(io_uring_prep_nop); @@ -105,23 +53,11 @@ TEST_CASE("test op_awaiter - concurrent op") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } TEST_CASE("test op_awaiter - cancel op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - size_t unfinished = 1; auto func = [&]() -> condy::Coro { __kernel_timespec ts{ @@ -143,115 +79,34 @@ TEST_CASE("test op_awaiter - cancel op") { auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - - context.reset(); } -namespace { - -void mock_multishot_event_loop(size_t &unfinished) { - auto *ring = condy::detail::Context::current().ring(); - while (unfinished > 0) { - ring->submit(); - ring->reap_completions([&](io_uring_cqe *cqe) { - auto [data, type] = condy::decode_work(io_uring_cqe_get_data(cqe)); - if (type == condy::WorkType::Ignore) { - return; - } - auto handle_ptr = static_cast(data); - // Mock Multishot - io_uring_cqe mock_cqe = *cqe; - mock_cqe.res = 42; - mock_cqe.flags |= IORING_CQE_F_MORE; - handle_ptr->handle(&mock_cqe); - handle_ptr->handle(cqe); - }); - } -} - -} // namespace - -TEST_CASE("test op_awaiter - multishot op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); +TEST_CASE("test op_awaiter - select buffer op") { + int pipefd[2]; + REQUIRE(pipe(pipefd) == 0); - bool handle_called = false; - auto handle_multishot = [&](int res) -> condy::Coro { - REQUIRE(res == 42); - handle_called = true; - co_return; - }; + ssize_t r = ::write(pipefd[1], "test", 4); + REQUIRE(r == 4); size_t unfinished = 1; auto func = [&]() -> condy::Coro { - co_await condy::detail::make_multishot_op_awaiter( - [&](int res) { - auto coro = handle_multishot(res); - coro.release().resume(); - }, - io_uring_prep_nop); + condy::ProvidedBufferPool pool(16, 32); + auto [res, buf] = co_await condy::detail::make_select_buffer_op_awaiter( + &pool, io_uring_prep_read, pipefd[0], nullptr, 0, 0); + REQUIRE(res >= 0); + REQUIRE(buf.size() == 32); + REQUIRE(std::memcmp(buf.data(), "test", 4) == 0); --unfinished; }; auto coro = func(); REQUIRE(unfinished == 1); - coro.release().resume(); - REQUIRE(unfinished == 1); - - mock_multishot_event_loop(unfinished); + condy::sync_wait(std::move(coro)); REQUIRE(unfinished == 0); - REQUIRE(handle_called == true); - - context.reset(); -} - -TEST_CASE("test op_awaiter - select buffer op") { - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - - { - condy::ProvidedBufferPool pool(16, 32); - - int pipefd[2]; - REQUIRE(pipe(pipefd) == 0); - - ssize_t r = ::write(pipefd[1], "test", 4); - REQUIRE(r == 4); - - size_t unfinished = 1; - auto func = [&]() -> condy::Coro { - auto [res, buf] = - co_await condy::detail::make_select_buffer_op_awaiter( - &pool, io_uring_prep_read, pipefd[0], nullptr, 0, 0); - REQUIRE(res >= 0); - REQUIRE(buf.size() == 32); - REQUIRE(std::memcmp(buf.data(), "test", 4) == 0); - --unfinished; - }; - - auto coro = func(); - REQUIRE(unfinished == 1); - - coro.release().resume(); - REQUIRE(unfinished == 1); - - event_loop(unfinished); - REQUIRE(unfinished == 0); - } - context.reset(); + close(pipefd[0]); + close(pipefd[1]); } \ No newline at end of file diff --git a/tests/test_op_finish_handle.cpp b/tests/test_op_finish_handle.cpp index 821c13a..9c69b64 100644 --- a/tests/test_op_finish_handle.cpp +++ b/tests/test_op_finish_handle.cpp @@ -103,50 +103,6 @@ TEST_CASE("test op_finish_handle - concurrent ops") { context.reset(); } -TEST_CASE("test op_finish_handle - cancel op") { - size_t unfinished = 1; - SetUnfinishedInvoker invoker{unfinished}; - - condy::Ring ring; - io_uring_params params{}; - std::memset(¶ms, 0, sizeof(params)); - ring.init(8, ¶ms); - auto &context = condy::detail::Context::current(); - context.init(&ring, &runtime); - - condy::OpFinishHandle handle1, handle2; - condy::ParallelFinishHandle, - condy::OpFinishHandle> - finish_handle; - finish_handle.init(&handle1, &handle2); - finish_handle.set_invoker(&invoker); - - auto *sqe = ring.get_sqe(); - __kernel_timespec ts{ - .tv_sec = 60ll * 60ll, - .tv_nsec = 0, - }; - io_uring_prep_timeout(sqe, &ts, 0, 0); - io_uring_sqe_set_data(sqe, &handle1); - - sqe = ring.get_sqe(); - io_uring_prep_nop(sqe); - io_uring_sqe_set_data(sqe, &handle2); - - event_loop(unfinished); - - REQUIRE(unfinished == 0); - - auto r = finish_handle.extract_result(); - auto &[order, results] = r; - REQUIRE(order[0] == 1); - REQUIRE(std::get<0>(results) == -ECANCELED); - REQUIRE(std::get<1>(results) == 0); - - context.reset(); -} - namespace { struct SetFinishWorkInvoker diff --git a/tests/test_parallel_awaiter.cpp b/tests/test_parallel_awaiter.cpp index a5f693d..443f001 100644 --- a/tests/test_parallel_awaiter.cpp +++ b/tests/test_parallel_awaiter.cpp @@ -1,4 +1,5 @@ #include "condy/invoker.hpp" +#include "condy/runtime.hpp" #include #include #include @@ -9,7 +10,7 @@ namespace { struct SimpleFinishHandle { using ReturnType = int; - void cancel() { cancelled_++; } + void cancel(condy::Runtime *) { cancelled_++; } void invoke(int res) { res_ = res; diff --git a/tests/test_parallel_finish_handle.cpp b/tests/test_parallel_finish_handle.cpp index baf3554..03836f3 100644 --- a/tests/test_parallel_finish_handle.cpp +++ b/tests/test_parallel_finish_handle.cpp @@ -11,7 +11,7 @@ struct SetFinishInvoker : public condy::InvokerAdapter { struct SimpleFinishHandle { using ReturnType = int; - void cancel() { cancelled_++; } + void cancel(condy::Runtime *) { cancelled_++; } void invoke(int res) { res_ = res; @@ -66,7 +66,7 @@ TEST_CASE("test parallel_finish_handle - RangedWhenAllFinishHandle cancel") { h2.invoke(2); REQUIRE(!invoker.finished); - handle.cancel(); + handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h1.cancelled_ == 1); REQUIRE(h2.cancelled_ == 1); @@ -152,7 +152,7 @@ TEST_CASE("test parallel_finish_handle - RangedWhenAnyFinishHandle " REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase - handle.cancel(); + handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase @@ -260,7 +260,7 @@ TEST_CASE("test parallel_finish_handle - WhenAllFinishHandle cancel") { h2.invoke(2); REQUIRE(!invoker.finished); - finish_handle.cancel(); + finish_handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h1.cancelled_ == 1); REQUIRE(h2.cancelled_ == 1); @@ -347,7 +347,7 @@ TEST_CASE("test parallel_finish_handle - WhenAnyFinishHandle multiple " REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase - finish_handle.cancel(); + finish_handle.cancel(nullptr); REQUIRE(!invoker.finished); REQUIRE(h2.cancelled_ == 1); // Should not increase From 24f62e7efd5d79c0203edb8a2d5e455e9622040a Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 2 Apr 2026 22:32:49 +0800 Subject: [PATCH 4/5] move OpFinishHandleBase to runtime.hpp --- include/condy/finish_handle_base.hpp | 28 ---------------------------- include/condy/finish_handles.hpp | 1 - include/condy/runtime.hpp | 20 +++++++++++++++++++- 3 files changed, 19 insertions(+), 30 deletions(-) delete mode 100644 include/condy/finish_handle_base.hpp diff --git a/include/condy/finish_handle_base.hpp b/include/condy/finish_handle_base.hpp deleted file mode 100644 index aa79562..0000000 --- a/include/condy/finish_handle_base.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include "condy/concepts.hpp" -#include "condy/invoker.hpp" -#include - -namespace condy { - -class OpFinishHandleBase { -public: - using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept; - - bool handle(io_uring_cqe *cqe) noexcept { - assert(handle_func_ != nullptr); - return handle_func_(this, cqe); - } - - void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; } - -protected: - OpFinishHandleBase() = default; - -protected: - HandleFunc handle_func_ = nullptr; - Invoker *invoker_ = nullptr; -}; - -} // namespace condy \ No newline at end of file diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index 96aec0e..052722a 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -9,7 +9,6 @@ #pragma once #include "condy/concepts.hpp" -#include "condy/finish_handle_base.hpp" #include "condy/invoker.hpp" #include "condy/runtime.hpp" #include diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 197298c..cee9652 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -7,7 +7,6 @@ #include "condy/condy_uring.hpp" #include "condy/context.hpp" -#include "condy/finish_handle_base.hpp" #include "condy/intrusive.hpp" #include "condy/invoker.hpp" #include "condy/ring.hpp" @@ -64,6 +63,25 @@ inline int sync_msg_ring(io_uring_sqe *sqe_data) noexcept { } // namespace detail +class OpFinishHandleBase { +public: + using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept; + + bool handle(io_uring_cqe *cqe) noexcept { + assert(handle_func_ != nullptr); + return handle_func_(this, cqe); + } + + void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; } + +protected: + OpFinishHandleBase() = default; + +protected: + HandleFunc handle_func_ = nullptr; + Invoker *invoker_ = nullptr; +}; + /** * @brief The event loop runtime for executing asynchronous * @details This class provides a single-threaded runtime for executing From 30633499228029fe30ec77747d2f95356e6dc347 Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 2 Apr 2026 22:34:13 +0800 Subject: [PATCH 5/5] add some assert --- include/condy/finish_handles.hpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index 052722a..21461c7 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -12,6 +12,7 @@ #include "condy/invoker.hpp" #include "condy/runtime.hpp" #include +#include #include #include #include @@ -35,7 +36,10 @@ class OpFinishHandle : public OpFinishHandleBase { return cqe_handler_.extract_result(); } - void cancel(Runtime *runtime) noexcept { runtime->cancel(this); } + void cancel(Runtime *runtime) noexcept { + assert(runtime != nullptr); + runtime->cancel(this); + } private: static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept { @@ -186,9 +190,9 @@ template class RangedParallelFinishHandle { order_[no] = idx; if constexpr (Cancel) { - auto *runtime = detail::Context::current().runtime(); if (!canceled_) { canceled_ = true; + auto *runtime = detail::Context::current().runtime(); for (size_t i = 0; i < handles_.size(); i++) { if (i != idx) { handles_[i]->cancel(runtime);