Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ categories = ["asynchronous", "concurrency", "development-tools"]
[features]
default = []
logging = []
events = []
controller = []

[package.metadata.docs.rs]
features = ["logging", "events"]
Expand All @@ -28,3 +28,4 @@ thiserror = "2.0.16"
anyhow = "1.0.100"
futures = "0.3.31"
rand = "0.9.2"
dashmap = "6.1.0"
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ taskvisor = "0.0.7"

> Optional features:
> - `logging` enables the built-in [`LogWriter`], (demo logger);
> - `controller` enables the slot-based [`Controller`] with admission policies.

```toml
[dependencies]
Expand Down Expand Up @@ -189,7 +190,8 @@ Check out the [examples](./examples) directory for:
- [retry_with_backoff.rs](examples/retry_with_backoff.rs): retry loop with exponential backoff and jitter
- [dynamic_add_remove.rs](examples/dynamic_add_remove.rs): add/remove tasks at runtime via API
- [custom_subscriber.rs](examples/custom_subscriber.rs): custom subscriber reacting to events
- [task_cancel.rs](examples/task_cancel.rs): task cancellation from outside
- [task_cancel.rs](examples/task_cancel.rs): task cancellation from outside
- [controller.rs](examples/controller.rs): examples with `controller` feature

```bash
# basic / retry / dynamic do not require extra features
Expand All @@ -198,6 +200,7 @@ cargo run --example retry_with_backoff
cargo run --example dynamic_add_remove
cargo run --example custom_subscriber
cargo run --example task_cancel --features logging
cargo run --example controller --features controller
```

## 🀝 Contributing
Expand Down
169 changes: 169 additions & 0 deletions examples/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//! # Example: Controller Demo
//! Visual demonstration of the controller’s slot-based admission model.
//!
//! ```text
//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
//! β”‚ application β”‚
//! β”‚ (user submits)β”‚
//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
//! submit(...)
//! β–Ό
//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
//! β”‚ controller β”‚
//! β”‚ (admission logic) β”‚
//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
//! publishes events
//! β–Ό
//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
//! β”‚ supervisor β”‚
//! β”‚ (orchestrator) β”‚
//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
//! spawns actors
//! β–Ό
//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
//! β”‚ task actor β”‚
//! β”‚ (run / retry loop)β”‚
//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
//! ```
//!
//! Demonstrates the controller's admission policies:
//! - Queue: tasks execute sequentially (same slot name)
//! - Replace: new submission cancels running task (latest wins)
//! - DropIfRunning: new submission ignored if slot busy
//!
//! Shows how controller events are published and can be observed via `LogWriter`.
//!
//! ## Run
//! ```bash
//! cargo run --example basic_controller --features "controller,logging"
//! ```
#[cfg(not(feature = "controller"))]
compile_error!(
"This example requires the 'controller' feature. Run with: --features controller,logging"
);

#[cfg(not(feature = "logging"))]
compile_error!(
"This example requires the 'logging' feature. Run with: --features controller,logging"
);

