Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/condy/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ class Channel<T, N>::PushFinishHandle

PushFinishHandle(T item) : item_(std::move(item)) {}

void cancel() noexcept {
void cancel([[maybe_unused]] Runtime *runtime) noexcept {
assert(runtime == runtime_);
if (channel_->cancel_push_(this)) {
// Successfully canceled
assert(result_ == -ENOTRECOVERABLE);
Expand Down Expand Up @@ -422,7 +423,8 @@ class Channel<T, N>::PopFinishHandle
public:
using ReturnType = std::pair<int, T>;

void cancel() noexcept {
void cancel([[maybe_unused]] Runtime *runtime) noexcept {
assert(runtime == runtime_);
if (channel_->cancel_pop_(this)) {
// Successfully canceled
assert(result_.first == -ENOTRECOVERABLE);
Expand Down
6 changes: 4 additions & 2 deletions include/condy/channel_legacy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ class Channel<T, N>::PushFinishHandle

PushFinishHandle(T item) : item_(std::move(item)) {}

void cancel() noexcept {
void cancel([[maybe_unused]] Runtime *runtime) noexcept {
assert(runtime == runtime_);
if (channel_->cancel_push_(this)) {
// Successfully canceled
canceled_ = true;
Expand Down Expand Up @@ -418,7 +419,8 @@ class Channel<T, N>::PopFinishHandle
public:
using ReturnType = T;

void cancel() noexcept {
void cancel([[maybe_unused]] Runtime *runtime) noexcept {
assert(runtime == runtime_);
if (channel_->cancel_pop_(this)) {
// Successfully canceled
runtime_->resume_work();
Expand Down
5 changes: 3 additions & 2 deletions include/condy/concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace condy {
class Ring;
class Invoker;
class BufferBase;
class Runtime;

namespace detail {

Expand All @@ -27,13 +28,13 @@ struct FixedFd;
} // namespace detail

template <typename T>
concept HandleLike = requires(T handle, Invoker *invoker) {
concept HandleLike = requires(T handle, Invoker *invoker, Runtime *runtime) {
typename std::decay_t<T>::ReturnType;
{ handle.set_invoker(invoker) } -> std::same_as<void>;
{
handle.extract_result()
} -> std::same_as<typename std::decay_t<T>::ReturnType>;
{ handle.cancel() } -> std::same_as<void>;
{ handle.cancel(runtime) } -> std::same_as<void>;
};

template <typename T>
Expand Down
65 changes: 22 additions & 43 deletions include/condy/finish_handles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
#pragma once

#include "condy/concepts.hpp"
#include "condy/condy_uring.hpp"
#include "condy/context.hpp"
#include "condy/invoker.hpp"
#include "condy/ring.hpp"
#include "condy/work_type.hpp"
#include "condy/runtime.hpp"
#include <array>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <tuple>
Expand All @@ -24,35 +22,6 @@

namespace condy {

class Ring;

class OpFinishHandleBase {
public:
using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept;

void cancel() noexcept {
auto *ring = detail::Context::current().ring();
io_uring_sqe *sqe = ring->get_sqe();
io_uring_prep_cancel(sqe, this, 0);
io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore));
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
}

bool handle(io_uring_cqe *cqe) noexcept {
assert(handle_func_ != nullptr);
return handle_func_(this, cqe);
}

void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }

protected:
OpFinishHandleBase() = default;

protected:
HandleFunc handle_func_ = nullptr;
Invoker *invoker_ = nullptr;
};

template <CQEHandlerLike CQEHandler>
class OpFinishHandle : public OpFinishHandleBase {
public:
Expand All @@ -67,6 +36,11 @@ class OpFinishHandle : public OpFinishHandleBase {
return cqe_handler_.extract_result();
}

void cancel(Runtime *runtime) noexcept {
assert(runtime != nullptr);
runtime->cancel(this);
}

private:
static bool handle_static_(void *data, io_uring_cqe *cqe) noexcept {
auto *self = static_cast<OpFinishHandle *>(data);
Expand Down Expand Up @@ -190,11 +164,11 @@ template <bool Cancel, HandleLike Handle> class RangedParallelFinishHandle {
order_.resize(handles_.size());
}

void cancel() noexcept {
void cancel(Runtime *runtime) noexcept {
if (!canceled_) {
canceled_ = true;
for (auto &handle : handles_) {
handle->cancel();
handle->cancel(runtime);
}
}
}
Expand All @@ -218,9 +192,10 @@ template <bool Cancel, HandleLike Handle> class RangedParallelFinishHandle {
if constexpr (Cancel) {
if (!canceled_) {
canceled_ = true;
auto *runtime = detail::Context::current().runtime();
for (size_t i = 0; i < handles_.size(); i++) {
if (i != idx) {
handles_[i]->cancel();
handles_[i]->cancel(runtime);
}
}
}
Expand Down Expand Up @@ -294,11 +269,14 @@ template <bool Cancel, HandleLike... Handles> class ParallelFinishHandle {
foreach_set_invoker_();
}

void cancel() noexcept {
void cancel(Runtime *runtime) noexcept {
if (!canceled_) {
canceled_ = true;
std::apply([](auto *...handles) { (handles->cancel(), ...); },
handles_);
std::apply(
[runtime](auto *...handles) {
(handles->cancel(runtime), ...);
},
handles_);
}
}

Expand Down Expand Up @@ -326,13 +304,13 @@ template <bool Cancel, HandleLike... Handles> class ParallelFinishHandle {
}

template <size_t SkipIdx, size_t I = 0>
void foreach_call_cancel_() noexcept {
void foreach_call_cancel_(Runtime *runtime) noexcept {
if constexpr (I < sizeof...(Handles)) {
auto handle = std::get<I>(handles_);
if constexpr (I != SkipIdx) {
handle->cancel();
handle->cancel(runtime);
}
foreach_call_cancel_<SkipIdx, I + 1>();
foreach_call_cancel_<SkipIdx, I + 1>(runtime);
}
}

Expand All @@ -343,7 +321,8 @@ template <bool Cancel, HandleLike... Handles> class ParallelFinishHandle {
if constexpr (Cancel) {
if (!canceled_) {
canceled_ = true;
foreach_call_cancel_<Idx>();
auto *runtime = detail::Context::current().runtime();
foreach_call_cancel_<Idx>(runtime);
}
}

Expand Down
20 changes: 19 additions & 1 deletion include/condy/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

#include "condy/condy_uring.hpp"
#include "condy/context.hpp"
#include "condy/finish_handles.hpp"
#include "condy/intrusive.hpp"
#include "condy/invoker.hpp"
#include "condy/ring.hpp"
Expand Down Expand Up @@ -64,6 +63,25 @@ inline int sync_msg_ring(io_uring_sqe *sqe_data) noexcept {

} // namespace detail

class OpFinishHandleBase {
public:
using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept;

bool handle(io_uring_cqe *cqe) noexcept {
assert(handle_func_ != nullptr);
return handle_func_(this, cqe);
}

void set_invoker(Invoker *invoker) noexcept { invoker_ = invoker; }

protected:
OpFinishHandleBase() = default;

protected:
HandleFunc handle_func_ = nullptr;
Invoker *invoker_ = nullptr;
};

/**
* @brief The event loop runtime for executing asynchronous
* @details This class provides a single-threaded runtime for executing
Expand Down
Loading
Loading