diff --git a/Cargo.lock b/Cargo.lock index 33d4cd5..9ca61fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,6 +10,12 @@ version = "0.1.0" name = "elementary" version = "0.0.1" +[[package]] +name = "libc" +version = "0.2.183" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + [[package]] name = "log_builtin" version = "0.0.1" @@ -100,6 +106,13 @@ dependencies = [ "elementary", ] +[[package]] +name = "thread" +version = "0.1.0" +dependencies = [ + "libc", +] + [[package]] name = "unicode-ident" version = "1.0.22" diff --git a/Cargo.toml b/Cargo.toml index 8aa01c8..2a1bb1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ default-members = [ "src/containers", "src/sync", "src/elementary", + "src/thread", "src/log/score_log_fmt_macro", "src/log/stdout_logger", "src/testing_macros", @@ -28,6 +29,7 @@ members = [ "src/containers", "src/sync", "src/elementary", + "src/thread", "src/log/score_log", "src/log/score_log_fmt", "src/log/score_log_fmt_macro", @@ -44,12 +46,15 @@ license-file = "LICENSE.md" authors = ["S-CORE Contributors"] [workspace.dependencies] +libc = "0.2.177" + score_log = { path = "src/log/score_log" } score_log_fmt = { path = "src/log/score_log_fmt" } score_log_fmt_macro = { path = "src/log/score_log_fmt_macro" } stdout_logger = { path = "src/log/stdout_logger" } elementary = { path = "src/elementary" } testing_macros = { path = "src/testing_macros" } +thread = { path = "src/thread" } [workspace.lints.clippy] std_instead_of_core = "warn" diff --git a/src/thread/BUILD b/src/thread/BUILD new file mode 100644 index 0000000..464e56a --- /dev/null +++ b/src/thread/BUILD @@ -0,0 +1,31 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") + +rust_library( + name = "thread", + srcs = glob(["**/*.rs"]), + edition = "2021", + visibility = ["//visibility:public"], + deps = ["@score_crates//:libc"], +) + +rust_test( + name = "tests", + crate = "thread", + tags = [ + "unit_tests", + "ut", + ], +) diff --git a/src/thread/Cargo.toml b/src/thread/Cargo.toml new file mode 100644 index 0000000..e5fd456 --- /dev/null +++ b/src/thread/Cargo.toml @@ -0,0 +1,26 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +[package] +name = "thread" +description = "`pthread`-based parametrizable threading module." +version = "0.1.0" +authors = ["Contributors to the Eclipse Foundation"] +edition = "2021" +license-file = "../../LICENSE.md" + +[lib] +path = "lib.rs" + +[dependencies] +libc.workspace = true diff --git a/src/thread/lib.rs b/src/thread/lib.rs new file mode 100644 index 0000000..0ba50e3 --- /dev/null +++ b/src/thread/lib.rs @@ -0,0 +1,20 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! `pthread`-based parametrizable threading module. + +mod parameters; +mod thread; + +pub use parameters::{SchedulerParameters, SchedulerPolicy, ThreadParameters}; +pub use thread::{spawn, JoinHandle}; diff --git a/src/thread/parameters.rs b/src/thread/parameters.rs new file mode 100644 index 0000000..912a1ca --- /dev/null +++ b/src/thread/parameters.rs @@ -0,0 +1,182 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +/// Scheduler policy. +#[repr(i32)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum SchedulerPolicy { + Other = libc::SCHED_OTHER, + Fifo = libc::SCHED_FIFO, + RoundRobin = libc::SCHED_RR, +} + +impl SchedulerPolicy { + /// Get min thread priority for this policy. + pub fn priority_min(&self) -> i32 { + let policy_native = *self as i32; + unsafe { libc::sched_get_priority_min(policy_native) } + } + + /// Get max thread priority for this policy. + pub fn priority_max(&self) -> i32 { + let policy_native = *self as i32; + unsafe { libc::sched_get_priority_max(policy_native) } + } +} + +impl From for SchedulerPolicy { + fn from(value: i32) -> Self { + match value { + libc::SCHED_FIFO => SchedulerPolicy::Fifo, + libc::SCHED_RR => SchedulerPolicy::RoundRobin, + libc::SCHED_OTHER => SchedulerPolicy::Other, + _ => panic!("unknown or unsupported scheduler policy"), + } + } +} + +/// Scheduler parameters. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct SchedulerParameters { + policy: SchedulerPolicy, + priority: i32, +} + +impl SchedulerParameters { + /// Create a new [`SchedulerParameters`]. + /// + /// # Panics + /// + /// Priority must be in allowed range for the scheduler policy. + pub fn new(policy: SchedulerPolicy, priority: i32) -> Self { + let allowed_priority_range = policy.priority_min()..=policy.priority_max(); + if !allowed_priority_range.contains(&priority) { + panic!("priority is not in allowed range for the scheduler policy") + } + + Self { policy, priority } + } + + /// Scheduler policy. + pub fn policy(&self) -> SchedulerPolicy { + self.policy + } + + /// Thread priority. + pub fn priority(&self) -> i32 { + self.priority + } +} + +/// Thread parameters. +#[derive(Clone, Default, Debug, PartialEq, Eq)] +pub struct ThreadParameters { + pub(crate) scheduler_parameters: Option, + pub(crate) affinity: Option>, + pub(crate) stack_size: Option, +} + +impl ThreadParameters { + /// Create a new [`ThreadParameters`] containing default values. + pub fn new() -> Self { + Self::default() + } + + /// Scheduler parameters, including scheduler policy and thread priority. + pub fn scheduler_parameters(mut self, scheduler_parameters: SchedulerParameters) -> Self { + self.scheduler_parameters = Some(scheduler_parameters); + self + } + + /// Set thread affinity - array of CPU core IDs that the thread can run on. + pub fn affinity(mut self, affinity: &[usize]) -> Self { + self.affinity = Some(Box::from(affinity)); + self + } + + /// Set stack size. + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } +} + +#[cfg(test)] +mod tests { + use crate::{ + parameters::{SchedulerParameters, SchedulerPolicy}, + ThreadParameters, + }; + + #[test] + fn scheduler_policy_min_max_priority() { + let policy = SchedulerPolicy::Fifo; + assert_eq!(policy.priority_min(), 1); + assert_eq!(policy.priority_max(), 99); + } + + #[test] + fn scheduler_policy_from_i32_succeeds() { + let policy = SchedulerPolicy::from(0); + assert_eq!(policy, SchedulerPolicy::Other); + } + + #[test] + #[should_panic(expected = "unknown or unsupported scheduler policy")] + fn scheduler_policy_from_i32_unknown() { + let _ = SchedulerPolicy::from(123); + } + + #[test] + fn scheduler_parameters_new_succeeds() { + let policy = SchedulerPolicy::Fifo; + let priority = 40; + let params = SchedulerParameters::new(policy, priority); + assert_eq!(params.policy(), policy); + assert_eq!(params.priority(), priority); + } + + #[test] + #[should_panic(expected = "priority is not in allowed range for the scheduler policy")] + fn scheduler_parameters_new_priority_out_of_range() { + let policy = SchedulerPolicy::Other; + let priority = 1; + let _ = SchedulerParameters::new(policy, priority); + } + + #[test] + fn thread_parameters_new_succeeds() { + let new_tp = ThreadParameters::new(); + let def_tp = ThreadParameters::default(); + + assert_eq!(new_tp.scheduler_parameters, None); + assert_eq!(new_tp.affinity, None); + assert_eq!(new_tp.stack_size, None); + assert_eq!(new_tp, def_tp); + } + + #[test] + fn thread_parameters_parameters_succeeds() { + let exp_scheduler_parameters = SchedulerParameters::new(SchedulerPolicy::Fifo, 50); + let exp_affinity = vec![1, 2, 3]; + let exp_stack_size = 1024 * 1024; + let thread_parameters = ThreadParameters::new() + .scheduler_parameters(exp_scheduler_parameters) + .affinity(&exp_affinity) + .stack_size(exp_stack_size); + + assert_eq!(thread_parameters.scheduler_parameters, Some(exp_scheduler_parameters)); + assert_eq!(thread_parameters.affinity, Some(Box::from(exp_affinity))); + assert_eq!(thread_parameters.stack_size, Some(exp_stack_size)); + } +} diff --git a/src/thread/thread.rs b/src/thread/thread.rs new file mode 100644 index 0000000..c75817f --- /dev/null +++ b/src/thread/thread.rs @@ -0,0 +1,551 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::parameters::{SchedulerPolicy, ThreadParameters}; +use core::marker::PhantomData; +use core::mem::MaybeUninit; + +/// Expected [`libc::cpu_set_t`] size in bits. +const CPU_SET_SIZE: usize = 1024; + +const _U8_SIZE: usize = u8::BITS as usize; +const _DATA_SIZE: usize = CPU_SET_SIZE / _U8_SIZE; + +/// Wrapper for [`libc::cpu_set_t`]. +struct CpuSet { + data: [u8; CPU_SET_SIZE / _U8_SIZE], +} + +impl CpuSet { + /// Create a new [`CpuSet`] based on provided `affinity`. + fn new(affinity: &[usize]) -> Self { + let mut data = [0; CPU_SET_SIZE / _U8_SIZE]; + + // Set affinity as bits. + for cpu_id in affinity.iter().copied() { + const MAX_ID: usize = CPU_SET_SIZE - 1; + if cpu_id > MAX_ID { + panic!("CPU ID provided to affinity exceeds max supported size, provided: {cpu_id}, max: {MAX_ID}"); + } + + let index = cpu_id / _U8_SIZE; + let offset = cpu_id % _U8_SIZE; + data[index] |= 1 << offset; + } + + Self { data } + } + + /// Get inner representation. + fn get(self) -> libc::cpu_set_t { + unsafe { core::mem::transmute(self.data) } + } +} + +/// `pthread` attributes object. +struct Attributes { + attr_handle: libc::pthread_attr_t, +} + +impl Attributes { + /// Create `pthread` attributes object. + fn new() -> Self { + let mut attr = MaybeUninit::uninit(); + let rc = unsafe { libc::pthread_attr_init(attr.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_init failed, rc: {rc}, errno: {errno}"); + } + + let attr_handle = unsafe { attr.assume_init() }; + Self { attr_handle } + } + + /// Pointer to mutable internal handle. + fn ptr(&mut self) -> *mut libc::pthread_attr_t { + &mut self.attr_handle as *mut _ + } + + /// Set inherit scheduling attributes. + fn inherit_scheduling_attributes(&mut self, inherit: bool) { + let inherit_native = if inherit { + libc::PTHREAD_INHERIT_SCHED + } else { + libc::PTHREAD_EXPLICIT_SCHED + }; + let rc = unsafe { libc::pthread_attr_setinheritsched(self.ptr(), inherit_native) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_setinheritsched failed, rc: {rc}, errno: {errno}"); + } + } + + /// Set thread priority. + fn priority(&mut self, priority: i32) { + let params = libc::sched_param { + sched_priority: priority, + }; + let rc = unsafe { libc::pthread_attr_setschedparam(self.ptr(), ¶ms as *const _) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_setschedparam failed, rc: {rc}, errno: {errno}"); + } + } + + /// Set scheduler policy. + fn scheduler_policy(&mut self, scheduler_policy: SchedulerPolicy) { + let policy = scheduler_policy as i32; + let rc = unsafe { libc::pthread_attr_setschedpolicy(self.ptr(), policy) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_setschedpolicy failed, rc: {rc}, errno: {errno}"); + } + } + + /// Set thread affinity - array of CPU core IDs that the thread can run on. + fn affinity(&mut self, affinity: &[usize]) { + let cpu_set = CpuSet::new(affinity).get(); + let cpu_set_size = size_of::(); + + let rc = unsafe { libc::pthread_attr_setaffinity_np(self.ptr(), cpu_set_size, &cpu_set as *const _) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_setaffinity_np failed, rc: {rc}, errno: {errno}"); + } + } + + /// Set stack size. + fn stack_size(&mut self, stack_size: usize) { + let rc = unsafe { libc::pthread_attr_setstacksize(self.ptr(), stack_size) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_setstacksize failed, rc: {rc}, errno: {errno}"); + } + } + + /// Get reference to inner handle. + fn get(&self) -> &libc::pthread_attr_t { + &self.attr_handle + } +} + +impl Drop for Attributes { + fn drop(&mut self) { + let rc = unsafe { libc::pthread_attr_destroy(self.ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_destroy failed, rc: {rc}, errno: {errno}"); + } + } +} + +struct ThreadData T> { + f: F, +} + +/// `pthread` thread object. +struct Thread { + thread_handle: libc::pthread_t, +} + +impl Thread { + fn new(attributes: Attributes, f: F) -> Self + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let mut thread_handle = MaybeUninit::uninit(); + + extern "C" fn start_routine T>(data: *mut libc::c_void) -> *mut libc::c_void { + let data = unsafe { core::ptr::read(data as *const ThreadData) }; + let result = (data.f)(); + Box::into_raw(Box::new(result)).cast() + } + + let data = Box::into_raw(Box::new(ThreadData { f })); + let rc = unsafe { + libc::pthread_create( + thread_handle.as_mut_ptr(), + attributes.get() as *const _, + start_routine::, + data as *mut _, + ) + }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_create failed, rc: {rc}, errno: {errno}"); + } + + Self { + thread_handle: unsafe { thread_handle.assume_init() }, + } + } +} + +/// An owned permission to join on a thread (block on its termination). +pub struct JoinHandle { + thread: Thread, + _marker: PhantomData, +} + +impl JoinHandle { + fn new(thread: Thread) -> Self { + Self { + thread, + _marker: PhantomData, + } + } + + /// Wait for the associated thread to finish. + /// + /// This function will return immediately if the associated thread has already finished. + pub fn join(self) -> T { + let mut result = MaybeUninit::<*mut libc::c_void>::uninit(); + let thread_handle = self.thread.thread_handle; + let rc = unsafe { libc::pthread_join(thread_handle, result.as_mut_ptr().cast()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_join failed, rc: {rc}, errno: {errno}"); + } + + let result_box = unsafe { Box::from_raw(result.assume_init().cast()) }; + *result_box + } +} + +/// Spawn a new thread, returning [`JoinHandle`] for it. +pub fn spawn(f: F, thread_parameters: ThreadParameters) -> JoinHandle +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + // Construct attributes based on provided parameters. + let mut attributes = Attributes::new(); + + if let Some(scheduler_parameters) = thread_parameters.scheduler_parameters { + attributes.inherit_scheduling_attributes(false); + attributes.scheduler_policy(scheduler_parameters.policy()); + attributes.priority(scheduler_parameters.priority()); + } + if let Some(affinity) = thread_parameters.affinity { + attributes.affinity(&affinity); + } + if let Some(stack_size) = thread_parameters.stack_size { + attributes.stack_size(stack_size); + } + + // Create a `Thread` and place it in a `JoinHandle`. + let thread = Thread::new(attributes, f); + JoinHandle::new(thread) +} + +#[cfg(test)] +mod tests { + use crate::parameters::{SchedulerParameters, SchedulerPolicy, ThreadParameters}; + use crate::thread::{spawn, Attributes, CpuSet}; + use core::mem::MaybeUninit; + use std::sync::mpsc::channel; + + #[test] + fn cpu_set_new_empty() { + let cpu_set = CpuSet::new(&[]); + + // Check all zeroed. + assert!(cpu_set.data.iter().all(|x| *x == 0)); + } + + #[test] + fn cpu_set_new_some() { + let cpu_set = CpuSet::new(&[0, 123, 1023]); + let mut data_vec = cpu_set.data.to_vec(); + // Test removes from `Vec`, start from the end. + assert_eq!(data_vec.remove(127), 0x80); + assert_eq!(data_vec.remove(15), 0x08); + assert_eq!(data_vec.remove(0), 0x01); + } + + #[test] + fn cpu_set_new_all() { + let all_ids: Vec<_> = (0..1024).collect(); + let cpu_set = CpuSet::new(&all_ids); + + // Check all maxed. + assert!(cpu_set.data.iter().all(|x| *x == 0xFF)); + } + + #[test] + #[should_panic(expected = "CPU ID provided to affinity exceeds max supported size, provided: 1024, max: 1023")] + fn cpu_set_new_out_of_range() { + let _ = CpuSet::new(&[0, 123, 1023, 1024]); + } + + fn attr_inherit_scheduling_attributes(attrs: &Attributes) -> bool { + let mut native = MaybeUninit::uninit(); + let rc = unsafe { libc::pthread_attr_getinheritsched(attrs.get(), native.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_getinheritsched failed, rc: {rc}, errno: {errno}"); + } + + match unsafe { native.assume_init() } { + libc::PTHREAD_INHERIT_SCHED => true, + libc::PTHREAD_EXPLICIT_SCHED => false, + _ => panic!("unknown inherit scheduling attributes value"), + } + } + + fn attr_priority(attrs: &Attributes) -> i32 { + let mut param_native = MaybeUninit::uninit(); + let rc = unsafe { libc::pthread_attr_getschedparam(attrs.get(), param_native.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_getschedparam failed, rc: {rc}, errno: {errno}"); + } + + unsafe { param_native.assume_init().sched_priority } + } + + fn attr_policy(attrs: &Attributes) -> SchedulerPolicy { + let mut policy_native = MaybeUninit::uninit(); + let rc = unsafe { libc::pthread_attr_getschedpolicy(attrs.get(), policy_native.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_getschedpolicy failed, rc: {rc}, errno: {errno}"); + } + + SchedulerPolicy::from(unsafe { policy_native.assume_init() }) + } + + fn attr_affinity(attrs: &Attributes) -> Vec { + let mut cpu_set = MaybeUninit::uninit(); + let cpu_set_size = size_of::(); + let rc = unsafe { libc::pthread_attr_getaffinity_np(attrs.get(), cpu_set_size, cpu_set.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_getaffinity_np failed, rc: {rc}, errno: {errno}"); + } + let cpu_set = unsafe { cpu_set.assume_init() }; + + let mut affinity = Vec::new(); + for i in 0..libc::CPU_SETSIZE as usize { + if unsafe { libc::CPU_ISSET(i, &cpu_set) } { + affinity.push(i); + } + } + + affinity + } + + fn attr_stack_size(attrs: &Attributes) -> usize { + let mut stack_size = MaybeUninit::uninit(); + let rc = unsafe { libc::pthread_attr_getstacksize(attrs.get(), stack_size.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_attr_getstacksize failed, rc: {rc}, errno: {errno}"); + } + + unsafe { stack_size.assume_init() } + } + + #[test] + fn attributes_new_succeeds() { + // Also checks `Drop` on exit. + let _ = Attributes::new(); + } + + #[test] + fn attributes_inherit_scheduling_attributes_succeeds() { + let mut attrs = Attributes::new(); + + attrs.inherit_scheduling_attributes(true); + assert!(attr_inherit_scheduling_attributes(&attrs)); + + attrs.inherit_scheduling_attributes(false); + assert!(!attr_inherit_scheduling_attributes(&attrs)); + } + + #[test] + fn attributes_priority_succeeds() { + let mut attrs = Attributes::new(); + + attrs.scheduler_policy(SchedulerPolicy::Fifo); + attrs.priority(50); + assert_eq!(attr_priority(&attrs), 50); + } + + #[test] + #[should_panic(expected = "libc::pthread_attr_setschedparam failed, rc: 22, errno: 0")] + fn attributes_priority_wrong_scheduler_policy() { + let mut attrs = Attributes::new(); + attrs.priority(50); + } + + #[test] + fn attributes_scheduler_policy_succeeds() { + let mut attrs = Attributes::new(); + + attrs.scheduler_policy(SchedulerPolicy::Fifo); + assert_eq!(attr_policy(&attrs), SchedulerPolicy::Fifo); + } + + #[test] + fn attributes_affinity_succeeds() { + let mut attrs = Attributes::new(); + + let expected_affinity = vec![0, 123, 1023]; + attrs.affinity(&expected_affinity); + assert_eq!(attr_affinity(&attrs), expected_affinity); + } + + #[test] + #[should_panic(expected = "CPU ID provided to affinity exceeds max supported size, provided: 1024, max: 1023")] + fn attributes_affinity_out_of_range() { + let mut attrs = Attributes::new(); + attrs.affinity(&[1024]); + } + + #[test] + fn attributes_stack_size_succeeds() { + let mut attrs = Attributes::new(); + + let expected_stack_size = 1024 * 1024; + attrs.stack_size(expected_stack_size); + assert_eq!(attr_stack_size(&attrs), expected_stack_size); + } + + #[test] + #[should_panic(expected = "libc::pthread_attr_setstacksize failed, rc: 22, errno: 0")] + fn attributes_stack_size_too_small() { + let mut attrs = Attributes::new(); + attrs.stack_size(4 * 1024); + } + + #[test] + fn spawn_succeeds() { + let thread_parameters = ThreadParameters::default(); + let (tx, rx) = channel(); + let join_handle = spawn( + move || { + tx.send(654321).unwrap(); + 123 + }, + thread_parameters, + ); + let join_result = join_handle.join(); + + assert_eq!(join_result, 123); + assert_eq!(rx.recv().unwrap(), 654321) + } + + fn current_sched_params() -> (SchedulerPolicy, i32) { + let thread = unsafe { libc::pthread_self() }; + let mut policy = MaybeUninit::uninit(); + let mut param = MaybeUninit::uninit(); + let rc = unsafe { libc::pthread_getschedparam(thread, policy.as_mut_ptr(), param.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::pthread_getschedparam failed, rc: {rc}, errno: {errno}"); + } + + let policy_native = unsafe { policy.assume_init() }; + let scheduler_policy = match policy_native { + 0 => SchedulerPolicy::Other, + 1 => SchedulerPolicy::Fifo, + 2 => SchedulerPolicy::RoundRobin, + _ => panic!("Unknown scheduler type"), + }; + + let priority = unsafe { param.assume_init().sched_priority }; + + (scheduler_policy, priority) + } + + #[test] + #[ignore = "test requires cap_sys_nice=ep"] + fn spawn_scheduler_params_succeeds() { + let exp_scheduler_parameters = SchedulerParameters::new(SchedulerPolicy::Fifo, 10); + let thread_parameters = ThreadParameters::new().scheduler_parameters(exp_scheduler_parameters); + let (tx, rx) = channel(); + let join_handle = spawn( + move || { + let sched_params = current_sched_params(); + tx.send(sched_params).unwrap(); + }, + thread_parameters, + ); + join_handle.join(); + + let (scheduler_policy, priority) = rx.recv().unwrap(); + assert_eq!(scheduler_policy, exp_scheduler_parameters.policy()); + assert_eq!(priority, exp_scheduler_parameters.priority()); + } + + fn current_thread_affinity() -> Vec { + let current_thread = 0; + let mut cpu_set = MaybeUninit::uninit(); + let cpu_set_size = size_of::(); + let rc = unsafe { libc::sched_getaffinity(current_thread, cpu_set_size, cpu_set.as_mut_ptr()) }; + if rc != 0 { + let errno = unsafe { *libc::__errno_location() }; + panic!("libc::sched_getaffinity failed, rc: {rc}, errno: {errno}"); + } + let cpu_set = unsafe { cpu_set.assume_init() }; + + let mut affinity = Vec::new(); + for i in 0..libc::CPU_SETSIZE as usize { + if unsafe { libc::CPU_ISSET(i, &cpu_set) } { + affinity.push(i); + } + } + + affinity + } + + #[test] + fn spawn_affinity_succeeds() { + let exp_affinity = vec![1]; + let thread_parameters = ThreadParameters::new().affinity(&exp_affinity); + let (tx, rx) = channel(); + let join_handle = spawn( + move || { + let affinity = current_thread_affinity(); + tx.send(affinity).unwrap(); + }, + thread_parameters, + ); + join_handle.join(); + + assert_eq!(rx.recv().unwrap(), exp_affinity); + } + + #[test] + #[should_panic(expected = "CPU ID provided to affinity exceeds max supported size, provided: 1234, max: 1023")] + fn spawn_affinity_out_of_range() { + let thread_parameters = ThreadParameters::new().affinity(&[1234]); + let _ = spawn(|| {}, thread_parameters); + } + + #[test] + fn spawn_stack_size_succeeds() { + // Check that nothing fails - cannot check stack size from within. + let stack_size = 1024 * 1024; + let thread_parameters = ThreadParameters::new().stack_size(stack_size); + let join_handle = spawn( + || { + // Do nothing. + }, + thread_parameters, + ); + join_handle.join(); + } +}