use std::{sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;

use taskvisor::LogWriter;
use taskvisor::{
BackoffPolicy, Config, RestartPolicy, Supervisor, TaskError, TaskFn, TaskRef, TaskSpec,
};
use taskvisor::{ControllerConfig, ControllerSpec};

/// Creates a task that simulates work.
fn make_worker(name: &'static str, work_ms: u64) -> TaskSpec {
let task_name = name.to_string();

let task: TaskRef = TaskFn::arc(name, move |ctx: CancellationToken| {
let name = task_name.clone();

async move {
println!("[{name}] started (work={work_ms}ms)");
let start = tokio::time::Instant::now();

loop {
if ctx.is_cancelled() {
println!("[{name}] cancelled after {:?}", start.elapsed());
return Ok::<(), TaskError>(());
}
if start.elapsed().as_millis() >= work_ms as u128 {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}

println!("[{name}] completed in {:?}", start.elapsed());
Ok(())
}
});
TaskSpec::new(
task,
RestartPolicy::Never,
BackoffPolicy::default(),
Some(Duration::from_secs(10)),
)
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
println!("=== Controller Demo ===\n");

let sup = Supervisor::builder(Config::default())
.with_subscribers(vec![Arc::new(LogWriter::default())])
.with_controller(ControllerConfig {
queue_capacity: 100,
slot_capacity: 10,
})
.build();

// Spawn supervisor in background
let sup_clone = Arc::clone(&sup);
let sup_task = tokio::spawn(async move {
if let Err(e) = sup_clone.run(vec![]).await {
eprintln!("Supervisor error: {e}");
}
});
tokio::time::sleep(Duration::from_millis(200)).await;

// === Demo 1: Queue Policy (sequential execution in the SAME slot) ===
println!("\n--- Demo 1: Queue Policy (sequential execution) ---");
sup.submit(ControllerSpec::queue(make_worker("build", 500)))
.await?;
sup.submit(ControllerSpec::queue(make_worker("build", 500)))
.await?;
sup.submit(ControllerSpec::queue(make_worker("build", 500)))
.await?;
tokio::time::sleep(Duration::from_millis(2000)).await;

// === Demo 2: Replace Policy (cancel current and start latest on terminal) ===
println!("\n--- Demo 2: Replace Policy (cancel and restart) ---");
sup.submit(ControllerSpec::replace(make_worker("deploy", 1000)))
.await?;
tokio::time::sleep(Duration::from_millis(300)).await;

sup.submit(ControllerSpec::replace(make_worker("deploy", 1000)))
.await?;
tokio::time::sleep(Duration::from_millis(300)).await;

sup.submit(ControllerSpec::replace(make_worker("deploy", 1000)))
.await?;
tokio::time::sleep(Duration::from_millis(1500)).await;

// === Demo 3: DropIfRunning Policy (ignore if busy) ===
println!("\n--- Demo 3: DropIfRunning Policy (ignore if busy) ---");
sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800)))
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;

// These will be ignored (same slot is running)
sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800)))
.await?;
sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800)))
.await?;
tokio::time::sleep(Duration::from_millis(1000)).await;

// After slot is idle, this will execute
sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800)))
.await?;
tokio::time::sleep(Duration::from_millis(1000)).await;

// === Demo 4: Queue Full (rejections) ===
println!("\n--- Demo 4: Queue Full (rejection) ---");
// Same slot "batch": slot_capacity = 10 β†’ 2 submissions will be rejected.
for _ in 1..=12 {
sup.submit(ControllerSpec::queue(make_worker("batch", 200)))
.await?;
}
tokio::time::sleep(Duration::from_secs(4)).await;

println!("\n--- Demo Complete ---");
drop(sup);
let _ = sup_task.await;
Ok(())
}
5 changes: 1 addition & 4 deletions examples/custom_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ impl Subscribe for ConsoleSubscriber {
}

// === Ignored ===
EventKind::SubscriberPanicked
| EventKind::SubscriberOverflow
| EventKind::TaskAddRequested
| EventKind::TaskRemoveRequested => {}
_ => {}
}
}

Expand Down
42 changes: 42 additions & 0 deletions src/controller/admission.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//! # Per-task admission policy
//!
//! Controller treats tasks as **slots** identified by `name`.
//! At any given time **one** task may run in a slot.
//! When a new request for the same slot arrives, the admission policy decides what to do.
//!
//! ## Variants
//! - `DropIfRunning`: If the slot is already running, **ignore** the new request.
//! - `Replace`: **Stop** the running task (cancel/remove) and start the new one.
//! - `Queue`: **Enqueue** the new request (FIFO).
//!
//! ## Invariants
//! - Tasks within the same slot never run in parallel (use dynamic names if you need parallel execution).
//! - Queued requests are executed strictly in submission order.

/// Policy controlling how new submissions are handled when a slot is busy.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ControllerAdmission {
/// Skip task if already running.
///
/// Use when:
/// - You only care about the latest state
/// - Redundant work should be avoided
/// - Example: periodic health checks
DropIfRunning,

/// Stop current task and start new one immediately.
///
/// Use when:
/// - New request invalidates old one
/// - Priority to latest submission
/// - Example: deployment pipeline (new commit cancels old build)
Replace,

/// Queue the task (FIFO order).
///
/// Use when:
/// - All submissions must execute
/// - Order matters
/// - Example: sequential processing pipeline
Queue,
}
20 changes: 20 additions & 0 deletions src/controller/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/// Configuration for the controller.
#[derive(Clone, Debug)]
pub struct ControllerConfig {
/// Capacity of the submission queue.
///
/// When full, `submit()` will wait and `try_submit()` will return `Full` error.
pub queue_capacity: usize,

/// Capacity of the slots.
pub slot_capacity: usize,
}

impl Default for ControllerConfig {
fn default() -> Self {
Self {
queue_capacity: 1024,
slot_capacity: 100,
}
}
}
Loading