Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions include/elio/coro/cancel_token.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ struct cancel_state {
uint64_t next_id = 1;

uint64_t add_callback(std::function<void()> cb) {
std::lock_guard<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
if (cancelled.load(std::memory_order_relaxed)) {
// Already cancelled, invoke immediately outside lock
// Need to release lock first
mutex.unlock();
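// Already cancelled: the callback is meant to be invoked outside the lock.
// NOTE(review): std::unique_lock::release() only disassociates the mutex and does NOT unlock it, so the mutex is still held while cb() runs and is never unlocked afterwards; lock.unlock() is the call that actually releases it.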
lock.release();
cb();
mutex.lock();
// Don't re-acquire - we're done
return 0;
}
uint64_t id = next_id++;
Expand Down
13 changes: 10 additions & 3 deletions include/elio/coro/frame_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <array>
#include <atomic>
#include <cstdint>
#include <mutex>

namespace elio::coro {

Expand Down Expand Up @@ -216,6 +217,10 @@ class frame_allocator {
// Pool registry for cross-thread access
static constexpr size_t MAX_POOLS = 256;

// Registry entries - atomic for lock-free reads, protected by mutex for writes
static inline std::atomic<frame_allocator*> pool_registry_[MAX_POOLS]{};
static inline std::mutex registry_mutex_; // Protects unregister operations

static void register_pool(frame_allocator* pool) noexcept {
uint32_t id = pool->pool_id_;
if (id < MAX_POOLS) {
Expand All @@ -226,10 +231,15 @@ class frame_allocator {
/// Remove `pool` from the global registry by clearing its slot.
///
/// A pool whose id is not below MAX_POOLS has no slot and is ignored.
/// The slot is cleared with a release store while registry_mutex_ is held.
///
/// NOTE(review): get_pool_by_id(), as visible in this file, loads the slot
/// without taking registry_mutex_, so this lock alone does not stop a
/// concurrent reader from obtaining the pointer just before it is cleared.
/// Confirm how pool lifetime is guaranteed for such readers.
static void unregister_pool(frame_allocator* pool) noexcept {
    const uint32_t slot = pool->pool_id_;
    if (slot >= MAX_POOLS) {
        return;
    }
    std::lock_guard<std::mutex> guard(registry_mutex_);
    pool_registry_[slot].store(nullptr, std::memory_order_release);
}

// Get pool by ID - returns nullptr if pool was unregistered
static frame_allocator* get_pool_by_id(uint32_t id) noexcept {
if (id < MAX_POOLS) {
return pool_registry_[id].load(std::memory_order_acquire);
Expand All @@ -247,9 +257,6 @@ class frame_allocator {

// Global pool ID counter
static inline std::atomic<uint32_t> next_pool_id_{0};

// Global pool registry for cross-thread lookups
static inline std::array<std::atomic<frame_allocator*>, MAX_POOLS> pool_registry_{};
};

} // namespace elio::coro
20 changes: 20 additions & 0 deletions include/elio/coro/task_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,18 @@ struct task_state {
}

bool set_waiter(std::coroutine_handle<> h) noexcept {
// Fast path: check if already done before trying to set waiter
// This avoids the race of setting a waiter on an already-completed task
if (is_done()) {
return false;
}

// Try to atomically set the waiter
void* expected = nullptr;
if (waiter_.compare_exchange_strong(expected, h.address(),
std::memory_order_release, std::memory_order_acquire)) {
// Double-check if task completed between our initial check and CAS
// If so, we need to notify the waiter we just set
if (is_done()) {
void* addr = waiter_.exchange(nullptr, std::memory_order_acq_rel);
if (addr) {
Expand All @@ -136,6 +145,7 @@ struct task_state {
}
return true;
}
// Another waiter already set (shouldn't happen with single await)
return false;
}

Expand Down Expand Up @@ -201,9 +211,18 @@ struct task_state<void> {
}

bool set_waiter(std::coroutine_handle<> h) noexcept {
// Fast path: check if already done before trying to set waiter
// This avoids the race of setting a waiter on an already-completed task
if (is_done()) {
return false;
}

// Try to atomically set the waiter
void* expected = nullptr;
if (waiter_.compare_exchange_strong(expected, h.address(),
std::memory_order_release, std::memory_order_acquire)) {
// Double-check if task completed between our initial check and CAS
// If so, we need to notify the waiter we just set
if (is_done()) {
void* addr = waiter_.exchange(nullptr, std::memory_order_acq_rel);
if (addr) {
Expand All @@ -213,6 +232,7 @@ struct task_state<void> {
}
return true;
}
// Another waiter already set (shouldn't happen with single await)
return false;
}

Expand Down
43 changes: 31 additions & 12 deletions include/elio/io/io_uring_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,30 @@

namespace elio::io {

/// Map to track handles being resumed for thread-safe check-and-resume
/// Uses a mutex for simplicity - a handle address is mapped to a bool flag
/// indicating if the resume is in progress
inline std::unordered_map<void*, std::atomic<bool>>& get_resume_tracking_map() {
static std::unordered_map<void*, std::atomic<bool>> map;
return map;
/// Number of shards for the resume tracking map, used to reduce lock contention.
/// Must be a power of two: the shard index is computed with a bitwise AND
/// against (kResumeTrackingShards - 1), which is only a valid modulo in that case.
static constexpr size_t kResumeTrackingShards = 16;
static_assert(kResumeTrackingShards != 0 &&
                  (kResumeTrackingShards & (kResumeTrackingShards - 1)) == 0,
              "kResumeTrackingShards must be a non-zero power of two");

/// Map a user_data pointer to its shard index.
///
/// @param user_data Pointer whose numeric value selects the shard (may be null).
/// @return An index in [0, kResumeTrackingShards).
inline size_t get_resume_tracking_shard(const void* user_data) noexcept {
    const auto addr = reinterpret_cast<uintptr_t>(user_data);
    // Fold bits 8+ and 16+ of the address into the low bits before masking,
    // so the index does not depend on the lowest address bits alone.
    const uintptr_t mixed = addr ^ (addr >> 8) ^ (addr >> 16);
    return static_cast<size_t>(mixed & (kResumeTrackingShards - 1));
}

inline std::mutex& get_resume_tracking_mutex() {
static std::mutex mutex;
return mutex;
/// Per-shard tracking data
/// One shard of the resume-tracking state: a map plus the mutex that is
/// locked around accesses to it.
struct resume_tracking_shard {
// Keyed by the user_data pointer (the address a coroutine handle is rebuilt
// from on completion). The flag is presumably true while a resume of that
// handle is in progress — TODO confirm against the completion path.
std::unordered_map<void*, std::atomic<bool>> map;
// The completion path locks this before looking up `map`.
std::mutex mutex;
};

/// Get the sharded resume tracking maps and mutexes
/// Using sharding to reduce lock contention under high I/O load
inline std::array<resume_tracking_shard, kResumeTrackingShards>&
get_resume_tracking_shards() {
static std::array<resume_tracking_shard, kResumeTrackingShards> shards;
return shards;
}

/// io_uring backend implementation
Expand Down Expand Up @@ -410,11 +423,17 @@ class io_uring_backend : public io_backend {
auto handle = std::coroutine_handle<>::from_address(user_data);

if (handle) {
// Thread-safe check-and-resume using atomic operations.
// Thread-safe check-and-resume using sharded atomic operations.
// Uses per-shard locking to reduce lock contention under high I/O load.
// Try to atomically claim this resume to prevent use-after-free.
// If another thread is already resuming this handle, skip.
auto& tracking_map = get_resume_tracking_map();
std::lock_guard<std::mutex> lock(get_resume_tracking_mutex());

// Get the shard for this user_data to reduce contention
size_t shard = get_resume_tracking_shard(user_data);
auto& shards = get_resume_tracking_shards();

std::lock_guard<std::mutex> lock(shards[shard].mutex);
auto& tracking_map = shards[shard].map;

// Get or create the atomic flag for this handle
auto it = tracking_map.find(user_data);
Expand Down
2 changes: 1 addition & 1 deletion include/elio/runtime/mpsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class mpsc_queue {
mpsc_queue& operator=(const mpsc_queue&) = delete;
mpsc_queue(mpsc_queue&&) = delete;
mpsc_queue& operator=(mpsc_queue&&) = delete;

/// Push an item (multiple producers allowed) - lock-free, wait-free for non-full queue
bool push(T* item) noexcept {
size_t pos = tail_.load(std::memory_order_relaxed);
Expand Down
26 changes: 13 additions & 13 deletions include/elio/runtime/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ inline void worker_thread::stop() {
inline void worker_thread::drain_remaining_tasks() noexcept {
// First drain inbox to deque
void* addr;
while ((addr = inbox_.pop()) != nullptr) {
queue_.push(addr);
while ((addr = inbox_->pop()) != nullptr) {
queue_->push(addr);
}
// Then destroy all tasks in the deque
while ((addr = queue_.pop()) != nullptr) {
while ((addr = queue_->pop()) != nullptr) {
auto handle = std::coroutine_handle<>::from_address(addr);
if (handle) {
handle.destroy();
Expand All @@ -325,11 +325,11 @@ inline void worker_thread::drain_remaining_tasks() noexcept {
inline void worker_thread::redistribute_tasks(scheduler* sched) noexcept {
// First drain inbox to deque
void* addr;
while ((addr = inbox_.pop()) != nullptr) {
queue_.push(addr);
while ((addr = inbox_->pop()) != nullptr) {
queue_->push(addr);
}
// Then redistribute all tasks to active workers
while ((addr = queue_.pop()) != nullptr) {
while ((addr = queue_->pop()) != nullptr) {
auto handle = std::coroutine_handle<>::from_address(addr);
if (handle && !handle.done()) {
// Respawn to an active worker
Expand All @@ -344,8 +344,8 @@ inline void worker_thread::drain_inbox() noexcept {
// Drain MPSC inbox into local Chase-Lev deque
// Drain all available items to ensure tasks aren't stuck in inbox
void* item;
while ((item = inbox_.pop()) != nullptr) {
queue_.push(item);
while ((item = inbox_->pop()) != nullptr) {
queue_->push(item);
}
}

Expand Down Expand Up @@ -388,17 +388,17 @@ inline std::coroutine_handle<> worker_thread::get_next_task() noexcept {
// In single-worker mode, use pop_local() to skip seq_cst fence
// Pass !single_worker as allow_concurrent_steals - pop_local() will fall back
// to pop() if there could be concurrent stealers
void* addr = queue_.pop_local(!single_worker);
void* addr = queue_->pop_local(!single_worker);
if (addr) {
needs_sync_ = false; // Local task, no sync needed
return std::coroutine_handle<>::from_address(addr);
}

// Local queue empty - drain any externally submitted tasks from inbox
drain_inbox();

// Try local deque again after draining inbox
addr = queue_.pop_local(!single_worker);
addr = queue_->pop_local(!single_worker);
if (addr) {
needs_sync_ = true; // Came from inbox, needs sync
return std::coroutine_handle<>::from_address(addr);
Expand Down Expand Up @@ -500,7 +500,7 @@ inline void worker_thread::poll_io_when_idle() {
// Optional spinning phase (if configured via wait_strategy)
if (strategy_.spin_iterations > 0) {
for (size_t i = 0; i < strategy_.spin_iterations; ++i) {
if (inbox_.size_approx() > 0) {
if (inbox_->size_approx() > 0) {
idle_.store(false, std::memory_order_relaxed);
return;
}
Expand Down
64 changes: 52 additions & 12 deletions include/elio/runtime/worker_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class worker_thread {
wait_strategy strategy = wait_strategy::blocking())
: scheduler_(sched)
, worker_id_(worker_id)
, queue_()
, inbox_()
, queue_(std::make_unique<chase_lev_deque<void>>())
, inbox_(std::make_unique<mpsc_queue<void>>())
, running_(false)
, tasks_executed_(0)
, strategy_(strategy)
Expand All @@ -45,8 +45,48 @@ class worker_thread {

worker_thread(const worker_thread&) = delete;
worker_thread& operator=(const worker_thread&) = delete;
worker_thread(worker_thread&&) = delete;
worker_thread& operator=(worker_thread&&) = delete;

// Move constructor.
// Copies other's scheduler_ pointer, worker_id_ and needs_sync_, moves its
// queue_, inbox_, thread_, strategy_ and io_context_, and initialises each
// atomic member from a relaxed load of the corresponding member of other.
// NOTE(review): declared noexcept, but the body calls std::make_unique, which
// can throw std::bad_alloc; an exception escaping here calls std::terminate.
// NOTE(review): thread_ is moved while other.running_ is left unchanged. If
// the thread's entry function refers to the original object (start() is not
// visible here), it would keep operating on `other` — confirm that moves
// only happen before start().
worker_thread(worker_thread&& other) noexcept
: scheduler_(other.scheduler_)
, worker_id_(other.worker_id_)
, queue_(std::move(other.queue_))
, inbox_(std::move(other.inbox_))
, thread_(std::move(other.thread_))
, running_(other.running_.load(std::memory_order_relaxed))
, tasks_executed_(other.tasks_executed_.load(std::memory_order_relaxed))
, idle_(other.idle_.load(std::memory_order_relaxed))
, last_task_time_(other.last_task_time_.load(std::memory_order_relaxed))
, needs_sync_(other.needs_sync_)
, strategy_(std::move(other.strategy_))
, io_context_(std::move(other.io_context_)) {
// Moving the unique_ptrs already left other.queue_/other.inbox_ null (so there
// is no double-free to prevent). Fresh empty containers are installed so that
// members which dereference them, e.g. queue_size(), stay valid on `other`.
other.queue_ = std::make_unique<chase_lev_deque<void>>();
other.inbox_ = std::make_unique<mpsc_queue<void>>();
}

// Move assignment.
// On self-assignment nothing happens. Otherwise stop() is called on this
// object first, then every member is overwritten from other: plain members
// are copied or moved, atomics are transferred with relaxed load/store pairs.
// Returns *this.
// NOTE(review): declared noexcept, but std::make_unique below can throw
// std::bad_alloc, which would call std::terminate.
// NOTE(review): other.running_ is not cleared although other.thread_ has been
// moved away — confirm the moved-from object is only ever destroyed or
// reassigned afterwards.
worker_thread& operator=(worker_thread&& other) noexcept {
if (this != &other) {
// Shut this worker down before its queues and thread handle are replaced.
stop();

scheduler_ = other.scheduler_;
worker_id_ = other.worker_id_;
queue_ = std::move(other.queue_);
inbox_ = std::move(other.inbox_);
thread_ = std::move(other.thread_);
running_.store(other.running_.load(std::memory_order_relaxed), std::memory_order_relaxed);
tasks_executed_.store(other.tasks_executed_.load(std::memory_order_relaxed), std::memory_order_relaxed);
idle_.store(other.idle_.load(std::memory_order_relaxed), std::memory_order_relaxed);
last_task_time_.store(other.last_task_time_.load(std::memory_order_relaxed), std::memory_order_relaxed);
needs_sync_ = other.needs_sync_;
strategy_ = std::move(other.strategy_);
io_context_ = std::move(other.io_context_);

// The moves above already left other.queue_/other.inbox_ null (no double-free
// is possible). Fresh empty containers are installed so that members which
// dereference them, e.g. queue_size(), stay valid on `other`.
other.queue_ = std::make_unique<chase_lev_deque<void>>();
other.inbox_ = std::make_unique<mpsc_queue<void>>();
}
return *this;
}

void start();
void stop();
Expand All @@ -63,7 +103,7 @@ class worker_thread {
if (!handle) [[unlikely]] return;

// Try fast path: push to lock-free inbox
if (inbox_.push(handle.address())) [[likely]] {
if (inbox_->push(handle.address())) [[likely]] {
// Lazy wake: only wake if worker is idle (waiting for work)
// This avoids unnecessary wake syscalls when worker is busy
// Use relaxed load - occasional extra wake is fine, we optimize for the common case
Expand All @@ -84,7 +124,7 @@ class worker_thread {
std::this_thread::yield();
#endif
}
if (inbox_.push(handle.address())) {
if (inbox_->push(handle.address())) {
// Lazy wake: only wake if worker is idle
if (idle_.load(std::memory_order_relaxed)) {
wake();
Expand All @@ -99,28 +139,28 @@ class worker_thread {
/// Schedule a task from owner thread - pushes directly to local deque
void schedule_local(std::coroutine_handle<> handle) {
if (!handle) [[unlikely]] return;
queue_.push(handle.address());
queue_->push(handle.address());
}

/// Current value of the tasks_executed_ counter.
/// Read with relaxed ordering, so the value is a snapshot that implies no
/// ordering with respect to other memory operations.
[[nodiscard]] size_t tasks_executed() const noexcept {
    const size_t executed = tasks_executed_.load(std::memory_order_relaxed);
    return executed;
}

[[nodiscard]] size_t queue_size() const noexcept {
return queue_.size() + inbox_.size_approx();
return queue_->size() + inbox_->size_approx();
}

[[nodiscard]] std::coroutine_handle<> steal_task() noexcept {
// Don't allow stealing from stopped workers
if (!running_.load(std::memory_order_acquire)) return nullptr;
void* addr = queue_.steal();
void* addr = queue_->steal();
return addr ? std::coroutine_handle<>::from_address(addr) : nullptr;
}

/// Steal multiple tasks at once for better throughput
template<size_t N>
size_t steal_batch(std::array<void*, N>& output) noexcept {
return queue_.steal_batch(output);
return queue_->steal_batch(output);
}

[[nodiscard]] bool is_running() const noexcept {
Expand Down Expand Up @@ -182,8 +222,8 @@ class worker_thread {

scheduler* scheduler_;
size_t worker_id_;
chase_lev_deque<void> queue_; // Owner's local deque (SPMC)
mpsc_queue<void> inbox_; // External submissions (MPSC)
std::unique_ptr<chase_lev_deque<void>> queue_; // Owner's local deque (SPMC)
std::unique_ptr<mpsc_queue<void>> inbox_; // External submissions (MPSC)
std::thread thread_;
std::atomic<bool> running_;
std::atomic<size_t> tasks_executed_;
Expand Down
2 changes: 1 addition & 1 deletion include/elio/sync/primitives.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ class channel {
other.closed_ = true;
}

channel& operator=(channel&& other) noexcept {
channel& operator=(channel&& other) noexcept(false) {
if (this != &other) {
close();
capacity_ = other.capacity_;
Expand Down