Deadqueue is a dead simple async queue with back pressure support.
This crate provides three implementations:
-
Unlimited (
deadqueue::unlimited::Queue)- Based on
crossbeam_queue::SegQueue - Has unlimitied capacity and no back pressure on push
- Enabled via the
unlimitedfeature in yourCargo.toml
- Based on
-
Resizable (
deadqueue::resizable::Queue)- Based on
deadqueue::unlimited::Queue - Has limited capacity with back pressure on push
- Supports resizing
- Enabled via the
resizablefeature in yourCargo.toml
- Based on
-
Limited (
deadqueue::limited::Queue)- Based on
crossbeam_queue::ArrayQueue - Has limit capacity with back pressure on push
- Does not support resizing
- Enabled via the
limitedfeature in yourCargo.toml
- Based on
| Feature | Description | Extra dependencies | Default |
|---|---|---|---|
unlimited |
Enable unlimited queue implementation | – | yes |
resizable |
Enable resizable queue implementation | deadqueue/unlimited |
yes |
limited |
Enable limited queue implementation | – | yes |
use std::sync::Arc;
use tokio::time::{sleep, Duration};
const TASK_COUNT: usize = 1000;
const WORKER_COUNT: usize = 10;
type TaskQueue = deadqueue::limited::Queue<usize>;
#[tokio::main]
async fn main() {
let queue = Arc::new(TaskQueue::new(TASK_COUNT));
for i in 0..TASK_COUNT {
queue.try_push(i).unwrap();
}
for worker in 0..WORKER_COUNT {
let queue = queue.clone();
tokio::spawn(async move {
loop {
let task = queue.pop().await;
println!("worker[{}] processing task[{}] ...", worker, task);
}
});
}
println!("Waiting for workers to finish...");
queue.wait_empty().await;
println!("All tasks done. :-)");
}Deadqueue is by no means the only queue implementation available. It does things a little different and provides features that other implementations are lacking:
-
Resizable queue. Usually you have to pick between
limitedandunlimitedqueues. This crate features aresizableQueue which can be resized as needed. This is probably a big unique selling point of this crate. -
Introspection support. The methods
.len(),.capacity()and.available()provide access the current state of the queue. -
Fair scheduling. Tasks calling
popwill receive items in a first-come-first-serve fashion. This is mainly due to the use oftokio::sync::Semaphorewhich is fair by nature. -
One struct, not two. The channels of
tokio,async_stdandfutures-intrusivesplit the queue in two structs (SenderandReceiver) which makes the usage sligthly more complicated. -
Bring your own
Arc. Since there is no separation betweenSenderandReceiverthere is also no need for an internalArc. (All implementations that split the channel into aSenderandReceiverneed some kind ofArcinternally.) -
Fully concurrent access. No need to wrap the
Receiverpart in aMutex. All methods support concurrent accesswithout the need for an additional synchronization primitive. -
Support for
try__methods. The methodstry_pushandtry_popcan be used to access the queue from non-blocking synchroneous code. -
Support for detecting when the queue becomes empty or full, using the
wait_empty,subscribe_empty,wait_fullandsubscribe_fullmethods.
| Crate | Limitations | Documentation |
|---|---|---|
tokio |
No resizable queue. No introspection support. Synchronization of Receiver needed. |
tokio::sync::mpsc::channel, tokio::sync::mpsc::unbounded_channel |
async-std |
No resizable or unlimited queue. No introspection support. No try_send or try_recv methods. |
async_std::sync::channel |
futures |
No resizable queue. No introspection support. | futures::channel::mpsc::channel, futures::channel::mpsc::unbounded |
Licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.