Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ members = [
]

[workspace.package]
version = "0.1.10"
version = "0.1.11"
edition = "2024"
rust-version = "1.90"
authors = ["Theo M. Bulut <vertexclique@gmail.com>"]
Expand All @@ -20,12 +20,12 @@ readme = "README.md"
exclude = ["model_chk"]

[workspace.dependencies]
kovan = { version = "0.1.10", path = "kovan" }
kovan-channel = { version = "0.1.10", path = "kovan-channel" }
kovan-map = { version = "0.1.10", path = "kovan-map" }
kovan-mvcc = { version = "0.1.10", path = "kovan-mvcc" }
kovan-stm = { version = "0.1.10", path = "kovan-stm" }
kovan-queue = { version = "0.1.10", path = "kovan-queue" }
kovan = { version = "0.1.11", path = "kovan" }
kovan-channel = { version = "0.1.11", path = "kovan-channel" }
kovan-map = { version = "0.1.11", path = "kovan-map" }
kovan-mvcc = { version = "0.1.11", path = "kovan-mvcc" }
kovan-stm = { version = "0.1.11", path = "kovan-stm" }
kovan-queue = { version = "0.1.11", path = "kovan-queue" }

[workspace.metadata.docs.rs]
rustdoc-args = ["-C", "target-feature=+cmpxchg16b"]
Expand Down
58 changes: 55 additions & 3 deletions kovan-channel/src/flavors/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crossbeam_utils::Backoff;
use std::collections::LinkedList;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

struct Channel<T: 'static> {
sender: unbounded::Sender<T>,
Expand All @@ -12,6 +13,10 @@ struct Channel<T: 'static> {
len: AtomicUsize,
senders: Mutex<LinkedList<Arc<dyn Notifier>>>,
receivers: Mutex<LinkedList<Arc<dyn Notifier>>>,
/// Number of live bounded Sender handles.
sender_count: AtomicUsize,
/// Set when all bounded senders are dropped.
disconnected: std::sync::atomic::AtomicBool,
}

/// The sending half of a bounded channel.
Expand All @@ -21,12 +26,28 @@ pub struct Sender<T: 'static> {

impl<T: 'static> Clone for Sender<T> {
fn clone(&self) -> Self {
self.inner.sender_count.fetch_add(1, Ordering::Relaxed);
Self {
inner: self.inner.clone(),
}
}
}

impl<T: 'static> Drop for Sender<T> {
fn drop(&mut self) {
if self.inner.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.inner.disconnected.store(true, Ordering::Release);
// Wake all blocked receivers
{
let mut receivers = self.inner.receivers.lock().unwrap();
while let Some(signal) = receivers.pop_front() {
signal.notify();
}
}
}
}
}

unsafe impl<T: 'static + Send> Send for Sender<T> {}
unsafe impl<T: 'static + Send> Sync for Sender<T> {}

Expand Down Expand Up @@ -56,6 +77,8 @@ impl<T: 'static> Channel<T> {
len: AtomicUsize::new(0),
senders: Mutex::new(LinkedList::new()),
receivers: Mutex::new(LinkedList::new()),
sender_count: AtomicUsize::new(1),
disconnected: std::sync::atomic::AtomicBool::new(false),
}
}
}
Expand Down Expand Up @@ -242,30 +265,59 @@ impl<T: 'static> Receiver<T> {
}
}

/// Returns `true` if all senders have been dropped.
pub fn is_disconnected(&self) -> bool {
self.inner.disconnected.load(Ordering::Acquire)
}

/// Receives a message from the channel, blocking if empty.
///
/// Returns `None` when the channel is empty **and** all senders have been dropped.
pub fn recv(&self) -> Option<T> {
if let Some(msg) = self.try_recv() {
return Some(msg);
}

if self.is_disconnected() {
return self.try_recv();
}

loop {
let signal = Arc::new(Signal::new());
// Register signal
{
let mut receivers = self.inner.receivers.lock().unwrap();
receivers.push_back(signal.clone());
}

// Re-check
if let Some(msg) = self.try_recv() {
return Some(msg);
}

signal.wait();
if self.is_disconnected() {
return self.try_recv();
}

// Wait, checking the queue on every wakeup.
loop {
if signal.is_notified() {
break;
}
thread::park();
if let Some(msg) = self.try_recv() {
return Some(msg);
}
if self.is_disconnected() {
return self.try_recv();
}
}

if let Some(msg) = self.try_recv() {
return Some(msg);
}

if self.is_disconnected() {
return None;
}
}
}

Expand Down
Loading