Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions include/elio/coro/frame_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@
#include <cstdint>
#include <mutex>

// Architecture-specific CPU pause/yield hint for tight spin loops.
// Reduces power consumption and allows the HT sibling to run.
//
// NOTE(review): __builtin_ia32_pause() is a GCC/Clang builtin (emits the x86
// PAUSE instruction); MSVC would need _mm_pause() from <immintrin.h> —
// confirm which toolchains this project supports.
#if defined(__x86_64__) || defined(__i386__)
# define ELIO_CPU_PAUSE() __builtin_ia32_pause()
#elif defined(__aarch64__) || defined(__arm__)
// ARM/AArch64 "yield" hint. The "memory" clobber additionally acts as a
// compiler barrier, preventing the spin-loop load from being hoisted.
# define ELIO_CPU_PAUSE() __asm__ __volatile__("yield" ::: "memory")
#else
// Portable fallback: an OS-level scheduler yield. Far heavier than a CPU
// pause hint (possible syscall / context switch), but correct everywhere.
# include <thread>
# define ELIO_CPU_PAUSE() std::this_thread::yield()
#endif

namespace elio::coro {

/// Thread-local free-list based frame allocator for small coroutine frames
Expand Down Expand Up @@ -167,14 +178,19 @@ class frame_allocator {
while (head && count < REMOTE_QUEUE_BATCH && free_count_ < POOL_SIZE) {
block_header* next = head->next.load(std::memory_order_acquire);

// If next is null but tail points elsewhere, spin briefly
// (producer is in the middle of push)
// If next is null but tail points elsewhere, the producer is in the
// middle of push() (has done the tail exchange but not yet written
// prev->next). Spin briefly with a CPU pause hint.
if (!next && remote_tail_.load(std::memory_order_acquire) != head) {
// Spin wait for producer to complete
for (int i = 0; i < 100 && !head->next.load(std::memory_order_acquire); ++i) {
// Brief spin
for (int i = 0; i < 16; ++i) {
ELIO_CPU_PAUSE();
next = head->next.load(std::memory_order_acquire);
if (next) break;
}
next = head->next.load(std::memory_order_acquire);
// If the link still isn't ready, stop without consuming 'head'.
// Consuming it would leave the queue in a broken state because
// the producer would later write through a recycled pointer.
if (!next) break;
}

pool_[free_count_++] = head;
Expand All @@ -190,12 +206,16 @@ class frame_allocator {
while (head) {
block_header* next = head->next.load(std::memory_order_acquire);

// If next is null but tail points elsewhere, spin briefly
// Same safe spin pattern as reclaim_remote_returns(), but with more
// retries because we're in teardown and really want to drain the queue.
if (!next && remote_tail_.load(std::memory_order_acquire) != head) {
for (int i = 0; i < 1000 && !head->next.load(std::memory_order_acquire); ++i) {
// Brief spin
for (int i = 0; i < 32; ++i) {
ELIO_CPU_PAUSE();
next = head->next.load(std::memory_order_acquire);
if (next) break;
}
next = head->next.load(std::memory_order_acquire);
// Stop safely rather than risk corrupting a partially-linked node.
if (!next) break;
}

if (free_count_ < POOL_SIZE) {
Expand Down
9 changes: 7 additions & 2 deletions include/elio/coro/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ template<typename T>
struct join_state {
std::optional<T> value_;
std::exception_ptr exception_;
std::atomic<void*> waiter_{nullptr}; // Stores coroutine_handle address
// waiter_/completed_ are the hot path: written by the coroutine producer
// (complete()) and the awaiting consumer (set_waiter()). Aligning them
// to a new cache line separates them from value_/exception_ which may
// be large and written only once, preventing false sharing.
alignas(64) std::atomic<void*> waiter_{nullptr}; // Stores coroutine_handle address
std::atomic<bool> completed_{false};

void set_value(T&& value) {
Expand Down Expand Up @@ -109,7 +113,8 @@ struct join_state {
template<>
struct join_state<void> {
std::exception_ptr exception_;
std::atomic<void*> waiter_{nullptr};
// Hot atomics on their own cache line (see join_state<T> comment above)
alignas(64) std::atomic<void*> waiter_{nullptr};
std::atomic<bool> completed_{false};

void set_value() {
Expand Down
19 changes: 15 additions & 4 deletions include/elio/io/io_uring_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
#include <atomic>
#include <mutex>
#include <unordered_map>
#include <algorithm>
#include <thread>

namespace elio::io {

/// Number of shards for resume tracking map to reduce lock contention
/// Using a power of 2 allows fast modulo via bitwise AND
static constexpr size_t kResumeTrackingShards = 16;
/// Number of shards for resume tracking map to reduce lock contention.
/// 64 shards accommodate up to ~64-core machines without measurable contention;
/// must be a power of 2 for efficient bitwise modulo.
static constexpr size_t kResumeTrackingShards = 64;

/// Get the shard index for a given user_data pointer
inline size_t get_resume_tracking_shard(const void* user_data) {
Expand Down Expand Up @@ -57,7 +60,15 @@ class io_uring_backend : public io_backend {
public:
/// Configuration options
struct config {
uint32_t queue_depth = 256; ///< SQ/CQ depth
/// SQ/CQ depth: defaults to 512 * num_hw_threads, clamped to [1024, 32768].
/// A larger ring reduces SQE exhaustion under high concurrency without
/// wasting memory on small machines.
uint32_t queue_depth = []() -> uint32_t {
unsigned hw = std::thread::hardware_concurrency();
if (hw == 0) hw = 4; // conservative default if detection fails
auto d = static_cast<uint32_t>(hw) * 512u;
return std::clamp(d, 1024u, 32768u);
}();
uint32_t flags = 0; ///< io_uring_setup flags
bool sq_poll = false; ///< Enable SQ polling (requires privileges)
};
Expand Down
52 changes: 37 additions & 15 deletions include/elio/runtime/chase_lev_deque.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,24 +199,46 @@ class chase_lev_deque {
return nullptr;
}

/// Steal multiple elements at once (thieves only) - lock-free
/// WARNING: This function has a race condition with pop() when the owner
/// pops items between when we read bottom and when we load items.
/// Use steal() for single-item stealing which is properly synchronized.
/// This function is kept for reference but should not be used.
/// Steal up to N elements atomically (thieves only) - lock-free
///
/// Uses a single CAS on top_ to claim min(size/2, N, 1) slots at once,
/// which dramatically reduces CAS overhead compared to N individual steal()
/// calls when many items are available.
///
/// Safety: the CAS atomically reserves the range [t, t+batch); the owner
/// pops from the bottom (indices ≥ new bottom_), so there is no overlap
/// as long as batch ≤ available = b − t, which is enforced below.
///
/// Returns the number of items stored in output[0..N-1].
template<size_t N>
[[deprecated("Use steal() instead - steal_batch has race conditions with pop()")]]
size_t steal_batch(std::array<T*, N>& output) noexcept {
size_t stolen = 0;

while (stolen < N) {
// Single-item steal to avoid race with owner's pop
T* item = steal();
if (!item) break;
output[stolen++] = item;
size_t t = top_.load(std::memory_order_acquire);
// seq_cst fence: required for correctness with pop() (same as steal())
std::atomic_thread_fence(std::memory_order_seq_cst);
size_t b = bottom_.load(std::memory_order_acquire);

if (t >= b) return 0; // empty

size_t available = b - t;
// Steal at most half the queue (work-stealing heuristic), at least 1
size_t batch = std::min(std::max(available / 2, size_t{1}), N);

// Atomically claim [t, t+batch) by advancing top_
if (!top_.compare_exchange_strong(t, t + batch,
std::memory_order_acq_rel, std::memory_order_relaxed)) {
return 0; // Lost race with another thief or owner
}

return stolen;

// We exclusively own slots [t, t+batch). Load items from the buffer.
// buffer_.load(acquire) ensures we see any resize that happened before
// our CAS; old buffers are kept alive in old_buffers_ so this is safe
// even if a resize happened concurrently.
circular_buffer* buf = buffer_.load(std::memory_order_acquire);
for (size_t i = 0; i < batch; ++i) {
output[i] = buf->load(t + i);
}

return batch;
}

[[nodiscard]] size_t size() const noexcept {
Expand Down
15 changes: 12 additions & 3 deletions include/elio/runtime/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,20 @@ class scheduler {
}

std::vector<std::unique_ptr<worker_thread>> workers_;
std::atomic<size_t> num_threads_;

// Frequently-read fields on their own cache line to avoid false sharing
// with the spawn counter and the slow-path workers_mutex_.
alignas(64) std::atomic<size_t> num_threads_;
std::atomic<bool> running_;
std::atomic<bool> paused_;
std::atomic<size_t> spawn_index_;
mutable std::mutex workers_mutex_;

// spawn_index_ is incremented on every spawn(); isolate it so modifications
// don't invalidate the num_threads_/running_ cache line on other cores.
alignas(64) std::atomic<size_t> spawn_index_;

// workers_mutex_ is only touched on slow-path resize operations;
// keep it away from the hot read fields above.
alignas(64) mutable std::mutex workers_mutex_;
wait_strategy wait_strategy_;

static inline thread_local scheduler* current_scheduler_ = nullptr;
Expand Down
Loading
Loading