From 9371000919740ea8e90f1992c4ced28799a57ebd Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Fri, 10 Oct 2025 15:48:37 +0200
Subject: [PATCH 01/14] WIP: Add HashSet::par_iter and HashMap::par_iter(_mut)
 integration with paralight.
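A HashSet or HashMap can then be iterated in parallel on a paralight
thread pool. Usage sketch (mirroring the tests added below; at this WIP
stage items come out as Option<_> because empty buckets are surfaced and
must be filtered out by the caller):

    use hashbrown::HashSet;
    use paralight::iter::{IntoParallelRefSource, ParallelIteratorExt, ParallelSourceExt};
    use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};

    let set: HashSet<i32> = (1..=42).collect();
    let mut thread_pool = ThreadPoolBuilder {
        num_threads: ThreadCount::AvailableParallelism,
        range_strategy: RangeStrategy::WorkStealing,
        cpu_pinning: CpuPinningPolicy::No,
    }
    .build();

    // Empty buckets are yielded as None and skipped here.
    let sum = set
        .par_iter()
        .with_thread_pool(&mut thread_pool)
        .filter_map(|item| item)
        .sum::<i32>();
    assert_eq!(sum, 903);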
---
 Cargo.toml                      |   3 +-
 src/external_trait_impls/mod.rs |   2 +
 src/external_trait_impls/paralight.rs | 257 ++++++++++++++++++++++++++
 3 files changed, 261 insertions(+), 1 deletion(-)
 create mode 100644 src/external_trait_impls/paralight.rs

diff --git a/Cargo.toml b/Cargo.toml
index e116d894b..54c82f780 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,6 +39,7 @@ redundant_else = "allow"
 foldhash = { version = "0.2.0", default-features = false, optional = true }
 
 # For external trait impls
+paralight = { version = "0.0.6", optional = true }
 rayon = { version = "1.9.0", optional = true }
 serde_core = { version = "1.0.221", default-features = false, optional = true }
 
@@ -107,5 +108,5 @@ default-hasher = ["dep:foldhash"]
 inline-more = []
 
 [package.metadata.docs.rs]
-features = ["nightly", "rayon", "serde", "raw-entry"]
+features = ["nightly", "paralight", "rayon", "serde", "raw-entry"]
 rustdoc-args = ["--generate-link-to-definition"]
diff --git a/src/external_trait_impls/mod.rs b/src/external_trait_impls/mod.rs
index ef497836c..cfea0ffef 100644
--- a/src/external_trait_impls/mod.rs
+++ b/src/external_trait_impls/mod.rs
@@ -1,3 +1,5 @@
+#[cfg(feature = "paralight")]
+mod paralight;
 #[cfg(feature = "rayon")]
 pub(crate) mod rayon;
 #[cfg(feature = "serde")]
diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
new file mode 100644
index 000000000..592226f7c
--- /dev/null
+++ b/src/external_trait_impls/paralight.rs
@@ -0,0 +1,257 @@
+use crate::raw::RawTable;
+use crate::{HashMap, HashSet};
+use paralight::iter::{
+    IntoParallelRefMutSource, IntoParallelRefSource, ParallelSource, SourceCleanup,
+    SourceDescriptor,
+};
+
+impl<'data, T: Sync + 'data> IntoParallelRefSource<'data> for HashSet<T> {
+    type Item = Option<&'data T>;
+    type Source = HashSetRefParallelSource<'data, T>;
+
+    fn par_iter(&'data self) -> Self::Source {
+        HashSetRefParallelSource { hash_set: self }
+    }
+}
+
+#[must_use = "iterator adaptors are lazy"]
+pub struct HashSetRefParallelSource<'data, T> {
+    hash_set: &'data HashSet<T>,
+}
+
+impl<'data, T: Sync> ParallelSource for HashSetRefParallelSource<'data, T> {
+    type Item = Option<&'data T>;
+
+    fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
+        HashSetRefSourceDescriptor {
+            table: &self.hash_set.map.table,
+        }
+    }
+}
+
+struct HashSetRefSourceDescriptor<'data, T: Sync> {
+    table: &'data RawTable<(T, ())>,
+}
+
+impl<T: Sync> SourceCleanup for HashSetRefSourceDescriptor<'_, T> {
+    const NEEDS_CLEANUP: bool = false;
+
+    unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
+        // Nothing to cleanup
+    }
+}
+
+impl<'data, T: Sync> SourceDescriptor for HashSetRefSourceDescriptor<'data, T> {
+    type Item = Option<&'data T>;
+
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
+    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        // SAFETY: TODO
+        let full = unsafe { self.table.is_bucket_full(index) };
+        if full {
+            // SAFETY: TODO
+            let bucket = unsafe { self.table.bucket(index) };
+            let (t, ()) = unsafe { bucket.as_ref() };
+            Some(t)
+        } else {
+            None
+        }
+    }
+}
+
+impl<'data, K: Sync + 'data, V: Sync + 'data> IntoParallelRefSource<'data> for HashMap<K, V> {
+    type Item = Option<&'data (K, V)>;
+    type Source = HashMapRefParallelSource<'data, K, V>;
+
+    fn par_iter(&'data self) -> Self::Source {
+        HashMapRefParallelSource { hash_map: self }
+    }
+}
+
+#[must_use = "iterator adaptors are lazy"]
+pub struct HashMapRefParallelSource<'data, K, V> {
+    hash_map: &'data HashMap<K, V>,
+}
+
+impl<'data, K: Sync, V: Sync> ParallelSource for HashMapRefParallelSource<'data, K, V> {
+    type Item = Option<&'data (K, V)>;
+
+    fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
+        HashMapRefSourceDescriptor {
+            table: &self.hash_map.table,
+        }
+    }
+}
+
+struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync> {
+    table: &'data RawTable<(K, V)>,
+}
+
+impl<K: Sync, V: Sync> SourceCleanup for HashMapRefSourceDescriptor<'_, K, V> {
+    const NEEDS_CLEANUP: bool = false;
+
+    unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
+        // Nothing to cleanup
+    }
+}
+
+impl<'data, K: Sync, V: Sync> SourceDescriptor for HashMapRefSourceDescriptor<'data, K, V> {
+    type Item = Option<&'data (K, V)>;
+
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
+    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        // SAFETY: TODO
+        let full = unsafe { self.table.is_bucket_full(index) };
+        if full {
+            // SAFETY: TODO
+            let bucket = unsafe { self.table.bucket(index) };
+            unsafe { Some(bucket.as_ref()) }
+        } else {
+            None
+        }
+    }
+}
+
+// TODO: Remove Sync requirement on V.
+impl<'data, K: Sync + 'data, V: Send + Sync + 'data> IntoParallelRefMutSource<'data>
+    for HashMap<K, V>
+{
+    type Item = Option<(&'data K, &'data mut V)>;
+    type Source = HashMapRefMutParallelSource<'data, K, V>;
+
+    fn par_iter_mut(&'data mut self) -> Self::Source {
+        HashMapRefMutParallelSource { hash_map: self }
+    }
+}
+
+#[must_use = "iterator adaptors are lazy"]
+pub struct HashMapRefMutParallelSource<'data, K, V> {
+    hash_map: &'data mut HashMap<K, V>,
+}
+
+impl<'data, K: Sync, V: Send + Sync> ParallelSource for HashMapRefMutParallelSource<'data, K, V> {
+    type Item = Option<(&'data K, &'data mut V)>;
+
+    fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
+        HashMapRefMutSourceDescriptor {
+            table: &self.hash_map.table,
+        }
+    }
+}
+
+struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send + Sync> {
+    table: &'data RawTable<(K, V)>,
+}
+
+impl<K: Sync, V: Send + Sync> SourceCleanup for HashMapRefMutSourceDescriptor<'_, K, V> {
+    const NEEDS_CLEANUP: bool = false;
+
+    unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
+        // Nothing to cleanup
+    }
+}
+
+impl<'data, K: Sync, V: Send + Sync> SourceDescriptor
+    for HashMapRefMutSourceDescriptor<'data, K, V>
+{
+    type Item = Option<(&'data K, &'data mut V)>;
+
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
+    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        // SAFETY: TODO
+        let full = unsafe { self.table.is_bucket_full(index) };
+        if full {
+            // SAFETY: TODO
+            let bucket = unsafe { self.table.bucket(index) };
+            // SAFETY: TODO
+            let (key, value) = unsafe { bucket.as_mut() };
+            Some((key, value))
+        } else {
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use alloc::boxed::Box;
+    use core::ops::Deref;
+    use paralight::iter::{ParallelIteratorExt, ParallelSourceExt};
+    use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
+
+    #[test]
+    fn test_set_par_iter() {
+        let mut set = HashSet::new();
+        for i in 1..=42 {
+            set.insert(Box::new(i));
+        }
+
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        let sum = set
+            .par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .filter_map(|x| x.map(|y| y.deref()))
+            .sum::<i32>();
+        assert_eq!(sum, 21 * 43);
+    }
+
+    #[test]
+    fn test_map_par_iter() {
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Box::new(i * i));
+        }
+
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        map.par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .filter_map(|x| x)
+            .for_each(|(k, v)| assert_eq!(**k * **k, **v));
+    }
+
+    #[test]
+    fn test_map_par_iter_mut() {
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Box::new(i));
+        }
+
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        map.par_iter_mut()
+            .with_thread_pool(&mut thread_pool)
+            .filter_map(|x| x)
+            .for_each(|(k, v)| **v *= **k);
+
+        for (k, v) in map.iter() {
+            assert_eq!(**k * **k, **v);
+        }
+    }
+}
From 889555ea2e7153b07f8e042519ce5b33a5887c46 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Fri, 10 Oct 2025 15:54:42 +0200
Subject: [PATCH 02/14] WIP: Support non-default hasher and allocator
 parameters.
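The impls become generic over the hasher type S and the allocator type A,
so tables built with a custom BuildHasher or allocator also get
par_iter(). Sketch (std's DefaultHasher is used purely for illustration):

    use core::hash::BuildHasherDefault;
    use hashbrown::HashMap;
    use paralight::iter::IntoParallelRefSource;
    use std::collections::hash_map::DefaultHasher;

    let mut map: HashMap<u32, u32, BuildHasherDefault<DefaultHasher>> =
        HashMap::with_hasher(BuildHasherDefault::default());
    map.insert(1, 2);
    let _source = map.par_iter(); // accepted for any S and any A: Allocator + Sync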
---
 src/external_trait_impls/paralight.rs | 77 ++++++++++++++++-----------
 1 file changed, 47 insertions(+), 30 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 592226f7c..065be5458 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -1,13 +1,16 @@
-use crate::raw::RawTable;
+use crate::raw::{Allocator, RawTable};
 use crate::{HashMap, HashSet};
 use paralight::iter::{
     IntoParallelRefMutSource, IntoParallelRefSource, ParallelSource, SourceCleanup,
     SourceDescriptor,
 };
 
-impl<'data, T: Sync + 'data> IntoParallelRefSource<'data> for HashSet<T> {
+// HashSet.par_iter()
+impl<'data, T: Sync + 'data, S: 'data, A: Allocator + Sync + 'data> IntoParallelRefSource<'data>
+    for HashSet<T, S, A>
+{
     type Item = Option<&'data T>;
-    type Source = HashSetRefParallelSource<'data, T>;
+    type Source = HashSetRefParallelSource<'data, T, S, A>;
 
     fn par_iter(&'data self) -> Self::Source {
         HashSetRefParallelSource { hash_set: self }
@@ -15,11 +18,13 @@ impl<'data, T: Sync + 'data> IntoParallelRefSource<'data> for HashSet<T> {
 }
 
 #[must_use = "iterator adaptors are lazy"]
-pub struct HashSetRefParallelSource<'data, T> {
-    hash_set: &'data HashSet<T>,
+pub struct HashSetRefParallelSource<'data, T, S, A: Allocator> {
+    hash_set: &'data HashSet<T, S, A>,
 }
 
-impl<'data, T: Sync> ParallelSource for HashSetRefParallelSource<'data, T> {
+impl<'data, T: Sync, S, A: Allocator + Sync> ParallelSource
+    for HashSetRefParallelSource<'data, T, S, A>
+{
     type Item = Option<&'data T>;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashSetRefSourceDescriptor {
@@ -29,11 +34,11 @@ impl<'data, T: Sync> ParallelSource for HashSetRefParallelSource<'data, T> {
     }
 }
 
-struct HashSetRefSourceDescriptor<'data, T: Sync> {
-    table: &'data RawTable<(T, ())>,
+struct HashSetRefSourceDescriptor<'data, T: Sync, A: Allocator> {
+    table: &'data RawTable<(T, ()), A>,
 }
 
-impl<T: Sync> SourceCleanup for HashSetRefSourceDescriptor<'_, T> {
+impl<T: Sync, A: Allocator> SourceCleanup for HashSetRefSourceDescriptor<'_, T, A> {
     const NEEDS_CLEANUP: bool = false;
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -41,7 +46,7 @@ impl<T: Sync> SourceCleanup for HashSetRefSourceDescriptor<'_, T> {
     }
 }
 
-impl<'data, T: Sync> SourceDescriptor for HashSetRefSourceDescriptor<'data, T> {
+impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescriptor<'data, T, A> {
     type Item = Option<&'data T>;
 
     fn len(&self) -> usize {
@@ -62,9 +67,12 @@ impl<'data, T: Sync> SourceDescriptor for HashSetRefSourceDescriptor<'data, T> {
     }
 }
 
-impl<'data, K: Sync + 'data, V: Sync + 'data> IntoParallelRefSource<'data> for HashMap<K, V> {
+// HashMap.par_iter()
+impl<'data, K: Sync + 'data, V: Sync + 'data, S: 'data, A: Allocator + Sync + 'data>
+    IntoParallelRefSource<'data> for HashMap<K, V, S, A>
+{
     type Item = Option<&'data (K, V)>;
-    type Source = HashMapRefParallelSource<'data, K, V>;
+    type Source = HashMapRefParallelSource<'data, K, V, S, A>;
 
     fn par_iter(&'data self) -> Self::Source {
         HashMapRefParallelSource { hash_map: self }
@@ -72,11 +80,13 @@ impl<'data, K: Sync + 'data, V: Sync + 'data> IntoParallelRefSource<'data> for HashMap<K, V> {
 }
 
 #[must_use = "iterator adaptors are lazy"]
-pub struct HashMapRefParallelSource<'data, K, V> {
-    hash_map: &'data HashMap<K, V>,
+pub struct HashMapRefParallelSource<'data, K, V, S, A: Allocator> {
+    hash_map: &'data HashMap<K, V, S, A>,
 }
 
-impl<'data, K: Sync, V: Sync> ParallelSource for HashMapRefParallelSource<'data, K, V> {
+impl<'data, K: Sync, V: Sync, S, A: Allocator + Sync> ParallelSource
+    for HashMapRefParallelSource<'data, K, V, S, A>
+{
     type Item = Option<&'data (K, V)>;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapRefSourceDescriptor {
@@ -86,11 +96,11 @@ impl<'data, K: Sync, V: Sync> ParallelSource for HashMapRefParallelSource<'data,
     }
 }
 
-struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync> {
-    table: &'data RawTable<(K, V)>,
+struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync, A: Allocator> {
+    table: &'data RawTable<(K, V), A>,
 }
 
-impl<K: Sync, V: Sync> SourceCleanup for HashMapRefSourceDescriptor<'_, K, V> {
+impl<K: Sync, V: Sync, A: Allocator> SourceCleanup for HashMapRefSourceDescriptor<'_, K, V, A> {
     const NEEDS_CLEANUP: bool = false;
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -98,7 +108,9 @@ impl<K: Sync, V: Sync> SourceCleanup for HashMapRefSourceDescriptor<'_, K, V> {
     }
 }
 
-impl<'data, K: Sync, V: Sync> SourceDescriptor for HashMapRefSourceDescriptor<'data, K, V> {
+impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
+    for HashMapRefSourceDescriptor<'data, K, V, A>
+{
     type Item = Option<&'data (K, V)>;
 
     fn len(&self) -> usize {
@@ -118,12 +130,13 @@ impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
     }
 }
 
+// HashMap.par_iter_mut()
 // TODO: Remove Sync requirement on V.
-impl<'data, K: Sync + 'data, V: Send + Sync + 'data> IntoParallelRefMutSource<'data>
-    for HashMap<K, V>
+impl<'data, K: Sync + 'data, V: Send + Sync + 'data, S: 'data, A: Allocator + Sync + 'data>
+    IntoParallelRefMutSource<'data> for HashMap<K, V, S, A>
 {
     type Item = Option<(&'data K, &'data mut V)>;
-    type Source = HashMapRefMutParallelSource<'data, K, V>;
+    type Source = HashMapRefMutParallelSource<'data, K, V, S, A>;
 
     fn par_iter_mut(&'data mut self) -> Self::Source {
         HashMapRefMutParallelSource { hash_map: self }
@@ -131,11 +144,13 @@ impl<'data, K: Sync + 'data, V: Send + Sync + 'data> IntoParallelRefMutSource<'d
 }
 
 #[must_use = "iterator adaptors are lazy"]
-pub struct HashMapRefMutParallelSource<'data, K, V> {
-    hash_map: &'data mut HashMap<K, V>,
+pub struct HashMapRefMutParallelSource<'data, K, V, S, A: Allocator> {
+    hash_map: &'data mut HashMap<K, V, S, A>,
 }
 
-impl<'data, K: Sync, V: Send + Sync> ParallelSource for HashMapRefMutParallelSource<'data, K, V> {
+impl<'data, K: Sync, V: Send + Sync, S, A: Allocator + Sync> ParallelSource
+    for HashMapRefMutParallelSource<'data, K, V, S, A>
+{
     type Item = Option<(&'data K, &'data mut V)>;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapRefMutSourceDescriptor {
@@ -145,11 +160,13 @@ impl<'data, K: Sync, V: Send + Sync> ParallelSource for HashMapRefMutParallelSou
     }
 }
 
-struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send + Sync> {
-    table: &'data RawTable<(K, V)>,
+struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send + Sync, A: Allocator> {
+    table: &'data RawTable<(K, V), A>,
 }
 
-impl<K: Sync, V: Send + Sync> SourceCleanup for HashMapRefMutSourceDescriptor<'_, K, V> {
+impl<K: Sync, V: Send + Sync, A: Allocator> SourceCleanup
+    for HashMapRefMutSourceDescriptor<'_, K, V, A>
+{
     const NEEDS_CLEANUP: bool = false;
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -157,8 +174,8 @@ impl<K: Sync, V: Send + Sync> SourceCleanup for HashMapRefMutSourceDescriptor<'_
     }
 }
 
-impl<'data, K: Sync, V: Send + Sync> SourceDescriptor
-    for HashMapRefMutSourceDescriptor<'data, K, V>
+impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor
+    for HashMapRefMutSourceDescriptor<'data, K, V, A>
 {
     type Item = Option<(&'data K, &'data mut V)>;

From 4a77e084bbdfa45625027e9abab8c73c15eea13b Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Sat, 11 Oct 2025 19:54:23 +0200
Subject: [PATCH 03/14] Add debug assertions and missing SAFETY:TODO comments.
---
 src/external_trait_impls/paralight.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 065be5458..d0c74ab0b 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -54,11 +54,13 @@ impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescript
     }
 
     unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
         if full {
             // SAFETY: TODO
             let bucket = unsafe { self.table.bucket(index) };
+            // SAFETY: TODO
             let (t, ()) = unsafe { bucket.as_ref() };
             Some(t)
         } else {
@@ -118,11 +120,13 @@ impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
     }
 
     unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
         if full {
             // SAFETY: TODO
             let bucket = unsafe { self.table.bucket(index) };
+            // SAFETY: TODO
             unsafe { Some(bucket.as_ref()) }
         } else {
             None
@@ -184,6 +188,7 @@ impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor
     }
 
     unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
         if full {

From 9e7890ea60f2aa9e2f8ed3046c65c1e7d7c21ade Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Sat, 11 Oct 2025 19:57:55 +0200
Subject: [PATCH 04/14] Implement IntoParallelSource for hash tables.
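into_par_iter() consumes the table, so buckets that the parallel iterator
never fetches (early exit, panic) still have to be dropped: NEEDS_CLEANUP
and cleanup_item_range() take care of that, and the Drop impls then
release the storage via clear_no_drop(). Usage sketch (thread-pool setup
as in the tests; items are Option<T> at this stage):

    use hashbrown::HashSet;
    use paralight::iter::{IntoParallelSource, ParallelIteratorExt, ParallelSourceExt};

    let set: HashSet<i32> = (1..=42).collect();
    let sum = set
        .into_par_iter()
        .with_thread_pool(&mut thread_pool)
        .filter_map(|item| item) // skip empty buckets, keep owned values
        .sum::<i32>();
    assert_eq!(sum, 903);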
---
 src/external_trait_impls/paralight.rs | 223 +++++++++++++++++++++++++-
 1 file changed, 221 insertions(+), 2 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index d0c74ab0b..2c6f0c746 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -1,8 +1,8 @@
 use crate::raw::{Allocator, RawTable};
 use crate::{HashMap, HashSet};
 use paralight::iter::{
-    IntoParallelRefMutSource, IntoParallelRefSource, ParallelSource, SourceCleanup,
-    SourceDescriptor,
+    IntoParallelRefMutSource, IntoParallelRefSource, IntoParallelSource, ParallelSource,
+    SourceCleanup, SourceDescriptor,
 };
 
 // HashSet.par_iter()
@@ -203,6 +203,183 @@ impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor
     }
 }
 
+// HashSet.into_par_iter()
+// TODO: Remove Sync requirement on T.
+impl<T: Send + Sync, S, A: Allocator + Sync> IntoParallelSource for HashSet<T, S, A> {
+    type Item = Option<T>;
+    type Source = HashSetParallelSource<T, S, A>;
+
+    fn into_par_iter(self) -> Self::Source {
+        HashSetParallelSource { hash_set: self }
+    }
+}
+
+#[must_use = "iterator adaptors are lazy"]
+pub struct HashSetParallelSource<T, S, A: Allocator> {
+    hash_set: HashSet<T, S, A>,
+}
+
+impl<T: Send + Sync, S, A: Allocator + Sync> ParallelSource for HashSetParallelSource<T, S, A> {
+    type Item = Option<T>;
+
+    fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
+        HashSetSourceDescriptor {
+            table: self.hash_set.map.table,
+        }
+    }
+}
+
+struct HashSetSourceDescriptor<T, A: Allocator> {
+    table: RawTable<(T, ()), A>,
+}
+
+impl<T: Send + Sync, A: Allocator> SourceCleanup for HashSetSourceDescriptor<T, A> {
+    const NEEDS_CLEANUP: bool = core::mem::needs_drop::<T>();
+
+    unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
+        if Self::NEEDS_CLEANUP {
+            debug_assert!(range.start <= range.end);
+            debug_assert!(range.start <= self.len());
+            debug_assert!(range.end <= self.len());
+            for index in range {
+                // SAFETY: TODO
+                let full = unsafe { self.table.is_bucket_full(index) };
+                if full {
+                    // SAFETY: TODO
+                    let bucket = unsafe { self.table.bucket(index) };
+                    // SAFETY: TODO
+                    let (t, ()) = unsafe { bucket.read() };
+                    drop(t);
+                }
+            }
+        }
+    }
+}
+
+impl<T: Send + Sync, A: Allocator> SourceDescriptor for HashSetSourceDescriptor<T, A> {
+    type Item = Option<T>;
+
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
+    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        debug_assert!(index < self.len());
+        // SAFETY: TODO
+        let full = unsafe { self.table.is_bucket_full(index) };
+        if full {
+            // SAFETY: TODO
+            let bucket = unsafe { self.table.bucket(index) };
+            // SAFETY: TODO
+            let (t, ()) = unsafe { bucket.read() };
+            Some(t)
+        } else {
+            None
+        }
+    }
+}
+
+impl<T, A: Allocator> Drop for HashSetSourceDescriptor<T, A> {
+    fn drop(&mut self) {
+        // Paralight already dropped each missing bucket via calls to cleanup_item_range(), so we
+        // can simply mark all buckets as cleared and let the RawTable destructor do the rest.
+        // TODO: Optimize this to simply deallocate without touching the control bytes.
+        self.table.clear_no_drop();
+    }
+}
+
+// HashMap.into_par_iter()
+// TODO: Remove Sync requirement on K and V.
+impl<K: Send + Sync, V: Send + Sync, S, A: Allocator + Sync> IntoParallelSource
+    for HashMap<K, V, S, A>
+{
+    type Item = Option<(K, V)>;
+    type Source = HashMapParallelSource<K, V, S, A>;
+
+    fn into_par_iter(self) -> Self::Source {
+        HashMapParallelSource { hash_map: self }
+    }
+}
+
+#[must_use = "iterator adaptors are lazy"]
+pub struct HashMapParallelSource<K, V, S, A: Allocator> {
+    hash_map: HashMap<K, V, S, A>,
+}
+
+impl<K: Send + Sync, V: Send + Sync, S, A: Allocator + Sync> ParallelSource
+    for HashMapParallelSource<K, V, S, A>
+{
+    type Item = Option<(K, V)>;
+
+    fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
+        HashMapSourceDescriptor {
+            table: self.hash_map.table,
+        }
+    }
+}
+
+struct HashMapSourceDescriptor<K, V, A: Allocator> {
+    table: RawTable<(K, V), A>,
+}
+
+impl<K: Send + Sync, V: Send + Sync, A: Allocator> SourceCleanup
+    for HashMapSourceDescriptor<K, V, A>
+{
+    const NEEDS_CLEANUP: bool = core::mem::needs_drop::<(K, V)>();
+
+    unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
+        if Self::NEEDS_CLEANUP {
+            debug_assert!(range.start <= range.end);
+            debug_assert!(range.start <= self.len());
+            debug_assert!(range.end <= self.len());
+            for index in range {
+                // SAFETY: TODO
+                let full = unsafe { self.table.is_bucket_full(index) };
+                if full {
+                    // SAFETY: TODO
+                    let bucket = unsafe { self.table.bucket(index) };
+                    // SAFETY: TODO
+                    let key_value = unsafe { bucket.read() };
+                    drop(key_value);
+                }
+            }
+        }
+    }
+}
+
+impl<K: Send + Sync, V: Send + Sync, A: Allocator> SourceDescriptor
+    for HashMapSourceDescriptor<K, V, A>
+{
+    type Item = Option<(K, V)>;
+
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
+    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+        debug_assert!(index < self.len());
+        // SAFETY: TODO
+        let full = unsafe { self.table.is_bucket_full(index) };
+        if full {
+            // SAFETY: TODO
+            let bucket = unsafe { self.table.bucket(index) };
+            // SAFETY: TODO
+            unsafe { Some(bucket.read()) }
+        } else {
+            None
+        }
+    }
+}
+
+impl<K, V, A: Allocator> Drop for HashMapSourceDescriptor<K, V, A> {
+    fn drop(&mut self) {
+        // Paralight already dropped each missing bucket via calls to cleanup_item_range(), so we
+        // can simply mark all buckets as cleared and let the RawTable destructor do the rest.
+        // TODO: Optimize this to simply deallocate without touching the control bytes.
+        self.table.clear_no_drop();
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -233,6 +410,28 @@ mod test {
         assert_eq!(sum, 21 * 43);
     }
 
+    #[test]
+    fn test_set_into_par_iter() {
+        let mut set = HashSet::new();
+        for i in 1..=42 {
+            set.insert(Box::new(i));
+        }
+
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        let sum = set
+            .into_par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .filter_map(|x| x.map(|y| *y))
+            .sum::<i32>();
+        assert_eq!(sum, 21 * 43);
+    }
+
     #[test]
     fn test_map_par_iter() {
         let mut map = HashMap::new();

     #[test]
     fn test_map_into_par_iter() {
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Box::new(i * i));
+        }
+
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        map.into_par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .filter_map(|x| x)
+            .for_each(|(k, v)| assert_eq!(*k * *k, *v));
+    }
 }

From eefe4df3f5a562263daadf71c6f78eca8926409c Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Sat, 11 Oct 2025 20:17:42 +0200
Subject: [PATCH 05/14] Move thread pool building at the start of the tests.
---
 src/external_trait_impls/paralight.rs | 50 +++++++++++++--------------
 1 file changed, 25 insertions(+), 25 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 2c6f0c746..ce01ca507 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -390,11 +390,6 @@ mod test {
 
     #[test]
     fn test_set_par_iter() {
-        let mut set = HashSet::new();
-        for i in 1..=42 {
-            set.insert(Box::new(i));
-        }
-
         let mut thread_pool = ThreadPoolBuilder {
             num_threads: ThreadCount::AvailableParallelism,
             range_strategy: RangeStrategy::WorkStealing,
@@ -402,6 +397,11 @@ mod test {
         }
         .build();
 
+        let mut set = HashSet::new();
+        for i in 1..=42 {
+            set.insert(Box::new(i));
+        }
+
         let sum = set
             .par_iter()
             .with_thread_pool(&mut thread_pool)
@@ -412,11 +412,6 @@ mod test {
 
     #[test]
     fn test_set_into_par_iter() {
-        let mut set = HashSet::new();
-        for i in 1..=42 {
-            set.insert(Box::new(i));
-        }
-
         let mut thread_pool = ThreadPoolBuilder {
             num_threads: ThreadCount::AvailableParallelism,
             range_strategy: RangeStrategy::WorkStealing,
@@ -424,6 +419,11 @@ mod test {
         }
         .build();
 
+        let mut set = HashSet::new();
+        for i in 1..=42 {
+            set.insert(Box::new(i));
+        }
+
         let sum = set
             .into_par_iter()
             .with_thread_pool(&mut thread_pool)
@@ -434,11 +434,6 @@ mod test {
 
     #[test]
     fn test_map_par_iter() {
-        let mut map = HashMap::new();
-        for i in 1..=42 {
-            map.insert(Box::new(i), Box::new(i * i));
-        }
-
         let mut thread_pool = ThreadPoolBuilder {
             num_threads: ThreadCount::AvailableParallelism,
             range_strategy: RangeStrategy::WorkStealing,
@@ -446,6 +441,11 @@ mod test {
         }
         .build();
 
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Box::new(i * i));
+        }
+
         map.par_iter()
             .with_thread_pool(&mut thread_pool)
             .filter_map(|x| x)
@@ -454,11 +454,6 @@ mod test {
 
     #[test]
     fn test_map_par_iter_mut() {
-        let mut map = HashMap::new();
-        for i in 1..=42 {
-            map.insert(Box::new(i), Box::new(i));
-        }
-
         let mut thread_pool = ThreadPoolBuilder {
             num_threads: ThreadCount::AvailableParallelism,
             range_strategy: RangeStrategy::WorkStealing,
@@ -466,6 +461,11 @@ mod test {
         }
         .build();
 
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Box::new(i));
+        }
+
         map.par_iter_mut()
             .with_thread_pool(&mut thread_pool)
             .filter_map(|x| x)
@@ -478,11 +478,6 @@ mod test {
 
     #[test]
     fn test_map_into_par_iter() {
-        let mut map = HashMap::new();
-        for i in 1..=42 {
-            map.insert(Box::new(i), Box::new(i * i));
-        }
-
         let mut thread_pool = ThreadPoolBuilder {
             num_threads: ThreadCount::AvailableParallelism,
             range_strategy: RangeStrategy::WorkStealing,
@@ -490,6 +485,11 @@ mod test {
         }
         .build();
 
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Box::new(i * i));
+        }
+
         map.into_par_iter()
             .with_thread_pool(&mut thread_pool)
             .filter_map(|x| x)

From 0d9005996dd3e4792a7f100de72be49a238ff37a Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Mon, 12 Jan 2026 10:16:58 +0100
Subject: [PATCH 06/14] Update to Paralight v0.0.10.
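Paralight v0.0.10 reshapes the source traits: fetch_item() now returns
Option<Self::Item> and the library itself skips the None entries, len()
moves from SourceDescriptor to SourceCleanup, and the thread-pool types
move to paralight::threads. For callers the visible change is that the
iterators yield items directly instead of Option-wrapped ones, so the
filter_map(|x| x) calls disappear (sketch, pool setup as in the tests):

    let sum = set
        .par_iter()
        .with_thread_pool(&mut thread_pool)
        .map(|x| *x)
        .sum::<i32>();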
---
 Cargo.toml                            |  2 +-
 src/external_trait_impls/paralight.rs | 89 +++++++++++++--------------
 2 files changed, 44 insertions(+), 47 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 54c82f780..664706a64 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,7 +39,7 @@ redundant_else = "allow"
 foldhash = { version = "0.2.0", default-features = false, optional = true }
 
 # For external trait impls
-paralight = { version = "0.0.6", optional = true }
+paralight = { version = "0.0.10", optional = true }
 rayon = { version = "1.9.0", optional = true }
 serde_core = { version = "1.0.221", default-features = false, optional = true }
 
diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index ce01ca507..15dd5ebbc 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -9,7 +9,7 @@ use paralight::iter::{
 impl<'data, T: Sync + 'data, S: 'data, A: Allocator + Sync + 'data> IntoParallelRefSource<'data>
     for HashSet<T, S, A>
 {
-    type Item = Option<&'data T>;
+    type Item = &'data T;
     type Source = HashSetRefParallelSource<'data, T, S, A>;
 
     fn par_iter(&'data self) -> Self::Source {
@@ -25,7 +25,7 @@ pub struct HashSetRefParallelSource<'data, T, S, A: Allocator> {
 impl<'data, T: Sync, S, A: Allocator + Sync> ParallelSource
     for HashSetRefParallelSource<'data, T, S, A>
 {
-    type Item = Option<&'data T>;
+    type Item = &'data T;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashSetRefSourceDescriptor {
@@ -41,19 +41,19 @@ struct HashSetRefSourceDescriptor<'data, T: Sync, A: Allocator> {
 impl<T: Sync, A: Allocator> SourceCleanup for HashSetRefSourceDescriptor<'_, T, A> {
     const NEEDS_CLEANUP: bool = false;
 
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
         // Nothing to cleanup
     }
 }
 
 impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescriptor<'data, T, A> {
-    type Item = Option<&'data T>;
-
-    fn len(&self) -> usize {
-        self.table.buckets()
-    }
+    type Item = &'data T;
 
-    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+    unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
@@ -73,7 +73,7 @@ impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescript
 impl<'data, K: Sync + 'data, V: Sync + 'data, S: 'data, A: Allocator + Sync + 'data>
     IntoParallelRefSource<'data> for HashMap<K, V, S, A>
 {
-    type Item = Option<&'data (K, V)>;
+    type Item = &'data (K, V);
     type Source = HashMapRefParallelSource<'data, K, V, S, A>;
 
     fn par_iter(&'data self) -> Self::Source {
@@ -89,7 +89,7 @@ pub struct HashMapRefParallelSource<'data, K, V, S, A: Allocator> {
 impl<'data, K: Sync, V: Sync, S, A: Allocator + Sync> ParallelSource
     for HashMapRefParallelSource<'data, K, V, S, A>
 {
-    type Item = Option<&'data (K, V)>;
+    type Item = &'data (K, V);
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapRefSourceDescriptor {
@@ -105,6 +105,10 @@ struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync, A: Allocator> {
 impl<K: Sync, V: Sync, A: Allocator> SourceCleanup for HashMapRefSourceDescriptor<'_, K, V, A> {
     const NEEDS_CLEANUP: bool = false;
 
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
         // Nothing to cleanup
     }
 }
 
 impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
     for HashMapRefSourceDescriptor<'data, K, V, A>
 {
-    type Item = Option<&'data (K, V)>;
-
-    fn len(&self) -> usize {
-        self.table.buckets()
-    }
+    type Item = &'data (K, V);
 
-    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+    unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
@@ -139,7 +139,7 @@ impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
 impl<'data, K: Sync + 'data, V: Send + Sync + 'data, S: 'data, A: Allocator + Sync + 'data>
     IntoParallelRefMutSource<'data> for HashMap<K, V, S, A>
 {
-    type Item = Option<(&'data K, &'data mut V)>;
+    type Item = (&'data K, &'data mut V);
     type Source = HashMapRefMutParallelSource<'data, K, V, S, A>;
 
     fn par_iter_mut(&'data mut self) -> Self::Source {
@@ -155,7 +155,7 @@ pub struct HashMapRefMutParallelSource<'data, K, V, S, A: Allocator> {
 impl<'data, K: Sync, V: Send + Sync, S, A: Allocator + Sync> ParallelSource
     for HashMapRefMutParallelSource<'data, K, V, S, A>
 {
-    type Item = Option<(&'data K, &'data mut V)>;
+    type Item = (&'data K, &'data mut V);
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapRefMutSourceDescriptor {
@@ -173,6 +173,10 @@ impl<K: Sync, V: Send + Sync, A: Allocator> SourceCleanup
 {
     const NEEDS_CLEANUP: bool = false;
 
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
         // Nothing to cleanup
     }
 }
 
 impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor
     for HashMapRefMutSourceDescriptor<'data, K, V, A>
 {
-    type Item = Option<(&'data K, &'data mut V)>;
+    type Item = (&'data K, &'data mut V);
 
-    fn len(&self) -> usize {
-        self.table.buckets()
-    }
-
-    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+    unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
@@ -206,7 +206,7 @@ impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor
 
 // HashSet.into_par_iter()
 // TODO: Remove Sync requirement on T.
 impl<T: Send + Sync, S, A: Allocator + Sync> IntoParallelSource for HashSet<T, S, A> {
-    type Item = Option<T>;
+    type Item = T;
     type Source = HashSetParallelSource<T, S, A>;
 
     fn into_par_iter(self) -> Self::Source {
@@ -220,7 +220,7 @@ pub struct HashSetParallelSource<T, S, A: Allocator> {
 }
 
 impl<T: Send + Sync, S, A: Allocator + Sync> ParallelSource for HashSetParallelSource<T, S, A> {
-    type Item = Option<T>;
+    type Item = T;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashSetSourceDescriptor {
@@ -236,6 +236,10 @@ struct HashSetSourceDescriptor<T, A: Allocator> {
 impl<T: Send + Sync, A: Allocator> SourceCleanup for HashSetSourceDescriptor<T, A> {
     const NEEDS_CLEANUP: bool = core::mem::needs_drop::<T>();
 
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
     unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
         if Self::NEEDS_CLEANUP {
             debug_assert!(range.start <= range.end);
@@ -257,13 +261,9 @@ impl<T: Send + Sync, A: Allocator> SourceCleanup for HashSetSourceDescriptor<T, A>
 }
 
 impl<T: Send + Sync, A: Allocator> SourceDescriptor for HashSetSourceDescriptor<T, A> {
-    type Item = Option<T>;
-
-    fn len(&self) -> usize {
-        self.table.buckets()
-    }
+    type Item = T;
 
-    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+    unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
@@ -293,7 +293,7 @@ impl<T, A: Allocator> Drop for HashSetSourceDescriptor<T, A> {
 impl<K: Send + Sync, V: Send + Sync, S, A: Allocator + Sync> IntoParallelSource
     for HashMap<K, V, S, A>
 {
-    type Item = Option<(K, V)>;
+    type Item = (K, V);
     type Source = HashMapParallelSource<K, V, S, A>;
 
     fn into_par_iter(self) -> Self::Source {
@@ -309,7 +309,7 @@ pub struct HashMapParallelSource<K, V, S, A: Allocator> {
 impl<K: Send + Sync, V: Send + Sync, S, A: Allocator + Sync> ParallelSource
     for HashMapParallelSource<K, V, S, A>
 {
-    type Item = Option<(K, V)>;
+    type Item = (K, V);
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapSourceDescriptor {
@@ -327,6 +327,10 @@ impl<K: Send + Sync, V: Send + Sync, A: Allocator> SourceCleanup
 {
     const NEEDS_CLEANUP: bool = core::mem::needs_drop::<(K, V)>();
 
+    fn len(&self) -> usize {
+        self.table.buckets()
+    }
+
     unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
         if Self::NEEDS_CLEANUP {
             debug_assert!(range.start <= range.end);
@@ -350,13 +354,9 @@ impl<K: Send + Sync, V: Send + Sync, A: Allocator> SourceCleanup
 impl<K: Send + Sync, V: Send + Sync, A: Allocator> SourceDescriptor
     for HashMapSourceDescriptor<K, V, A>
 {
-    type Item = Option<(K, V)>;
-
-    fn len(&self) -> usize {
-        self.table.buckets()
-    }
+    type Item = (K, V);
 
-    unsafe fn fetch_item(&self, index: usize) -> Self::Item {
+    unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
         let full = unsafe { self.table.is_bucket_full(index) };
@@ -386,7 +386,7 @@ mod test {
     use alloc::boxed::Box;
     use core::ops::Deref;
     use paralight::iter::{ParallelIteratorExt, ParallelSourceExt};
-    use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
+    use paralight::threads::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
 
     #[test]
     fn test_set_par_iter() {
@@ -405,7 +405,7 @@ mod test {
         let sum = set
             .par_iter()
             .with_thread_pool(&mut thread_pool)
-            .filter_map(|x| x.map(|y| y.deref()))
+            .map(|x| x.deref())
             .sum::<i32>();
         assert_eq!(sum, 21 * 43);
     }
@@ -427,7 +427,7 @@ mod test {
         let sum = set
             .into_par_iter()
             .with_thread_pool(&mut thread_pool)
-            .filter_map(|x| x.map(|y| *y))
+            .map(|x| *x)
             .sum::<i32>();
         assert_eq!(sum, 21 * 43);
     }
@@ -448,7 +448,6 @@ mod test {
 
         map.par_iter()
             .with_thread_pool(&mut thread_pool)
-            .filter_map(|x| x)
             .for_each(|(k, v)| assert_eq!(**k * **k, **v));
     }
 
@@ -468,7 +467,6 @@ mod test {
 
         map.par_iter_mut()
             .with_thread_pool(&mut thread_pool)
-            .filter_map(|x| x)
             .for_each(|(k, v)| **v *= **k);
 
         for (k, v) in map.iter() {
@@ -492,7 +490,6 @@ mod test {
 
         map.into_par_iter()
             .with_thread_pool(&mut thread_pool)
-            .filter_map(|x| x)
             .for_each(|(k, v)| assert_eq!(*k * *k, *v));
     }
 }
From c4846a8f61ec09a8886a8c88410f800ad20eb4e7 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Mon, 12 Jan 2026 18:21:11 +0100
Subject: [PATCH 07/14] Remove unnecessary Sync requirements and add tests
 with non-Sync types such as Cell.
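The key observation: &RawTable<(K, V), A> is only Sync if V is Sync, but
par_iter_mut() never creates shared references to the values, so V: Send
should be enough. The new raw_table_wrapper newtypes assert the weaker
requirement with an unsafe Sync impl, e.g. (from this patch; the safety
argument is informal here and written down properly in a later commit):

    pub(super) struct HashMapRefMut<'data, K, V, A: Allocator> {
        pub(super) inner: &'data RawTable<(K, V), A>,
    }

    // SAFETY (informal): fetch_item() hands out each (&K, &mut V) pair at
    // most once, so sharing the wrapper across threads only needs K: Sync
    // and V: Send.
    unsafe impl<'data, K: Sync, V: Send, A: Allocator + Sync> Sync
        for HashMapRefMut<'data, K, V, A>
    {
    }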
---
 src/external_trait_impls/paralight.rs | 196 ++++++++++++++++++++------
 1 file changed, 151 insertions(+), 45 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 15dd5ebbc..d2cf234dc 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -135,8 +135,7 @@ impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
 }
 
 // HashMap.par_iter_mut()
-// TODO: Remove Sync requirement on V.
-impl<'data, K: Sync + 'data, V: Send + Sync + 'data, S: 'data, A: Allocator + Sync + 'data>
+impl<'data, K: Sync + 'data, V: Send + 'data, S: 'data, A: Allocator + Sync + 'data>
     IntoParallelRefMutSource<'data> for HashMap<K, V, S, A>
 {
     type Item = (&'data K, &'data mut V);
@@ -152,29 +151,29 @@ pub struct HashMapRefMutParallelSource<'data, K, V, S, A: Allocator> {
     hash_map: &'data mut HashMap<K, V, S, A>,
 }
 
-impl<'data, K: Sync, V: Send + Sync, S, A: Allocator + Sync> ParallelSource
+impl<'data, K: Sync, V: Send, S, A: Allocator + Sync> ParallelSource
     for HashMapRefMutParallelSource<'data, K, V, S, A>
 {
     type Item = (&'data K, &'data mut V);
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapRefMutSourceDescriptor {
-            table: &self.hash_map.table,
+            table: raw_table_wrapper::HashMapRefMut {
+                inner: &self.hash_map.table,
+            },
         }
     }
 }
 
-struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send + Sync, A: Allocator> {
-    table: &'data RawTable<(K, V), A>,
+struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send, A: Allocator> {
+    table: raw_table_wrapper::HashMapRefMut<'data, K, V, A>,
 }
 
-impl<K: Sync, V: Send + Sync, A: Allocator> SourceCleanup
-    for HashMapRefMutSourceDescriptor<'_, K, V, A>
-{
+impl<K: Sync, V: Send, A: Allocator> SourceCleanup for HashMapRefMutSourceDescriptor<'_, K, V, A> {
     const NEEDS_CLEANUP: bool = false;
 
     fn len(&self) -> usize {
-        self.table.buckets()
+        self.table.inner.buckets()
     }
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -182,7 +181,7 @@ impl<K: Sync, V: Send, A: Allocator> SourceCleanup for HashMapRefMutSourceDescri
     }
 }
 
-impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor
+impl<'data, K: Sync, V: Send, A: Allocator> SourceDescriptor
     for HashMapRefMutSourceDescriptor<'data, K, V, A>
 {
     type Item = (&'data K, &'data mut V);
@@ -190,10 +189,10 @@ impl<'data, K: Sync, V: Send, A: Allocator> SourceDescriptor
     unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
-        let full = unsafe { self.table.is_bucket_full(index) };
+        let full = unsafe { self.table.inner.is_bucket_full(index) };
         if full {
             // SAFETY: TODO
-            let bucket = unsafe { self.table.bucket(index) };
+            let bucket = unsafe { self.table.inner.bucket(index) };
             // SAFETY: TODO
             let (key, value) = unsafe { bucket.as_mut() };
             Some((key, value))
@@ -203,8 +202,7 @@ impl<'data, K: Sync, V: Send, A: Allocator> SourceDescriptor
 }
 
 // HashSet.into_par_iter()
-// TODO: Remove Sync requirement on T.
-impl<T: Send + Sync, S, A: Allocator + Sync> IntoParallelSource for HashSet<T, S, A> {
+impl<T: Send, S, A: Allocator + Sync> IntoParallelSource for HashSet<T, S, A> {
     type Item = T;
     type Source = HashSetParallelSource<T, S, A>;
 
@@ -219,25 +217,27 @@ pub struct HashSetParallelSource<T, S, A: Allocator> {
     hash_set: HashSet<T, S, A>,
 }
 
-impl<T: Send + Sync, S, A: Allocator + Sync> ParallelSource for HashSetParallelSource<T, S, A> {
+impl<T: Send, S, A: Allocator + Sync> ParallelSource for HashSetParallelSource<T, S, A> {
     type Item = T;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashSetSourceDescriptor {
-            table: self.hash_set.map.table,
+            table: raw_table_wrapper::HashSet {
+                inner: self.hash_set.map.table,
+            },
         }
     }
 }
 
 struct HashSetSourceDescriptor<T, A: Allocator> {
-    table: RawTable<(T, ()), A>,
+    table: raw_table_wrapper::HashSet<T, A>,
 }
 
-impl<T: Send + Sync, A: Allocator> SourceCleanup for HashSetSourceDescriptor<T, A> {
+impl<T: Send, A: Allocator> SourceCleanup for HashSetSourceDescriptor<T, A> {
     const NEEDS_CLEANUP: bool = core::mem::needs_drop::<T>();
 
     fn len(&self) -> usize {
-        self.table.buckets()
+        self.table.inner.buckets()
     }
 
     unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
@@ -247,10 +247,10 @@ impl<T: Send, A: Allocator> SourceCleanup for HashSetSourceDescriptor<T, A> {
             debug_assert!(range.end <= self.len());
             for index in range {
                 // SAFETY: TODO
-                let full = unsafe { self.table.is_bucket_full(index) };
+                let full = unsafe { self.table.inner.is_bucket_full(index) };
                 if full {
                     // SAFETY: TODO
-                    let bucket = unsafe { self.table.bucket(index) };
+                    let bucket = unsafe { self.table.inner.bucket(index) };
                     // SAFETY: TODO
                     let (t, ()) = unsafe { bucket.read() };
                     drop(t);
@@ -265,12 +265,12 @@ impl<T: Send, A: Allocator> SourceDescriptor for HashSetSourceDescriptor<T, A> {
     unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
-        let full = unsafe { self.table.is_bucket_full(index) };
+        let full = unsafe { self.table.inner.is_bucket_full(index) };
         if full {
             // SAFETY: TODO
-            let bucket = unsafe { self.table.bucket(index) };
+            let bucket = unsafe { self.table.inner.bucket(index) };
             // SAFETY: TODO
             let (t, ()) = unsafe { bucket.read() };
             Some(t)
         } else {
@@ -281,8 +281,8 @@ impl<T, A: Allocator> Drop for HashSetSourceDescriptor<T, A> {
         // Paralight already dropped each missing bucket via calls to cleanup_item_range(), so we
         // can simply mark all buckets as cleared and let the RawTable destructor do the rest.
         // TODO: Optimize this to simply deallocate without touching the control bytes.
-        self.table.clear_no_drop();
+        self.table.inner.clear_no_drop();
     }
 }
 
 // HashMap.into_par_iter()
-// TODO: Remove Sync requirement on K and V.
-impl<K: Send + Sync, V: Send + Sync, S, A: Allocator + Sync> IntoParallelSource
-    for HashMap<K, V, S, A>
-{
+impl<K: Send, V: Send, S, A: Allocator + Sync> IntoParallelSource for HashMap<K, V, S, A> {
     type Item = (K, V);
     type Source = HashMapParallelSource<K, V, S, A>;
 
     fn into_par_iter(self) -> Self::Source {
@@ -306,29 +303,29 @@ pub struct HashMapParallelSource<K, V, S, A: Allocator> {
     hash_map: HashMap<K, V, S, A>,
 }
 
-impl<K: Send + Sync, V: Send + Sync, S, A: Allocator + Sync> ParallelSource
+impl<K: Send, V: Send, S, A: Allocator + Sync> ParallelSource
     for HashMapParallelSource<K, V, S, A>
 {
     type Item = (K, V);
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapSourceDescriptor {
-            table: self.hash_map.table,
+            table: raw_table_wrapper::HashMap {
+                inner: self.hash_map.table,
+            },
         }
     }
 }
 
 struct HashMapSourceDescriptor<K, V, A: Allocator> {
-    table: RawTable<(K, V), A>,
+    table: raw_table_wrapper::HashMap<K, V, A>,
 }
 
-impl<K: Send + Sync, V: Send + Sync, A: Allocator> SourceCleanup
-    for HashMapSourceDescriptor<K, V, A>
-{
+impl<K: Send, V: Send, A: Allocator> SourceCleanup for HashMapSourceDescriptor<K, V, A> {
     const NEEDS_CLEANUP: bool = core::mem::needs_drop::<(K, V)>();
 
     fn len(&self) -> usize {
-        self.table.buckets()
+        self.table.inner.buckets()
     }
 
     unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
@@ -338,10 +335,10 @@ impl<K: Send, V: Send, A: Allocator> SourceCleanup for HashMapSourceDescriptor<K
             debug_assert!(range.end <= self.len());
             for index in range {
                 // SAFETY: TODO
-                let full = unsafe { self.table.is_bucket_full(index) };
+                let full = unsafe { self.table.inner.is_bucket_full(index) };
                 if full {
                     // SAFETY: TODO
-                    let bucket = unsafe { self.table.bucket(index) };
+                    let bucket = unsafe { self.table.inner.bucket(index) };
                     // SAFETY: TODO
                     let key_value = unsafe { bucket.read() };
                     drop(key_value);
@@ -351,18 +348,16 @@ impl<K: Send, V: Send, A: Allocator> SourceCleanup for HashMapSourceDescriptor<K
     }
 }
 
-impl<K: Send + Sync, V: Send + Sync, A: Allocator> SourceDescriptor
-    for HashMapSourceDescriptor<K, V, A>
-{
+impl<K: Send, V: Send, A: Allocator> SourceDescriptor for HashMapSourceDescriptor<K, V, A> {
     type Item = (K, V);
 
     unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
         // SAFETY: TODO
-        let full = unsafe { self.table.is_bucket_full(index) };
+        let full = unsafe { self.table.inner.is_bucket_full(index) };
         if full {
             // SAFETY: TODO
-            let bucket = unsafe { self.table.bucket(index) };
+            let bucket = unsafe { self.table.inner.bucket(index) };
             // SAFETY: TODO
             unsafe { Some(bucket.read()) }
         } else {
             None
@@ -376,17 +371,64 @@ impl<K, V, A: Allocator> Drop for HashMapSourceDescriptor<K, V, A> {
         // Paralight already dropped each missing bucket via calls to cleanup_item_range(), so we
         // can simply mark all buckets as cleared and let the RawTable destructor do the rest.
         // TODO: Optimize this to simply deallocate without touching the control bytes.
-        self.table.clear_no_drop();
+        self.table.inner.clear_no_drop();
     }
 }
 
+mod raw_table_wrapper {
+    use crate::raw::{Allocator, RawTable};
+
+    pub(super) struct HashSet<T, A: Allocator> {
+        pub(super) inner: RawTable<(T, ()), A>,
+    }
+
+    // TODO: Does the Allocator need to be Sync too?
+    unsafe impl<T: Send, A: Allocator + Sync> Sync for HashSet<T, A> {}
+
+    pub(super) struct HashMap<K, V, A: Allocator> {
+        pub(super) inner: RawTable<(K, V), A>,
+    }
+
+    // TODO: Does the Allocator need to be Sync too?
+    unsafe impl<K: Send, V: Send, A: Allocator + Sync> Sync for HashMap<K, V, A> {}
+
+    pub(super) struct HashMapRefMut<'data, K, V, A: Allocator> {
+        pub(super) inner: &'data RawTable<(K, V), A>,
+    }
+
+    // TODO: Does the Allocator need to be Sync too?
+    unsafe impl<'data, K: Sync, V: Send, A: Allocator + Sync> Sync for HashMapRefMut<'data, K, V, A> {}
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
     use alloc::boxed::Box;
+    use core::cell::Cell;
     use core::ops::Deref;
     use paralight::iter::{ParallelIteratorExt, ParallelSourceExt};
     use paralight::threads::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
+    use std::hash::{Hash, Hasher};
+
+    // A cell that implements Hash.
+    #[derive(PartialEq, Eq)]
+    struct HashCell<T>(Cell<T>);
+
+    impl<T: Copy> HashCell<T> {
+        fn new(t: T) -> Self {
+            Self(Cell::new(t))
+        }
+
+        fn get(&self) -> T {
+            self.0.get()
+        }
+    }
+
+    impl<T: Copy + Hash> Hash for HashCell<T> {
+        fn hash<H: Hasher>(&self, state: &mut H) {
+            self.0.get().hash(state)
+        }
+    }
 
     #[test]
     fn test_set_par_iter() {

         assert_eq!(sum, 21 * 43);
     }
 
+    #[test]
+    fn test_set_into_par_iter_send() {
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        let mut set = HashSet::new();
+        for i in 1..=42 {
+            set.insert(HashCell::new(i));
+        }
+
+        let sum = set
+            .into_par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .map(|x| x.get())
+            .sum::<i32>();
+        assert_eq!(sum, 21 * 43);
+    }
+
     #[test]
     fn test_map_par_iter() {
         let mut thread_pool = ThreadPoolBuilder {

         for (k, v) in map.iter() {
             assert_eq!(**k * **k, **v);
         }
     }
 
+    #[test]
+    fn test_map_par_iter_mut_send_sync() {
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Cell::new(i));
+        }
+
+        map.par_iter_mut()
+            .with_thread_pool(&mut thread_pool)
+            .for_each(|(k, v)| *v.get_mut() *= **k);
+
+        for (k, v) in map.iter() {
+            assert_eq!(**k * **k, v.get());
+        }
+    }
+
     #[test]
     fn test_map_into_par_iter() {
         let mut thread_pool = ThreadPoolBuilder {

             .with_thread_pool(&mut thread_pool)
             .for_each(|(k, v)| assert_eq!(*k * *k, *v));
     }
+
+    #[test]
+    fn test_map_into_par_iter_send() {
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(HashCell::new(i), Cell::new(i * i));
+        }
+
+        map.into_par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .for_each(|(k, v)| assert_eq!(k.get() * k.get(), v.get()));
+    }
 }

From 4e444a6315aa16413d1bd9f121cb4ad34a454b09 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Mon, 12 Jan 2026 18:36:00 +0100
Subject: [PATCH 08/14] Add tests that exercise early exit on parallel
 iterators.

---
 src/external_trait_impls/paralight.rs | 50 +++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index d2cf234dc..c0b50332b 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -496,6 +496,30 @@ mod test {
         assert_eq!(sum, 21 * 43);
     }
 
+    #[test]
+    fn test_set_into_par_iter_find() {
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        let mut set = HashSet::new();
+        for i in 1..=42 {
+            set.insert(Box::new(i));
+        }
+
+        // The search will exit once an even number is found; this test checks
+        // (with Miri) that no memory leak happens as a result.
+        let any_even = set
+            .into_par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .find_any(|x| **x % 2 == 0);
+        assert!(any_even.is_some());
+        assert_eq!(*any_even.unwrap() % 2, 0);
+    }
+
     #[test]
     fn test_map_par_iter() {
         let mut thread_pool = ThreadPoolBuilder {

             .with_thread_pool(&mut thread_pool)
             .for_each(|(k, v)| assert_eq!(k.get() * k.get(), v.get()));
     }
+
+    #[test]
+    fn test_map_into_par_iter_find() {
+        let mut thread_pool = ThreadPoolBuilder {
+            num_threads: ThreadCount::AvailableParallelism,
+            range_strategy: RangeStrategy::WorkStealing,
+            cpu_pinning: CpuPinningPolicy::No,
+        }
+        .build();
+
+        let mut map = HashMap::new();
+        for i in 1..=42 {
+            map.insert(Box::new(i), Box::new(i * i));
+        }
+
+        // The search will exit once a match is found; this test checks (with
+        // Miri) that no memory leak happens as a result.
+        let needle = map
+            .into_par_iter()
+            .with_thread_pool(&mut thread_pool)
+            .find_any(|(k, v)| **k % 2 == 0 && **v % 3 == 0);
+        assert!(needle.is_some());
+        let (k, v) = needle.unwrap();
+        assert_eq!(*k % 2, 0);
+        assert_eq!(*v % 3, 0);
+    }
 }

From dad1b5e49da2b6a164f319fb260af661ca7c52a0 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Mon, 12 Jan 2026 18:36:44 +0100
Subject: [PATCH 09/14] Write down safety comments.

---
 src/external_trait_impls/paralight.rs | 137 +++++++++++++++++++++-----
 1 file changed, 114 insertions(+), 23 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index c0b50332b..926090621 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -55,12 +55,19 @@ impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescript
 
     unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
-        // SAFETY: TODO
+        // SAFETY: The passed index is less than the number of buckets. This is
+        // ensured by the safety preconditions of `fetch_item()`, given that
+        // `len()` returned the number of buckets, and is further confirmed by
+        // the debug assertion.
         let full = unsafe { self.table.is_bucket_full(index) };
         if full {
-            // SAFETY: TODO
+            // SAFETY:
+            // - The table is already allocated.
+            // - The index is in bounds (see previous safety comment).
+            // - The table contains elements of type (T, ()).
             let bucket = unsafe { self.table.bucket(index) };
-            // SAFETY: TODO
+            // SAFETY: The bucket is full, so it's safe to derive a const
+            // reference from it.
             let (t, ()) = unsafe { bucket.as_ref() };
             Some(t)
         } else {

     unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
-        // SAFETY: TODO
+        // SAFETY: The passed index is less than the number of buckets. This is
+        // ensured by the safety preconditions of `fetch_item()`, given that
+        // `len()` returned the number of buckets, and is further confirmed by
+        // the debug assertion.
         let full = unsafe { self.table.is_bucket_full(index) };
         if full {
-            // SAFETY: TODO
+            // SAFETY:
+            // - The table is already allocated.
+            // - The index is in bounds (see previous safety comment).
+            // - The table contains elements of type (K, V).
             let bucket = unsafe { self.table.bucket(index) };
-            // SAFETY: TODO
+            // SAFETY: The bucket is full, so it's safe to derive a const
+            // reference from it.
             unsafe { Some(bucket.as_ref()) }
         } else {
             None

     unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
-        // SAFETY: TODO
+        // SAFETY: The passed index is less than the number of buckets. This is
+        // ensured by the safety preconditions of `fetch_item()`, given that
+        // `len()` returned the number of buckets, and is further confirmed by
+        // the debug assertion.
         let full = unsafe { self.table.inner.is_bucket_full(index) };
         if full {
-            // SAFETY: TODO
+            // SAFETY:
+            // - The table is already allocated.
+            // - The index is in bounds (see previous safety comment).
+            // - The table contains elements of type (K, V).
             let bucket = unsafe { self.table.inner.bucket(index) };
-            // SAFETY: TODO
+            // SAFETY:
+            // - The bucket is full, i.e. points to a valid value.
+            // - While the resulting reference is valid, the memory it points to
+            //   isn't accessed through any other pointer. Indeed, the
+            //   `SourceDescriptor` contract ensures that no other call to
+            //   `fetch_item()` will be made at this index while the iterator is
+            //   active. Furthermore, `HashMapRefMutParallelSource` holds a
+            //   mutable reference to the hash map with the 'data lifetime,
+            //   ensuring that no other part of the program accesses the hash
+            //   map while the returned reference exists.
             let (key, value) = unsafe { bucket.as_mut() };
             Some((key, value))
         } else {

     unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
         if Self::NEEDS_CLEANUP {
             debug_assert!(range.start <= range.end);
             debug_assert!(range.start <= self.len());
             debug_assert!(range.end <= self.len());
             for index in range {
-                // SAFETY: TODO
+                // SAFETY: The passed index is less than the number of buckets. This is
+                // ensured by the safety preconditions of `cleanup_item_range()`, given
+                // that `len()` returned the number of buckets, and is further confirmed
+                // by the debug assertions.
                 let full = unsafe { self.table.inner.is_bucket_full(index) };
                 if full {
-                    // SAFETY: TODO
+                    // SAFETY:
+                    // - The table is already allocated.
+                    // - The index is in bounds (see previous safety comment).
+                    // - The table contains elements of type (T, ()).
                    let bucket = unsafe { self.table.inner.bucket(index) };
-                    // SAFETY: TODO
+                    // SAFETY:
+                    // - The bucket points to an aligned value of type (T, ()).
+                    // - The value is initialized, as the bucket is full.
+                    // - No other part of the program reads it, as the `SourceCleanup`
+                    //   and `SourceDescriptor` contracts ensure that no other call to
+                    //   `fetch_item()` nor `cleanup_item_range()` is made for this
+                    //   index; and even though the bucket isn't marked as empty here,
+                    //   the Drop implementation clears the table without dropping.
                     let (t, ()) = unsafe { bucket.read() };
                     drop(t);

     unsafe fn fetch_item(&self, index: usize) -> Option<Self::Item> {
         debug_assert!(index < self.len());
-        // SAFETY: TODO
+        // SAFETY: The passed index is less than the number of buckets. This is
+        // ensured by the safety preconditions of `fetch_item()`, given that
+        // `len()` returned the number of buckets, and is further confirmed by
+        // the debug assertion.
         let full = unsafe { self.table.inner.is_bucket_full(index) };
         if full {
-            // SAFETY: TODO
+            // SAFETY:
+            // - The table is already allocated.
+            // - The index is in bounds (see previous safety comment).
+            // - The table contains elements of type (T, ()).
             let bucket = unsafe { self.table.inner.bucket(index) };
-            // SAFETY: TODO
+            // SAFETY:
+            // - The bucket points to an aligned value of type (T, ()).
+            // - The value is initialized, as the bucket is full.
+            // - No other part of the program reads it, as the `SourceCleanup`
+            //   and `SourceDescriptor` contracts ensure that no other call to
+            //   `fetch_item()` nor `cleanup_item_range()` is made for this
+            //   index; and even though the bucket isn't marked as empty here,
+            //   the Drop implementation clears the table without dropping.
             let (t, ()) = unsafe { bucket.read() };
             Some(t)
         } else {

 impl<T, A: Allocator> Drop for HashSetSourceDescriptor<T, A> {
     fn drop(&mut self) {
-        // Paralight already dropped each missing bucket via calls to cleanup_item_range(), so we
+        // Paralight already dropped each missing* bucket via calls to cleanup_item_range(), so we
         // can simply mark all buckets as cleared and let the RawTable destructor do the rest.
+        //
+        // *Some buckets may be missing because the iterator exited early (e.g. an item was found
+        // via the find_any() adaptor) or unexpectedly due to a panic (e.g. in the closure passed
+        // to the for_each() adaptor).
+        //
         // TODO: Optimize this to simply deallocate without touching the control bytes.
        self.table.inner.clear_no_drop();
    }
}

From a8f527abf27f3e7ce2146d094894963258f475f7 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Wed, 14 Jan 2026 10:23:34 +0100
Subject: [PATCH 10/14] Remove Sync requirement on allocator as the parallel
 iterators don't (de)allocate the hash table on other threads.

---
 src/external_trait_impls/paralight.rs | 116 ++++++++++++++++++--------
 1 file changed, 81 insertions(+), 35 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 926090621..4e872285b 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -1,4 +1,4 @@
-use crate::raw::{Allocator, RawTable};
+use crate::raw::Allocator;
 use crate::{HashMap, HashSet};
 use paralight::iter::{
     IntoParallelRefMutSource, IntoParallelRefSource, IntoParallelSource, ParallelSource,
@@ -6,7 +6,7 @@ use paralight::iter::{
 };
 
 // HashSet.par_iter()
-impl<'data, T: Sync + 'data, S: 'data, A: Allocator + Sync + 'data> IntoParallelRefSource<'data>
+impl<'data, T: Sync + 'data, S: 'data, A: Allocator + 'data> IntoParallelRefSource<'data>
     for HashSet<T, S, A>
 {
     type Item = &'data T;
@@ -22,27 +22,27 @@ pub struct HashSetRefParallelSource<'data, T, S, A: Allocator> {
     hash_set: &'data HashSet<T, S, A>,
 }
 
-impl<'data, T: Sync, S, A: Allocator + Sync> ParallelSource
-    for HashSetRefParallelSource<'data, T, S, A>
-{
+impl<'data, T: Sync, S, A: Allocator> ParallelSource for HashSetRefParallelSource<'data, T, S, A> {
     type Item = &'data T;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashSetRefSourceDescriptor {
-            table: &self.hash_set.map.table,
+            table: raw_table_wrapper::HashSetRef {
+                inner: &self.hash_set.map.table,
+            },
         }
     }
 }
 
 struct HashSetRefSourceDescriptor<'data, T: Sync, A: Allocator> {
-    table: &'data RawTable<(T, ()), A>,
+    table: raw_table_wrapper::HashSetRef<'data, T, A>,
 }
 
 impl<T: Sync, A: Allocator> SourceCleanup for HashSetRefSourceDescriptor<'_, T, A> {
     const NEEDS_CLEANUP: bool = false;
 
     fn len(&self) -> usize {
-        self.table.buckets()
+        self.table.inner.buckets()
     }
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -59,13 +59,13 @@ impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescript
         // ensured by the safety preconditions of `fetch_item()`, given that
         // `len()` returned the number of buckets, and is further confirmed by
         // the debug assertion.
-        let full = unsafe { self.table.is_bucket_full(index) };
+        let full = unsafe { self.table.inner.is_bucket_full(index) };
         if full {
             // SAFETY:
             // - The table is already allocated.
             // - The index is in bounds (see previous safety comment).
             // - The table contains elements of type (T, ()).
-            let bucket = unsafe { self.table.bucket(index) };
+            let bucket = unsafe { self.table.inner.bucket(index) };
             // SAFETY: The bucket is full, so it's safe to derive a shared
             // reference from it.
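             // No exclusive reference can coexist with it: par_iter() borrows the
             // set shared for the whole 'data lifetime, and this descriptor only
             // ever derives shared references.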
            let (t, ()) = unsafe { bucket.as_ref() };
@@ -77,7 +77,7 @@ impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescript
 }
 
 // HashMap.par_iter()
-impl<'data, K: Sync + 'data, V: Sync + 'data, S: 'data, A: Allocator + Sync + 'data>
+impl<'data, K: Sync + 'data, V: Sync + 'data, S: 'data, A: Allocator + 'data>
     IntoParallelRefSource<'data> for HashMap<K, V, S, A>
 {
     type Item = &'data (K, V);
@@ -93,27 +93,29 @@ pub struct HashMapRefParallelSource<'data, K, V, S, A: Allocator> {
     hash_map: &'data HashMap<K, V, S, A>,
 }
 
-impl<'data, K: Sync, V: Sync, S, A: Allocator + Sync> ParallelSource
+impl<'data, K: Sync, V: Sync, S, A: Allocator> ParallelSource
     for HashMapRefParallelSource<'data, K, V, S, A>
 {
     type Item = &'data (K, V);
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
         HashMapRefSourceDescriptor {
-            table: &self.hash_map.table,
+            table: raw_table_wrapper::HashMapRef {
+                inner: &self.hash_map.table,
+            },
         }
     }
 }
 
 struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync, A: Allocator> {
-    table: &'data RawTable<(K, V), A>,
+    table: raw_table_wrapper::HashMapRef<'data, K, V, A>,
 }
 
 impl<K: Sync, V: Sync, A: Allocator> SourceCleanup for HashMapRefSourceDescriptor<'_, K, V, A> {
     const NEEDS_CLEANUP: bool = false;
 
     fn len(&self) -> usize {
-        self.table.buckets()
+        self.table.inner.buckets()
     }
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -132,13 +134,13 @@ impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
         // ensured by the safety preconditions of `fetch_item()`, given that
         // `len()` returned the number of buckets, and is further confirmed by
         // the debug assertion.
-        let full = unsafe { self.table.is_bucket_full(index) };
+        let full = unsafe { self.table.inner.is_bucket_full(index) };
         if full {
             // SAFETY:
             // - The table is already allocated.
             // - The index is in bounds (see previous safety comment).
             // - The table contains elements of type (K, V).
-            let bucket = unsafe { self.table.bucket(index) };
+            let bucket = unsafe { self.table.inner.bucket(index) };
             // SAFETY: The bucket is full, so it's safe to derive a shared
             // reference from it.
            unsafe { Some(bucket.as_ref()) }
@@ -149,7 +151,7 @@ impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor
 }
 
 // HashMap.par_iter_mut()
-impl<'data, K: Sync + 'data, V: Send + 'data, S: 'data, A: Allocator + Sync + 'data>
+impl<'data, K: Sync + 'data, V: Send + 'data, S: 'data, A: Allocator + 'data>
     IntoParallelRefMutSource<'data> for HashMap<K, V, S, A>
 {
     type Item = (&'data K, &'data mut V);
@@ -165,7 +167,7 @@ pub struct HashMapRefMutParallelSource<'data, K, V, S, A: Allocator> {
     hash_map: &'data mut HashMap<K, V, S, A>,
 }
 
-impl<'data, K: Sync, V: Send, S, A: Allocator + Sync> ParallelSource
+impl<'data, K: Sync, V: Send, S, A: Allocator> ParallelSource
     for HashMapRefMutParallelSource<'data, K, V, S, A>
 {
     type Item = (&'data K, &'data mut V);
@@ -232,7 +234,7 @@ impl<'data, K: Sync, V: Send, A: Allocator> SourceDescriptor
 }
 
 // HashSet.into_par_iter()
-impl<T: Send, S, A: Allocator + Sync> IntoParallelSource for HashSet<T, S, A> {
+impl<T: Send, S, A: Allocator> IntoParallelSource for HashSet<T, S, A> {
     type Item = T;
     type Source = HashSetParallelSource<T, S, A>;
 
@@ -246,7 +248,7 @@ pub struct HashSetParallelSource {
     hash_set: HashSet<T, S, A>,
 }
 
-impl<T: Send, S, A: Allocator + Sync> ParallelSource for HashSetParallelSource<T, S, A> {
+impl<T: Send, S, A: Allocator> ParallelSource for HashSetParallelSource<T, S, A> {
     type Item = T;
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
@@ -349,7 +351,7 @@ impl Drop for HashSetSourceDescriptor {
 }
 
 // HashMap.into_par_iter()
-impl<K: Send, V: Send, S, A: Allocator + Sync> IntoParallelSource for HashMap<K, V, S, A> {
+impl<K: Send, V: Send, S, A: Allocator> IntoParallelSource for HashMap<K, V, S, A> {
     type Item = (K, V);
     type Source = HashMapParallelSource<K, V, S, A>;
 
@@ -363,9 +365,7 @@ pub struct HashMapParallelSource {
     hash_map: HashMap<K, V, S, A>,
 }
 
-impl<K: Send, V: Send, S, A: Allocator + Sync> ParallelSource
-    for HashMapParallelSource<K, V, S, A>
-{
+impl<K: Send, V: Send, S, A: Allocator> ParallelSource for HashMapParallelSource<K, V, S, A> {
     type Item = (K, V);
 
     fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
@@ -469,26 +469,72 @@ impl Drop for HashMapSourceDescriptor {
 mod raw_table_wrapper {
     use crate::raw::{Allocator, RawTable};
 
-    pub(super) struct HashSet<T, A: Allocator> {
-        pub(super) inner: RawTable<(T, ()), A>,
+    /// Helper to implement HashSet::par_iter().
+    pub(super) struct HashSetRef<'data, T, A: Allocator> {
+        pub(super) inner: &'data RawTable<(T, ()), A>,
     }
 
-    // TODO: Does the Allocator need to be Sync too?
-    unsafe impl<T: Send, A: Allocator + Sync> Sync for HashSet<T, A> {}
+    // SAFETY:
+    // - This wrapper type is shared with worker threads, which extract references of type `&T`.
+    //   This requires `&T: Send`. Therefore, this wrapper is Sync if and only if `T` is Sync.
+    // - The allocator doesn't need any Send/Sync bounds, because the parallel iterators neither
+    //   allocate nor deallocate the hash table.
+    unsafe impl<T: Sync, A: Allocator> Sync for HashSetRef<'_, T, A> {}
 
-    pub(super) struct HashMap<K, V, A: Allocator> {
-        pub(super) inner: RawTable<(K, V), A>,
+    /// Helper to implement HashMap::par_iter().
+    pub(super) struct HashMapRef<'data, K, V, A: Allocator> {
+        pub(super) inner: &'data RawTable<(K, V), A>,
     }
 
-    // TODO: Does the Allocator need to be Sync too?
-    unsafe impl<K: Send, V: Send, A: Allocator + Sync> Sync for HashMap<K, V, A> {}
+    // SAFETY:
+    // - This wrapper type is shared with worker threads, which extract references of type
+    //   `(&K, &V)`. This requires `(&K, &V): Send`. Therefore, this wrapper is Sync if and only
+    //   if `K` and `V` are Sync.
+    // - The allocator doesn't need any Send/Sync bounds, because the parallel iterators neither
+    //   allocate nor deallocate the hash table.
+    unsafe impl<K: Sync, V: Sync, A: Allocator> Sync for HashMapRef<'_, K, V, A> {}
 
+    /// Helper to implement HashMap::par_iter_mut().
     pub(super) struct HashMapRefMut<'data, K, V, A: Allocator> {
         pub(super) inner: &'data RawTable<(K, V), A>,
     }
 
-    // TODO: Does the Allocator need to be Sync too?
-    unsafe impl<'data, K: Sync, V: Send, A: Allocator + Sync> Sync for HashMapRefMut<'data, K, V, A> {}
+    // SAFETY:
+    // - This wrapper type is shared with worker threads, which extract references of type
+    //   `(&K, &mut V)`. This requires `(&K, &mut V): Send`. Therefore, this wrapper is Sync if
+    //   and only if `K` is Sync and `V` is Send.
+    // - The allocator doesn't need any Send/Sync bounds, because the parallel iterators neither
+    //   allocate nor deallocate the hash table.
+    unsafe impl<K: Sync, V: Send, A: Allocator> Sync for HashMapRefMut<'_, K, V, A> {}
+
+    /// Helper to implement HashSet::into_par_iter().
+    pub(super) struct HashSet<T, A: Allocator> {
+        pub(super) inner: RawTable<(T, ()), A>,
+    }
+
+    // SAFETY:
+    // - This wrapper type is shared with worker threads, which extract values of type `T`.
+    //   This requires `T: Send`. Therefore, this wrapper is Sync if and only if `T` is Send.
+    // - The allocator doesn't need any Send/Sync bounds, because the parallel iterators neither
+    //   allocate nor deallocate the hash table. Note that `HashSetSourceDescriptor::drop`
+    //   deallocates the hash table, but that's unrelated to this Sync bound (`drop()` takes a
+    //   `&mut self` input, so only Send bounds are relevant for that).
+    unsafe impl<T: Send, A: Allocator> Sync for HashSet<T, A> {}
+
+    /// Helper to implement HashMap::into_par_iter().
+    pub(super) struct HashMap<K, V, A: Allocator> {
+        pub(super) inner: RawTable<(K, V), A>,
+    }
+
+    // SAFETY:
+    // - This wrapper type is shared with worker threads, which extract values of type `(K, V)`.
+    //   This requires `(K, V): Send`. Therefore, this wrapper is Sync if and only if `K` and `V`
+    //   are Send.
+    // - The allocator doesn't need any Send/Sync bounds, because the parallel iterators neither
+    //   allocate nor deallocate the hash table. Note that `HashMapSourceDescriptor::drop`
+    //   deallocates the hash table, but that's unrelated to this Sync bound (`drop()` takes a
+    //   `&mut self` input, so only Send bounds are relevant for that).
+    unsafe impl<K: Send, V: Send, A: Allocator> Sync for HashMap<K, V, A> {}
 }
 
 #[cfg(test)]

From 0b102865a240c3a8c0dbd2e139904733c08d9fd4 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Wed, 14 Jan 2026 10:41:35 +0100
Subject: [PATCH 11/14] Directly deallocate the table without clearing the control bytes.

---
 src/external_trait_impls/paralight.rs | 14 ++++++++------
 src/raw/mod.rs                        | 28 +++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 4e872285b..12abeeabb 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -338,15 +338,16 @@ impl SourceDescriptor for HashSetSourceDescriptor {
 impl Drop for HashSetSourceDescriptor {
     fn drop(&mut self) {
+        // SAFETY:
-        // Paralight already dropped each missing* bucket via calls to cleanup_item_range(), so we
-        // can simply mark all buckets as cleared and let the RawTable destructor do the rest.
+        // Paralight already dropped each missing* bucket via calls to cleanup_item_range(), and
+        // fetch_item() read out every other full bucket, so we can simply deallocate the table
+        // directly, without dropping any element.
         //
         // *Some buckets may be missing because the iterator exited early (e.g. an item was found
         // via the find_any() adaptor) or unexpectedly due to a panic (e.g. in the closure passed
         // to the for_each() adaptor).
-        //
-        // TODO: Optimize this to simply deallocate without touching the control bytes.
-        self.table.inner.clear_no_drop();
+        unsafe {
+            self.table.inner.deallocate_cleared_table();
+        }
     }
 }
 
@@ -454,15 +455,16 @@ impl SourceDescriptor for HashMapSourceDescripto
 impl Drop for HashMapSourceDescriptor {
     fn drop(&mut self) {
+        // SAFETY:
-        // Paralight already dropped each missing* bucket via calls to cleanup_item_range(), so we
-        // can simply mark all buckets as cleared and let the RawTable destructor do the rest.
+        // Paralight already dropped each missing* bucket via calls to cleanup_item_range(), and
+        // fetch_item() read out every other full bucket, so we can simply deallocate the table
+        // directly, without dropping any element.
         //
         // *Some buckets may be missing because the iterator exited early (e.g. an item was found
         // via the find_any() adaptor) or unexpectedly due to a panic (e.g. in the closure passed
         // to the for_each() adaptor).
-        //
-        // TODO: Optimize this to simply deallocate without touching the control bytes.
-        self.table.inner.clear_no_drop();
+        unsafe {
+            self.table.inner.deallocate_cleared_table();
+        }
     }
 }
 
diff --git a/src/raw/mod.rs b/src/raw/mod.rs
index c9870bf4c..e8510c60d 100644
--- a/src/raw/mod.rs
+++ b/src/raw/mod.rs
@@ -1483,6 +1483,34 @@ impl RawTable {
         mem::forget(self);
         alloc
     }
+
+    /// Deallocates the table without dropping any element.
+    ///
+    /// # Safety
+    ///
+    /// It's the responsibility of the caller to ensure that all elements
+    /// have been separately read and dropped. However, the caller doesn't
+    /// need to clear the control bytes beforehand.
+    #[cfg(feature = "paralight")]
+    #[cfg_attr(feature = "inline-more", inline)]
+    pub(crate) unsafe fn deallocate_cleared_table(&mut self) {
+        if self.table.is_empty_singleton() {
+            return;
+        }
+
+        // Avoid `Option::unwrap_or_else` because it bloats LLVM IR.
+        let (layout, ctrl_offset) =
+            match Self::TABLE_LAYOUT.calculate_layout_for(self.table.buckets()) {
+                Some(lco) => lco,
+                None => unsafe { hint::unreachable_unchecked() },
+            };
+        let ptr =
+            unsafe { NonNull::new_unchecked(self.table.ctrl.as_ptr().sub(ctrl_offset).cast()) };
+        self.alloc.deallocate(ptr, layout);
+
+        // Reset the table, so that it can be dropped without a double free.
+        self.table = RawTableInner::NEW;
+    }
 }
 
 unsafe impl Send for RawTable

From cbd5d25466d4d21a46b847cc5cb74cfd13f07e9e Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Wed, 14 Jan 2026 10:55:07 +0100
Subject: [PATCH 12/14] Enable stricter lints for Paralight integration.

---
 src/external_trait_impls/paralight.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 12abeeabb..067de2b14 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -1,3 +1,9 @@
+#![forbid(
+    unsafe_op_in_unsafe_fn,
+    clippy::multiple_unsafe_ops_per_block,
+    clippy::undocumented_unsafe_blocks
+)]
+
 use crate::raw::Allocator;
 use crate::{HashMap, HashSet};
 use paralight::iter::{

From 8494044f0cc92d692b1d1fc3f57c326d83dc15a9 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Wed, 14 Jan 2026 11:03:39 +0100
Subject: [PATCH 13/14] Add paralight feature to README and CI.

---
 README.md   | 1 +
 ci/run.sh   | 2 +-
 ci/tools.sh | 4 ++--
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 1a9de1881..79185500a 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,7 @@ This crate has the following Cargo features:
 - `nightly`: Enables nightly-only features including: `#[may_dangle]`.
 - `serde`: Enables serde serialization support.
 - `rayon`: Enables rayon parallel iterator support.
+- `paralight`: Enables paralight parallel iterator support.
 - `equivalent`: Allows comparisons to be customized with the `Equivalent` trait.
   (enabled by default)
 - `raw-entry`: Enables access to the deprecated `RawEntry` API.
 - `inline-more`: Adds inline hints to most functions, improving run-time performance at the cost
diff --git a/ci/run.sh b/ci/run.sh
index 26899272e..e4caaa4c3 100644
--- a/ci/run.sh
+++ b/ci/run.sh
@@ -33,7 +33,7 @@ if [ "${NO_STD}" = "1" ]; then
     FEATURES="rustc-internal-api"
     OP="build"
 else
-    FEATURES="rustc-internal-api,serde,rayon"
+    FEATURES="rustc-internal-api,serde,rayon,paralight"
     OP="test"
 fi
diff --git a/ci/tools.sh b/ci/tools.sh
index ce3be2840..91b8a46e7 100644
--- a/ci/tools.sh
+++ b/ci/tools.sh
@@ -29,14 +29,14 @@ if rustc --version | grep --quiet nightly ; then
     export RUSTDOCFLAGS="-Zunstable-options --check"
 fi
 
-cargo doc --no-deps --features serde,rayon
+cargo doc --no-deps --features serde,rayon,paralight
 
 if retry rustup component add rustfmt ; then
     cargo fmt --all -- --check
 fi
 
 if retry rustup component add clippy ; then
-    cargo clippy --all --tests --features serde,rayon -- -D warnings
+    cargo clippy --all --tests --features serde,rayon,paralight -- -D warnings
 fi
 
 if command -v taplo ; then

From a386031bc7d7b9610f0d775d0c34e456d105ce36 Mon Sep 17 00:00:00 2001
From: Guillaume Endignoux
Date: Wed, 14 Jan 2026 11:11:25 +0100
Subject: [PATCH 14/14] Rebase on top of the main branch.

---
 src/external_trait_impls/paralight.rs | 10 +++++-----
 src/raw/mod.rs                        |  6 ++++--
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs
index 067de2b14..d108c087f 100644
--- a/src/external_trait_impls/paralight.rs
+++ b/src/external_trait_impls/paralight.rs
@@ -48,7 +48,7 @@ impl SourceCleanup for HashSetRefSourceDescriptor<'_, T,
     const NEEDS_CLEANUP: bool = false;
 
     fn len(&self) -> usize {
-        self.table.inner.buckets()
+        self.table.inner.num_buckets()
     }
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -121,7 +121,7 @@ impl SourceCleanup for HashMapRefSourceDescripto
     const NEEDS_CLEANUP: bool = false;
 
     fn len(&self) -> usize {
-        self.table.inner.buckets()
+        self.table.inner.num_buckets()
     }
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -195,7 +195,7 @@ impl SourceCleanup for HashMapRefMutSourceDescri
     const NEEDS_CLEANUP: bool = false;
 
     fn len(&self) -> usize {
-        self.table.inner.buckets()
+        self.table.inner.num_buckets()
     }
 
     unsafe fn cleanup_item_range(&self, _range: core::ops::Range<usize>) {
@@ -274,7 +274,7 @@ impl SourceCleanup for HashSetSourceDescriptor {
     const NEEDS_CLEANUP: bool = core::mem::needs_drop::<T>();
 
     fn len(&self) -> usize {
-        self.table.inner.buckets()
+        self.table.inner.num_buckets()
     }
 
     unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
@@ -392,7 +392,7 @@ impl SourceCleanup for HashMapSourceDescriptor
     const NEEDS_CLEANUP: bool = core::mem::needs_drop::<(K, V)>();
 
     fn len(&self) -> usize {
-        self.table.inner.buckets()
+        self.table.inner.num_buckets()
     }
 
     unsafe fn cleanup_item_range(&self, range: core::ops::Range<usize>) {
diff --git a/src/raw/mod.rs b/src/raw/mod.rs
index e8510c60d..62e6f7ca9 100644
--- a/src/raw/mod.rs
+++ b/src/raw/mod.rs
@@ -1500,13 +1500,15 @@ impl RawTable {
         // Avoid `Option::unwrap_or_else` because it bloats LLVM IR.
         let (layout, ctrl_offset) =
-            match Self::TABLE_LAYOUT.calculate_layout_for(self.table.buckets()) {
+            match Self::TABLE_LAYOUT.calculate_layout_for(self.table.num_buckets()) {
                 Some(lco) => lco,
                 None => unsafe { hint::unreachable_unchecked() },
             };
         let ptr =
             unsafe { NonNull::new_unchecked(self.table.ctrl.as_ptr().sub(ctrl_offset).cast()) };
-        self.alloc.deallocate(ptr, layout);
+        unsafe {
+            self.alloc.deallocate(ptr, layout);
+        }
 
         // Reset the table, so that it can be dropped without a double free.
         self.table = RawTableInner::NEW;
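
A usage sketch of the new mutable parallel iterator (illustrative only, not part of the
diffs above): the snippet doubles every value of a map in parallel. The `ThreadPoolBuilder`
fields and the `with_thread_pool()`/`for_each()` adaptors are assumed to match the paralight
API exercised by this series' test module; the function name `double_values` is hypothetical.

    use hashbrown::HashMap;
    use paralight::iter::{IntoParallelRefMutSource, ParallelIteratorExt, ParallelSourceExt};
    use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};

    fn double_values(map: &mut HashMap<u32, u64>) {
        // One worker per available core, with work-stealing scheduling.
        let mut thread_pool = ThreadPoolBuilder {
            num_threads: ThreadCount::AvailableParallelism,
            range_strategy: RangeStrategy::WorkStealing,
            cpu_pinning: CpuPinningPolicy::No,
        }
        .build();

        // par_iter_mut() hands out disjoint (&K, &mut V) pairs, one bucket at a
        // time, so the closure may mutate values without extra synchronization.
        map.par_iter_mut()
            .with_thread_pool(&mut thread_pool)
            .for_each(|(_key, value)| *value *= 2);
    }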