diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 11a7e1b6..6231f3bf 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -165,6 +165,9 @@ jobs: cmake --build "${build_dir}" --parallel ctest --test-dir "${build_dir}" --verbose + echo "--- Running tests in parallel mode ---" + "${build_dir}/unit_tests" --parallel + - name: Show compiler cache stats shell: bash run: | diff --git a/include/eventide/async/io/stream.h b/include/eventide/async/io/stream.h index 2e0ea33d..b47052e8 100644 --- a/include/eventide/async/io/stream.h +++ b/include/eventide/async/io/stream.h @@ -197,6 +197,9 @@ class tcp : public stream { int port, options opts = options(), event_loop& loop = event_loop::current()); + + /// Query the local address/port of a listening acceptor. + static result local_port(acceptor& acc); }; /// TTY/console wrapper. diff --git a/include/eventide/zest/detail/registry.h b/include/eventide/zest/detail/registry.h index ca1c921f..9d3ca351 100644 --- a/include/eventide/zest/detail/registry.h +++ b/include/eventide/zest/detail/registry.h @@ -19,6 +19,7 @@ enum class TestState { struct TestAttrs { bool skip = false; bool focus = false; + bool serial = false; }; struct TestCase { diff --git a/include/eventide/zest/detail/suite.h b/include/eventide/zest/detail/suite.h index ee32c45d..897879e4 100644 --- a/include/eventide/zest/detail/suite.h +++ b/include/eventide/zest/detail/suite.h @@ -5,6 +5,16 @@ namespace eventide::zest { +/// Merge suite-level and case-level test attributes. +/// Case-level flags override suite defaults when explicitly set to true. +constexpr TestAttrs merge_attrs(TestAttrs suite, TestAttrs test_case) { + return { + .skip = suite.skip || test_case.skip, + .focus = suite.focus || test_case.focus, + .serial = suite.serial || test_case.serial, + }; +} + template struct TestSuiteDef { using Self = Derived; @@ -30,6 +40,14 @@ struct TestSuiteDef { std::size_t line, TestAttrs attrs = {}> inline static bool _register_test_case = [] { + constexpr auto effective_attrs = [] { + if constexpr(requires { Derived::suite_attrs; }) { + return merge_attrs(Derived::suite_attrs, attrs); + } else { + return attrs; + } + }(); + auto run_test = +[] -> TestState { current_test_state() = TestState::Passed; Derived test; @@ -46,7 +64,7 @@ struct TestSuiteDef { return current_test_state(); }; - test_cases().emplace_back(case_name.data(), path.data(), line, attrs, run_test); + test_cases().emplace_back(case_name.data(), path.data(), line, effective_attrs, run_test); return true; }(); }; diff --git a/include/eventide/zest/macro.h b/include/eventide/zest/macro.h index 2c4e9284..f0421e71 100644 --- a/include/eventide/zest/macro.h +++ b/include/eventide/zest/macro.h @@ -7,16 +7,30 @@ #define TEST_SUITE(name, ...) \ struct name##TEST : __VA_OPT__(__VA_ARGS__, )::eventide::zest::TestSuiteDef<#name, name##TEST> +// clang-format off +#define ZEST_MAKE_ATTRS(...) \ + [] constexpr { \ + ::eventide::zest::TestAttrs _a{}; \ + auto& [skip, focus, serial] = _a; \ + __VA_ARGS__; \ + return _a; \ + }() +// clang-format on + +#define TEST_SUITE_ATTRS(...) \ + constexpr static ::eventide::zest::TestAttrs suite_attrs = ZEST_MAKE_ATTRS(__VA_ARGS__) + #define TEST_CASE(name, ...) \ void _register_##name() { \ constexpr auto file_name = std::source_location::current().file_name(); \ constexpr auto file_len = std::string_view(file_name).size(); \ (void)_register_suites<>; \ + constexpr auto _zest_attrs_ = ZEST_MAKE_ATTRS(__VA_OPT__(__VA_ARGS__)); \ (void)_register_test_case<#name, \ &Self::test_##name, \ ::eventide::fixed_string(file_name), \ - std::source_location::current().line() __VA_OPT__(, ) \ - __VA_ARGS__>; \ + std::source_location::current().line(), \ + _zest_attrs_>; \ } \ void test_##name() diff --git a/include/eventide/zest/run.h b/include/eventide/zest/run.h index fa67ccca..635c8e04 100644 --- a/include/eventide/zest/run.h +++ b/include/eventide/zest/run.h @@ -11,6 +11,10 @@ struct RunnerOptions { std::string filter; /// When true, per-test output is limited to failing cases; the final summary is still printed. bool only_failed_output = false; + /// When true, test cases are executed in parallel across a thread pool. + bool parallel = false; + /// Number of worker threads for parallel mode (0 = hardware_concurrency). + unsigned parallel_workers = 0; }; /// Parse CLI arguments into RunnerOptions and execute registered tests. diff --git a/src/async/io/acceptor.cpp b/src/async/io/acceptor.cpp index 46a9cca5..72b5b38e 100644 --- a/src/async/io/acceptor.cpp +++ b/src/async/io/acceptor.cpp @@ -430,4 +430,25 @@ result return tcp::acceptor(std::move(self)); } +result tcp::local_port(tcp::acceptor& acc) { + if(!acc.self) { + return outcome_error(error::invalid_argument); + } + + sockaddr_storage storage{}; + int namelen = sizeof(storage); + int err = uv_tcp_getsockname(&acc->tcp, reinterpret_cast(&storage), &namelen); + if(err != 0) { + return outcome_error(uv::status_to_error(err)); + } + + if(storage.ss_family == AF_INET) { + return ntohs(reinterpret_cast(&storage)->sin_port); + } else if(storage.ss_family == AF_INET6) { + return ntohs(reinterpret_cast(&storage)->sin6_port); + } + + return outcome_error(error::invalid_argument); +} + } // namespace eventide diff --git a/src/async/io/fs.cpp b/src/async/io/fs.cpp index aafcd8e9..603f284b 100644 --- a/src/async/io/fs.cpp +++ b/src/async/io/fs.cpp @@ -617,11 +617,29 @@ task, error> fs::readdir(fs::dir_handle& dir, event_loop namespace { -/// Returns the default loop handle for synchronous libuv fs calls. -/// libuv sync operations (cb = nullptr) don't actually interact with the -/// event loop, so uv_default_loop() is safe to use from any thread. +/// Returns a thread-local loop handle for synchronous libuv fs calls. +/// libuv sync operations (cb = nullptr) don't actually run the event loop, +/// but uv_default_loop() is a process-global singleton and is NOT thread-safe. +/// Using a thread-local loop avoids data races when sync fs calls are made +/// from multiple threads concurrently. uv_loop_t* sync_loop() noexcept { - return uv_default_loop(); + static thread_local struct sync_loop_holder { + uv_loop_t loop{}; + + sync_loop_holder() { + if(uv_loop_init(&loop) != 0) { + std::terminate(); + } + } + + ~sync_loop_holder() { + if(uv_loop_close(&loop) != 0) { + std::terminate(); + } + } + } holder; + + return &holder.loop; } } // namespace diff --git a/src/async/libuv.h b/src/async/libuv.h index 43f36bab..17c5e98b 100644 --- a/src/async/libuv.h +++ b/src/async/libuv.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -546,10 +547,15 @@ ALWAYS_INLINE std::size_t udp_get_send_queue_count(const uv_udp_t& handle) noexc return ::uv_udp_get_send_queue_count(&handle); } -ALWAYS_INLINE error spawn(uv_loop_t& loop, - uv_process_t& process, - const uv_process_options_t& options) noexcept { +/// uv_spawn internally registers a SIGCHLD handler in a process-global +/// red-black tree that is NOT thread-safe. Serialise all spawn calls +/// so that concurrent event-loops on different threads do not race. +inline error spawn(uv_loop_t& loop, + uv_process_t& process, + const uv_process_options_t& options) noexcept { assert(options.file != nullptr && "uv::spawn requires options.file"); + static std::mutex spawn_mutex; + std::lock_guard lock(spawn_mutex); return status_to_error(::uv_spawn(&loop, &process, &options)); } diff --git a/src/zest/runner.cpp b/src/zest/runner.cpp index f9f0cea9..f7c4bdc4 100644 --- a/src/zest/runner.cpp +++ b/src/zest/runner.cpp @@ -1,10 +1,15 @@ +#include +#include #include #include +#include #include #include #include #include +#include #include +#include #include #include "eventide/deco/deco.h" @@ -34,6 +39,17 @@ struct ZestCliOptions { DecoFlag(names = {"--only-failed"}; help = "Only print failed test cases"; required = false) only_failed = false; + + DecoFlag(names = {"--parallel"}; help = "Run test cases in parallel"; required = false) + parallel = false; + + DecoKVStyled( + style = deco::decl::KVStyle::Joined | deco::decl::KVStyle::Separate, + names = {"--parallel-workers"}; + meta_var = ""; + help = "Number of worker threads for parallel mode (default: hardware_concurrency)"; + required = false) + parallel_workers = 0; }; auto to_runner_options(ZestCliOptions options) @@ -44,6 +60,8 @@ auto to_runner_options(ZestCliOptions options) eventide::zest::RunnerOptions runner_options; runner_options.only_failed_output = *options.only_failed; + runner_options.parallel = *options.parallel; + runner_options.parallel_workers = *options.parallel_workers; if(options.test_filter_input.has_value()) { runner_options.filter = std::move(*options.test_filter_input); } else { @@ -72,6 +90,14 @@ struct RunSummary { std::vector failed_tests; }; +struct TestResult { + std::string display_name; + std::string path; + std::size_t line; + eventide::zest::TestState state; + std::chrono::milliseconds duration; +}; + using SuiteMap = std::unordered_map>; bool matches_pattern(std::string_view text, std::string_view pattern) { @@ -274,20 +300,29 @@ int Runner::run_tests(RunnerOptions options) { std::println("{}[ FOCUS ] Running in focus-only mode.{}", yellow, clear); } + // Collect all runnable test cases. + struct RunnableTest { + std::string display_name; + std::string path; + std::size_t line; + bool serial; + std::function test; + }; + + std::vector runnable; + std::unordered_set active_suites; + for(auto& [suite_name, test_cases]: grouped_suites) { if(!matches_suite_filter(suite_name, patterns)) { continue; } - bool suite_has_tests = false; - for(auto& test_case: test_cases) { if(!matches_test_filter(suite_name, test_case.name, patterns)) { continue; } const auto display_name = make_display_name(suite_name, test_case.name); - suite_has_tests = true; if(focus_mode && !test_case.attrs.focus) { summary.skipped += 1; @@ -302,30 +337,124 @@ int Runner::run_tests(RunnerOptions options) { continue; } - if(!options.only_failed_output) { - std::println("{}[ RUN ] {}{}", green, display_name, clear); + active_suites.insert(std::string(suite_name)); + runnable.push_back(RunnableTest{ + .display_name = display_name, + .path = test_case.path, + .line = test_case.line, + .serial = test_case.attrs.serial, + .test = std::move(test_case.test), + }); + } + } + + summary.suites = static_cast(active_suites.size()); + summary.tests = static_cast(runnable.size()); + + auto run_single = [&](const RunnableTest& test, bool show_run_line) -> TestResult { + if(show_run_line && !options.only_failed_output) { + std::println("{}[ RUN ] {}{}", green, test.display_name, clear); + } + + using namespace std::chrono; + auto begin = system_clock::now(); + auto state = test.test(); + auto end = system_clock::now(); + + return TestResult{ + .display_name = test.display_name, + .path = test.path, + .line = test.line, + .state = state, + .duration = duration_cast(end - begin), + }; + }; + + auto record_result = [&](const TestResult& result) { + const bool failed = is_failure(result.state); + print_run_result(result.display_name, failed, result.duration, options.only_failed_output); + summary.duration += result.duration; + if(failed) { + summary.failed += 1; + summary.failed_tests.push_back( + FailedTest{result.display_name, result.path, result.line}); + } + }; + + // Execute tests. + std::vector results(runnable.size()); + + if(options.parallel) { + using namespace std::chrono; + auto wall_begin = system_clock::now(); + + // Partition: parallel-safe tests first, serial tests after. + std::vector parallel_indices; + std::vector serial_indices; + for(std::size_t i = 0; i < runnable.size(); ++i) { + if(runnable[i].serial) { + serial_indices.push_back(i); + } else { + parallel_indices.push_back(i); } - summary.tests += 1; + } - using namespace std::chrono; - auto begin = system_clock::now(); - auto state = test_case.test(); - auto end = system_clock::now(); + // Run parallel-safe tests across the thread pool. + const auto num_workers = + std::min(static_cast(std::max(1u, + options.parallel_workers + ? options.parallel_workers + : std::thread::hardware_concurrency())), + parallel_indices.size()); + + std::atomic next_task{0}; + + auto worker = [&]() { + while(true) { + auto idx = next_task.fetch_add(1, std::memory_order_relaxed); + if(idx >= parallel_indices.size()) { + break; + } + auto i = parallel_indices[idx]; + results[i] = run_single(runnable[i], false); + } + }; - auto duration = duration_cast(end - begin); - const bool failed = is_failure(state); - print_run_result(display_name, failed, duration, options.only_failed_output); + { + std::vector pool; + pool.reserve(num_workers); + for(unsigned w = 0; w < num_workers; ++w) { + pool.emplace_back(worker); + } + for(auto& t: pool) { + t.join(); + } + } + + // Run serial tests sequentially after the parallel batch. + for(auto i: serial_indices) { + results[i] = run_single(runnable[i], false); + } - summary.duration += duration; + summary.duration = duration_cast(system_clock::now() - wall_begin); + + // Print all results in original order. + for(const auto& result: results) { + const bool failed = is_failure(result.state); + print_run_result(result.display_name, + failed, + result.duration, + options.only_failed_output); if(failed) { summary.failed += 1; summary.failed_tests.push_back( - FailedTest{display_name, test_case.path, test_case.line}); + FailedTest{result.display_name, result.path, result.line}); } } - - if(suite_has_tests) { - summary.suites += 1; + } else { + for(std::size_t i = 0; i < runnable.size(); ++i) { + results[i] = run_single(runnable[i], true); + record_result(results[i]); } } diff --git a/tests/unit/async/cancellation_tests.cpp b/tests/unit/async/cancellation_tests.cpp index 2189a00e..e9b0a391 100644 --- a/tests/unit/async/cancellation_tests.cpp +++ b/tests/unit/async/cancellation_tests.cpp @@ -121,7 +121,7 @@ TEST_CASE(token_share_state) { EXPECT_TRUE(token_b.cancelled()); } -TEST_CASE(queue_cancel_resume) { +TEST_CASE(queue_cancel_resume, serial = true) { cancellation_source source; event start_target; event target_submitted; @@ -207,7 +207,7 @@ TEST_CASE(queue_cancel_resume) { EXPECT_FALSE(target_started.load(std::memory_order_acquire)); } -TEST_CASE(fs_cancel_resume) { +TEST_CASE(fs_cancel_resume, serial = true) { cancellation_source source; event start_target; event target_submitted; diff --git a/tests/unit/async/process_tests.cpp b/tests/unit/async/process_tests.cpp index 2b466487..79ab320d 100644 --- a/tests/unit/async/process_tests.cpp +++ b/tests/unit/async/process_tests.cpp @@ -52,6 +52,8 @@ task, result>> read_two_chunks(pipe p TEST_SUITE(process_io, loop_fixture) { +TEST_SUITE_ATTRS(serial = true); + TEST_CASE(spawn_wait_simple) { process::options opts; #ifdef _WIN32 @@ -290,13 +292,16 @@ TEST_CASE(query_info_child) { process::options opts; #ifdef _WIN32 opts.file = "cmd.exe"; - // Run a command that takes a moment so we can query it while alive. - opts.args = {opts.file, "/c", "ping -n 2 127.0.0.1 >nul"}; + // Echo a ready marker then block on stdin until the parent closes it. + opts.args = {opts.file, "/c", "echo x & set /p dummy="}; #else opts.file = "/bin/sh"; - opts.args = {opts.file, "-c", "sleep 0.2"}; + // Print a ready marker then block on stdin until the parent closes the pipe. + opts.args = {opts.file, "-c", "printf x; read _"}; #endif - opts.streams = {process::stdio::ignore(), process::stdio::ignore(), process::stdio::ignore()}; + opts.streams = {process::stdio::pipe(true, false), + process::stdio::pipe(false, true), + process::stdio::ignore()}; auto spawn_res = process::spawn(opts, loop); ASSERT_TRUE(spawn_res.has_value()); @@ -304,19 +309,30 @@ TEST_CASE(query_info_child) { auto pid = spawn_res->proc.pid(); EXPECT_GT(pid, 0); - // Query via the instance method. - auto info = spawn_res->proc.query_info(); - ASSERT_TRUE(info.has_value()); - EXPECT_EQ(info->pid, pid); - EXPECT_GT(info->rss, std::size_t{0}); - - // Query via the static overload. - auto info2 = process::query_info(pid); - ASSERT_TRUE(info2.has_value()); - EXPECT_EQ(info2->pid, pid); + auto verify = [&]() -> task { + // Wait until the child has written to stdout — it is definitely running. + auto data = co_await spawn_res->stdout_pipe.read(); + EXPECT_TRUE(data.has_value()); + + // Query via the instance method. + auto info = spawn_res->proc.query_info(); + CO_ASSERT_TRUE(info.has_value()); + EXPECT_EQ(info->pid, pid); + EXPECT_GT(info->rss, std::size_t{0}); + + // Query via the static overload. + auto info2 = process::query_info(pid); + CO_ASSERT_TRUE(info2.has_value()); + EXPECT_EQ(info2->pid, pid); + + // Destroy stdin pipe so the child gets EOF and exits. + { auto drop = std::move(spawn_res->stdin_pipe); } + co_await spawn_res->proc.wait(); + event_loop::current().stop(); + }; - auto worker = wait_for_exit(spawn_res->proc); - schedule_all(worker); + auto task = verify(); + schedule_all(task); } TEST_CASE(query_info_invalid_pid) { diff --git a/tests/unit/async/stream_tests.cpp b/tests/unit/async/stream_tests.cpp index 56938817..8409a37e 100644 --- a/tests/unit/async/stream_tests.cpp +++ b/tests/unit/async/stream_tests.cpp @@ -412,7 +412,7 @@ TEST_CASE(connect_and_accept) { } } -TEST_CASE(connect_failure) { +TEST_CASE(connect_failure, serial = true) { #ifdef _WIN32 const std::string name = "\\\\.\\pipe\\eventide-test-pipe-missing"; #else @@ -431,7 +431,7 @@ TEST_CASE(connect_failure) { EXPECT_FALSE(client_res.has_value()); } -TEST_CASE(stop) { +TEST_CASE(stop, serial = true) { #ifdef _WIN32 const std::string name = "\\\\.\\pipe\\eventide-test-pipe-missing"; #else diff --git a/tests/unit/async/when_tests.cpp b/tests/unit/async/when_tests.cpp index 3278091d..e29ddd2f 100644 --- a/tests/unit/async/when_tests.cpp +++ b/tests/unit/async/when_tests.cpp @@ -9,14 +9,17 @@ namespace eventide { namespace { struct deferred_cancel_await : system_op { - inline static deferred_cancel_await* pending = nullptr; + static deferred_cancel_await*& pending() { + thread_local deferred_cancel_await* p = nullptr; + return p; + } int* destroyed = nullptr; explicit deferred_cancel_await(int& destroyed_count) : destroyed(&destroyed_count) { - assert(pending == nullptr && "only one deferred_cancel_await may be pending at a time"); + assert(pending() == nullptr && "only one deferred_cancel_await may be pending at a time"); action = &on_cancel; - pending = this; + pending() = this; } deferred_cancel_await(const deferred_cancel_await&) = delete; @@ -28,8 +31,8 @@ struct deferred_cancel_await : system_op { if(destroyed) { *destroyed += 1; } - if(pending == this) { - pending = nullptr; + if(pending() == this) { + pending() = nullptr; } } @@ -49,9 +52,9 @@ struct deferred_cancel_await : system_op { void await_resume() const noexcept {} static void finish_pending_cancel() { - auto* op = pending; + auto* op = pending(); assert(op != nullptr && "finish_pending_cancel requires a pending awaiter"); - pending = nullptr; + pending() = nullptr; op->complete(); } }; diff --git a/tests/unit/common/small_vector_tests.cpp b/tests/unit/common/small_vector_tests.cpp index 44953164..448ece1b 100644 --- a/tests/unit/common/small_vector_tests.cpp +++ b/tests/unit/common/small_vector_tests.cpp @@ -44,7 +44,7 @@ struct move_only { } }; -static int nontrivial_alive = 0; +static thread_local int nontrivial_alive = 0; struct nontrivial { int value; @@ -1389,7 +1389,7 @@ TEST_CASE(assign_from_empty_hybrid_vector) { #ifdef __cpp_exceptions struct throwing_copy { - inline static int throw_after = -1; + inline thread_local static int throw_after = -1; int value = 0; throwing_copy() = default; diff --git a/tests/unit/deco/runtime.cc b/tests/unit/deco/runtime.cc index 93eabe5a..471b6ac2 100644 --- a/tests/unit/deco/runtime.cc +++ b/tests/unit/deco/runtime.cc @@ -134,10 +134,10 @@ struct TrailingOnlyOpt { }; struct CallbackStopState { - inline static unsigned arg_index = 0; - inline static unsigned next_cursor = 0; - inline static std::size_t argv_size = 0; - inline static std::string value; + inline thread_local static unsigned arg_index = 0; + inline thread_local static unsigned next_cursor = 0; + inline thread_local static std::size_t argv_size = 0; + inline thread_local static std::string value; static void reset() { arg_index = 0; @@ -162,9 +162,9 @@ struct CallbackStopOpt { }; struct CallbackRestartState { - inline static unsigned arg_index = 0; - inline static unsigned next_cursor = 0; - inline static std::string value; + inline thread_local static unsigned arg_index = 0; + inline thread_local static unsigned next_cursor = 0; + inline thread_local static std::string value; static void reset() { arg_index = 0; @@ -190,9 +190,9 @@ struct CallbackRestartOpt { }; struct CallbackRestartOwnedState { - inline static unsigned arg_index = 0; - inline static unsigned next_cursor = 0; - inline static std::string value; + inline thread_local static unsigned arg_index = 0; + inline thread_local static unsigned next_cursor = 0; + inline thread_local static std::string value; static void reset() { arg_index = 0; @@ -217,7 +217,7 @@ struct CallbackRestartOwnedOpt { }; struct CallbackRestartTwiceState { - inline static unsigned restart_count = 0; + inline thread_local static unsigned restart_count = 0; static void reset() { restart_count = 0; @@ -251,11 +251,11 @@ struct CallbackShortcutOpt { }; struct CallbackComposeState { - inline static unsigned arg_index = 0; - inline static unsigned next_cursor = 0; - inline static std::size_t argv_size = 0; - inline static bool value = false; - inline static unsigned count = 0; + inline thread_local static unsigned arg_index = 0; + inline thread_local static unsigned next_cursor = 0; + inline thread_local static std::size_t argv_size = 0; + inline thread_local static bool value = false; + inline thread_local static unsigned count = 0; static void reset() { arg_index = 0; @@ -318,7 +318,7 @@ namespace { template std::span into_deco_args(Args&&... args) { - static std::vector res; + static thread_local std::vector res; res.clear(); res.reserve(sizeof...(args)); (res.emplace_back(std::forward(args)), ...);