Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions examples/async_file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,10 @@ int main(int argc, char* argv[]) {
done = true;
};

coro::detail::heap_alloc_guard guard;
auto t = run();
sched.spawn(t.release());
auto handle = coro::detail::task_access::release(t);
sched.spawn(handle);
} else if (mode == "--read") {
std::vector<std::string> files;
for (int i = 2; i < argc; ++i) {
Expand All @@ -283,8 +285,10 @@ int main(int argc, char* argv[]) {
done = true;
};

coro::detail::heap_alloc_guard guard;
auto t = run();
sched.spawn(t.release());
auto handle = coro::detail::task_access::release(t);
sched.spawn(handle);
} else if (argc >= 3) {
// File copy mode
std::string src = argv[1];
Expand All @@ -296,8 +300,10 @@ int main(int argc, char* argv[]) {
done = true;
};

coro::detail::heap_alloc_guard guard;
auto t = run();
sched.spawn(t.release());
auto handle = coro::detail::task_access::release(t);
sched.spawn(handle);
} else {
std::cerr << "Invalid arguments" << std::endl;
return 1;
Expand Down
4 changes: 2 additions & 2 deletions examples/autoscaler_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int main() {

// Submit heavy workload
for (int i = 0; i < 2000; ++i) {
sched.spawn(workload_task(completed).release());
sched.go([&completed]() { return workload_task(completed); });
}

std::cout << "Phase 1: High load - expecting scale-up..." << std::endl;
Expand Down Expand Up @@ -80,7 +80,7 @@ int main() {

// Submit even heavier workload
for (int i = 0; i < 3000; ++i) {
sched.spawn(workload_task(completed2).release());
sched.go([&completed2]() { return workload_task(completed2); });
}

std::cout << "Phase 2: Higher load - expecting more scale-up..." << std::endl;
Expand Down
90 changes: 46 additions & 44 deletions examples/benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,22 @@ void benchmark_spawn_overhead() {
while (duration_cast<seconds>(high_resolution_clock::now() - bench_start) < MIN_BENCH_DURATION) {
runtime::scheduler sched(4);
sched.start();


std::atomic<int> completed(0);

auto batch_start = high_resolution_clock::now();


auto taskdef = [&completed]() -> coro::task<void> {
completed.fetch_add(1, std::memory_order_release);
co_return;
};

for (int i = 0; i < batch_size; ++i) {
auto t = empty_task();
sched.spawn(t.release());
sched.go(taskdef);
}

// Wait for all to complete
while (sched.pending_tasks() > 0) {
while (completed.load(std::memory_order_acquire) < batch_size) {
std::this_thread::sleep_for(microseconds(1));
}

Expand Down Expand Up @@ -127,24 +133,23 @@ void benchmark_context_switch() {
runtime::scheduler sched(4);
sched.start();

std::atomic<int> completed{0};
auto task_with_await = [&]() -> coro::task<void> {
for (int i = 0; i < awaits_per_task; ++i) {
int value = co_await compute_task(i);
std::atomic<int> completed(0);

auto taskdef = [&completed]() -> coro::task<void> {
for (int j = 0; j < awaits_per_task; ++j) {
int value = co_await compute_task(j);
(void)value;
}
completed.fetch_add(1, std::memory_order_relaxed);
co_return;
};

auto batch_start = high_resolution_clock::now();

for (int i = 0; i < batch_size; ++i) {
auto t = task_with_await();
sched.spawn(t.release());
sched.go(taskdef);
}

while (completed.load(std::memory_order_relaxed) < batch_size) {
std::this_thread::sleep_for(microseconds(1));
}
Expand Down Expand Up @@ -199,12 +204,11 @@ void benchmark_yield() {
runtime::scheduler sched(1); // Single worker thread
sched.start();

std::atomic<int> completed{0};
std::atomic<int64_t> end_time_ns{0}; // Last task records end timestamp

// Each vthread yields multiple times
auto yield_task = [&]() -> coro::task<void> {
for (int i = 0; i < yields_per_vthread; ++i) {
std::atomic<int> completed(0);
std::atomic<int64_t> end_time_ns(0); // Last task records end timestamp

auto taskdef = [&completed, &end_time_ns, num_vthreads]() -> coro::task<void> {
for (int j = 0; j < yields_per_vthread; ++j) {
co_await time::yield();
}
// Last task to complete records the end timestamp
Expand All @@ -215,17 +219,17 @@ void benchmark_yield() {
}
co_return;
};



// Capture start time in main thread
auto start_time_ns = duration_cast<nanoseconds>(
steady_clock::now().time_since_epoch()).count();

// Spawn all vthreads
for (int i = 0; i < num_vthreads; ++i) {
auto t = yield_task();
sched.spawn(t.release());
sched.go(taskdef);
}

// Wait for end_time_ns to be set (spin-wait for accuracy)
while (end_time_ns.load(std::memory_order_acquire) == 0) {
// Spin without yielding for accurate measurement
Expand Down Expand Up @@ -272,32 +276,31 @@ void benchmark_work_stealing() {
runtime::scheduler sched(4);
sched.start();

std::atomic<int> completed{0};
std::atomic<int> completed(0);

// Record initial per-worker task counts
std::vector<size_t> initial_counts(4);
for (size_t i = 0; i < 4; ++i) {
initial_counts[i] = sched.worker_tasks_executed(i);
}
auto heavy_task = [&]() -> coro::task<void> {

auto taskdef = [&completed]() -> coro::task<void> {
volatile int sum = 0;
for (int i = 0; i < 10000; ++i) {
sum = sum + i * i;
for (int j = 0; j < 10000; ++j) {
sum = sum + j * j;
}
(void)sum;
completed.fetch_add(1, std::memory_order_relaxed);
co_return;
};

auto batch_start = high_resolution_clock::now();

// Spawn ALL tasks to worker 0 to test work stealing
for (int i = 0; i < batch_size; ++i) {
auto t = heavy_task();
sched.spawn_to(0, t.release());
sched.go_to(0, taskdef);
}

while (completed.load(std::memory_order_relaxed) < batch_size) {
std::this_thread::sleep_for(microseconds(1));
}
Expand Down Expand Up @@ -364,13 +367,13 @@ void benchmark_scalability() {
runtime::scheduler sched(num_threads);
sched.start();

std::atomic<int> completed{0};
std::atomic<int> completed(0);

auto task_func = [&]() -> coro::task<void> {
auto taskdef = [&completed]() -> coro::task<void> {
// Larger CPU-bound work to minimize scheduling overhead ratio
volatile int sum = 0;
for (int i = 0; i < work_iterations; ++i) {
sum = sum + i * i;
for (int j = 0; j < work_iterations; ++j) {
sum = sum + j * j;
}
(void)sum;
completed.fetch_add(1, std::memory_order_relaxed);
Expand All @@ -381,8 +384,7 @@ void benchmark_scalability() {

// Distribute tasks evenly across workers for true parallel scaling test
for (int i = 0; i < batch_size; ++i) {
auto t = task_func();
sched.spawn(t.release()); // Round-robin distribution
sched.go(taskdef);
}

while (completed.load(std::memory_order_relaxed) < batch_size) {
Expand Down
42 changes: 16 additions & 26 deletions examples/debug_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ coro::task<void> signal_handler_task() {
} while(0)

// Helper awaitable to get promise reference
namespace detail {
namespace debug_detail {
struct get_promise {
bool await_ready() const noexcept { return false; }

Expand All @@ -66,7 +66,7 @@ struct get_promise {
// Level 3: Leaf coroutine that does some work
coro::task<int> compute_value(int x) {
// Set debug location
auto& p = co_await detail::get_promise{};
auto& p = co_await debug_detail::get_promise{};
p.set_location(__FILE__, "compute_value", __LINE__);
p.set_state(coro::coroutine_state::running);

Expand All @@ -79,7 +79,7 @@ coro::task<int> compute_value(int x) {

// Level 2: Middle coroutine
coro::task<int> process_data(int id) {
auto& p = co_await detail::get_promise{};
auto& p = co_await debug_detail::get_promise{};
p.set_location(__FILE__, "process_data", __LINE__);
p.set_state(coro::coroutine_state::running);

Expand All @@ -90,7 +90,7 @@ coro::task<int> process_data(int id) {

// Level 1: Outer coroutine (worker)
coro::task<void> worker_task(int worker_id) {
auto& p = co_await detail::get_promise{};
auto& p = co_await debug_detail::get_promise{};
p.set_location(__FILE__, "worker_task", __LINE__);
p.set_state(coro::coroutine_state::running);

Expand All @@ -107,7 +107,7 @@ coro::task<void> worker_task(int worker_id) {

// Long-running task for debugging
coro::task<void> long_running_task([[maybe_unused]] int id) {
auto& p = co_await detail::get_promise{};
auto& p = co_await debug_detail::get_promise{};
p.set_location(__FILE__, "long_running_task", __LINE__);
p.set_state(coro::coroutine_state::running);

Expand Down Expand Up @@ -141,37 +141,27 @@ coro::task<int> async_main(int argc, char* argv[]) {
std::cout << std::endl;
}

// Spawn some worker tasks
std::vector<coro::task<void>> workers;
for (int i = 0; i < 4; ++i) {
workers.push_back(worker_task(i));
}

// Spawn long-running tasks for debugging
std::vector<coro::task<void>> long_tasks;
for (int i = 0; i < 2; ++i) {
long_tasks.push_back(long_running_task(i));
}

// Get scheduler and spawn tasks
// Get scheduler
auto* sched = runtime::scheduler::current();
if (!sched) {
std::cerr << "Error: No scheduler" << std::endl;
co_return 1;
}

// Spawn signal handler coroutine
auto sig_handler = signal_handler_task();
sched->spawn(sig_handler.release());
sched->go(signal_handler_task);

for (auto& w : workers) {
sched->spawn(w.release());
// Spawn some worker tasks
for (int i = 0; i < 4; ++i) {
sched->go([i]() { return worker_task(i); });
}
for (auto& t : long_tasks) {
sched->spawn(t.release());

// Spawn long-running tasks for debugging
for (int i = 0; i < 2; ++i) {
sched->go([i]() { return long_running_task(i); });
}

std::cout << "Spawned " << workers.size() + long_tasks.size() << " tasks" << std::endl;
std::cout << "Spawned " << 4 + 2 << " tasks" << std::endl;
std::cout << std::endl;

if (pause_mode) {
Expand All @@ -198,5 +188,5 @@ int main(int argc, char* argv[]) {
sigs.block_all_threads();

// Use elio::run() with the async_main coroutine
return elio::run(async_main(argc, argv));
return elio::run([&]() { return async_main(argc, argv); });
}
3 changes: 1 addition & 2 deletions examples/dynamic_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ void run_batch(runtime::scheduler& sched, int num_tasks, const std::string& labe

// Spawn tasks
for (int i = 0; i < num_tasks; ++i) {
auto t = simple_task(completed);
sched.spawn(t.release());
sched.go([&completed]() { return simple_task(completed); });
}

// Wait for completion
Expand Down
4 changes: 2 additions & 2 deletions examples/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ coro::task<int> async_main(int argc, char* argv[]) {
try {
auto tls_ctx = tls::tls_context::make_server(cert_file, key_file);
ELIO_LOG_INFO("Starting HTTPS server on {}", bind_addr.to_string());
co_await elio::serve(srv, srv.listen_tls(bind_addr, tls_ctx, opts));
co_await elio::serve(srv, [&]() { return srv.listen_tls(bind_addr, tls_ctx, opts); });
} catch (const std::exception& e) {
ELIO_LOG_ERROR("Failed to start HTTPS server: {}", e.what());
co_return 1;
}
} else {
ELIO_LOG_INFO("Starting HTTP server on {}", bind_addr.to_string());
co_await elio::serve(srv, srv.listen(bind_addr, opts));
co_await elio::serve(srv, [&]() { return srv.listen(bind_addr, opts); });
}

co_return 0;
Expand Down
5 changes: 3 additions & 2 deletions examples/io_benchmark.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <elio/runtime/scheduler.hpp>
#include <elio/runtime/spawn.hpp>
#include <elio/coro/task.hpp>
#include <elio/io/io_awaitables.hpp>
#include <elio/log/macros.hpp>
Expand Down Expand Up @@ -48,7 +49,7 @@ void benchmark_file_io() {
};

auto start = high_resolution_clock::now();
io_task().go();
elio::go(io_task);

while (completed.load(std::memory_order_acquire) == 0) {
std::this_thread::sleep_for(microseconds(100));
Expand Down Expand Up @@ -104,7 +105,7 @@ void benchmark_concurrent_file_io() {

auto start = high_resolution_clock::now();
for (int i = 0; i < NUM_TASKS; ++i) {
io_task().go();
elio::go(io_task);
}

while (completed.load(std::memory_order_acquire) < NUM_TASKS) {
Expand Down
Loading
Loading