diff --git a/Cargo.toml b/Cargo.toml index 68cbc43..caa4669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ categories = ["asynchronous", "concurrency", "development-tools"] [features] default = [] logging = [] -events = [] +controller = [] [package.metadata.docs.rs] features = ["logging", "events"] @@ -28,3 +28,4 @@ thiserror = "2.0.16" anyhow = "1.0.100" futures = "0.3.31" rand = "0.9.2" +dashmap = "6.1.0" diff --git a/README.md b/README.md index 8a699a0..800061f 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ taskvisor = "0.0.7" > Optional features: > - `logging` enables the built-in [`LogWriter`], (demo logger); +> - `controller` enables the slot-based [`Controller`] with admission policies. ```toml [dependencies] @@ -189,7 +190,8 @@ Check out the [examples](./examples) directory for: - [retry_with_backoff.rs](examples/retry_with_backoff.rs): retry loop with exponential backoff and jitter - [dynamic_add_remove.rs](examples/dynamic_add_remove.rs): add/remove tasks at runtime via API - [custom_subscriber.rs](examples/custom_subscriber.rs): custom subscriber reacting to events -- [task_cancel.rs](examples/task_cancel.rs): task cancellation from outside +- [task_cancel.rs](examples/task_cancel.rs): task cancellation from outside +- [controller.rs](examples/controller.rs): examples with `controller` feature ```bash # basic / retry / dynamic do not require extra features @@ -198,6 +200,7 @@ cargo run --example retry_with_backoff cargo run --example dynamic_add_remove cargo run --example custom_subscriber cargo run --example task_cancel --features logging +cargo run --example controller --features controller ``` ## 🀝 Contributing diff --git a/examples/controller.rs b/examples/controller.rs new file mode 100644 index 0000000..1109780 --- /dev/null +++ b/examples/controller.rs @@ -0,0 +1,169 @@ +//! # Example: Controller Demo +//! Visual demonstration of the controller’s slot-based admission model. +//! +//! ```text +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ application β”‚ +//! β”‚ (user submits)β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ +//! submit(...) +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ controller β”‚ +//! β”‚ (admission logic) β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! publishes events +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ supervisor β”‚ +//! β”‚ (orchestrator) β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! spawns actors +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ task actor β”‚ +//! β”‚ (run / retry loop)β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! ``` +//! +//! Demonstrates the controller's admission policies: +//! - Queue: tasks execute sequentially (same slot name) +//! - Replace: new submission cancels running task (latest wins) +//! - DropIfRunning: new submission ignored if slot busy +//! +//! Shows how controller events are published and can be observed via `LogWriter`. +//! +//! ## Run +//! ```bash +//! cargo run --example basic_controller --features "controller,logging" +//! ``` +#[cfg(not(feature = "controller"))] +compile_error!( + "This example requires the 'controller' feature. Run with: --features controller,logging" +); + +#[cfg(not(feature = "logging"))] +compile_error!( + "This example requires the 'logging' feature. Run with: --features controller,logging" +); + +use std::{sync::Arc, time::Duration}; +use tokio_util::sync::CancellationToken; + +use taskvisor::LogWriter; +use taskvisor::{ + BackoffPolicy, Config, RestartPolicy, Supervisor, TaskError, TaskFn, TaskRef, TaskSpec, +}; +use taskvisor::{ControllerConfig, ControllerSpec}; + +/// Creates a task that simulates work. +fn make_worker(name: &'static str, work_ms: u64) -> TaskSpec { + let task_name = name.to_string(); + + let task: TaskRef = TaskFn::arc(name, move |ctx: CancellationToken| { + let name = task_name.clone(); + + async move { + println!("[{name}] started (work={work_ms}ms)"); + let start = tokio::time::Instant::now(); + + loop { + if ctx.is_cancelled() { + println!("[{name}] cancelled after {:?}", start.elapsed()); + return Ok::<(), TaskError>(()); + } + if start.elapsed().as_millis() >= work_ms as u128 { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + println!("[{name}] completed in {:?}", start.elapsed()); + Ok(()) + } + }); + TaskSpec::new( + task, + RestartPolicy::Never, + BackoffPolicy::default(), + Some(Duration::from_secs(10)), + ) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + println!("=== Controller Demo ===\n"); + + let sup = Supervisor::builder(Config::default()) + .with_subscribers(vec![Arc::new(LogWriter::default())]) + .with_controller(ControllerConfig { + queue_capacity: 100, + slot_capacity: 10, + }) + .build(); + + // Spawn supervisor in background + let sup_clone = Arc::clone(&sup); + let sup_task = tokio::spawn(async move { + if let Err(e) = sup_clone.run(vec![]).await { + eprintln!("Supervisor error: {e}"); + } + }); + tokio::time::sleep(Duration::from_millis(200)).await; + + // === Demo 1: Queue Policy (sequential execution in the SAME slot) === + println!("\n--- Demo 1: Queue Policy (sequential execution) ---"); + sup.submit(ControllerSpec::queue(make_worker("build", 500))) + .await?; + sup.submit(ControllerSpec::queue(make_worker("build", 500))) + .await?; + sup.submit(ControllerSpec::queue(make_worker("build", 500))) + .await?; + tokio::time::sleep(Duration::from_millis(2000)).await; + + // === Demo 2: Replace Policy (cancel current and start latest on terminal) === + println!("\n--- Demo 2: Replace Policy (cancel and restart) ---"); + sup.submit(ControllerSpec::replace(make_worker("deploy", 1000))) + .await?; + tokio::time::sleep(Duration::from_millis(300)).await; + + sup.submit(ControllerSpec::replace(make_worker("deploy", 1000))) + .await?; + tokio::time::sleep(Duration::from_millis(300)).await; + + sup.submit(ControllerSpec::replace(make_worker("deploy", 1000))) + .await?; + tokio::time::sleep(Duration::from_millis(1500)).await; + + // === Demo 3: DropIfRunning Policy (ignore if busy) === + println!("\n--- Demo 3: DropIfRunning Policy (ignore if busy) ---"); + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + tokio::time::sleep(Duration::from_millis(200)).await; + + // These will be ignored (same slot is running) + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + tokio::time::sleep(Duration::from_millis(1000)).await; + + // After slot is idle, this will execute + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + tokio::time::sleep(Duration::from_millis(1000)).await; + + // === Demo 4: Queue Full (rejections) === + println!("\n--- Demo 4: Queue Full (rejection) ---"); + // Same slot "batch": slot_capacity = 10 β†’ 2 submissions will be rejected. + for _ in 1..=12 { + sup.submit(ControllerSpec::queue(make_worker("batch", 200))) + .await?; + } + tokio::time::sleep(Duration::from_secs(4)).await; + + println!("\n--- Demo Complete ---"); + drop(sup); + let _ = sup_task.await; + Ok(()) +} diff --git a/examples/custom_subscriber.rs b/examples/custom_subscriber.rs index fa3a6f0..d0fd569 100644 --- a/examples/custom_subscriber.rs +++ b/examples/custom_subscriber.rs @@ -133,10 +133,7 @@ impl Subscribe for ConsoleSubscriber { } // === Ignored === - EventKind::SubscriberPanicked - | EventKind::SubscriberOverflow - | EventKind::TaskAddRequested - | EventKind::TaskRemoveRequested => {} + _ => {} } } diff --git a/src/controller/admission.rs b/src/controller/admission.rs new file mode 100644 index 0000000..8f84f65 --- /dev/null +++ b/src/controller/admission.rs @@ -0,0 +1,42 @@ +//! # Per-task admission policy +//! +//! Controller treats tasks as **slots** identified by `name`. +//! At any given time **one** task may run in a slot. +//! When a new request for the same slot arrives, the admission policy decides what to do. +//! +//! ## Variants +//! - `DropIfRunning`: If the slot is already running, **ignore** the new request. +//! - `Replace`: **Stop** the running task (cancel/remove) and start the new one. +//! - `Queue`: **Enqueue** the new request (FIFO). +//! +//! ## Invariants +//! - Tasks within the same slot never run in parallel (use dynamic names if you need parallel execution). +//! - Queued requests are executed strictly in submission order. + +/// Policy controlling how new submissions are handled when a slot is busy. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ControllerAdmission { + /// Skip task if already running. + /// + /// Use when: + /// - You only care about the latest state + /// - Redundant work should be avoided + /// - Example: periodic health checks + DropIfRunning, + + /// Stop current task and start new one immediately. + /// + /// Use when: + /// - New request invalidates old one + /// - Priority to latest submission + /// - Example: deployment pipeline (new commit cancels old build) + Replace, + + /// Queue the task (FIFO order). + /// + /// Use when: + /// - All submissions must execute + /// - Order matters + /// - Example: sequential processing pipeline + Queue, +} diff --git a/src/controller/config.rs b/src/controller/config.rs new file mode 100644 index 0000000..d1c8482 --- /dev/null +++ b/src/controller/config.rs @@ -0,0 +1,20 @@ +/// Configuration for the controller. +#[derive(Clone, Debug)] +pub struct ControllerConfig { + /// Capacity of the submission queue. + /// + /// When full, `submit()` will wait and `try_submit()` will return `Full` error. + pub queue_capacity: usize, + + /// Capacity of the slots. + pub slot_capacity: usize, +} + +impl Default for ControllerConfig { + fn default() -> Self { + Self { + queue_capacity: 1024, + slot_capacity: 100, + } + } +} diff --git a/src/controller/core.rs b/src/controller/core.rs new file mode 100644 index 0000000..c304b94 --- /dev/null +++ b/src/controller/core.rs @@ -0,0 +1,320 @@ +use std::{ + sync::{Arc, Weak}, + time::Instant, +}; + +use dashmap::DashMap; +use tokio::sync::{Mutex, RwLock, mpsc}; +use tokio_util::sync::CancellationToken; + +use crate::{ + Supervisor, + events::{Bus, Event, EventKind}, +}; + +use super::{ + admission::ControllerAdmission, + config::ControllerConfig, + error::ControllerError, + slot::{SlotState, SlotStatus}, + spec::ControllerSpec, +}; + +/// Handle for submitting tasks to the controller. +#[derive(Clone)] +pub struct ControllerHandle { + tx: mpsc::Sender, +} + +impl ControllerHandle { + /// Submit a task (async, waits if queue is full). + pub async fn submit(&self, spec: ControllerSpec) -> Result<(), ControllerError> { + self.tx + .send(spec) + .await + .map_err(|_| ControllerError::Closed) + } + + /// Try to submit without blocking (fails if queue full). + pub fn try_submit(&self, spec: ControllerSpec) -> Result<(), ControllerError> { + self.tx.try_send(spec).map_err(|e| match e { + mpsc::error::TrySendError::Full(_) => ControllerError::Full, + mpsc::error::TrySendError::Closed(_) => ControllerError::Closed, + }) + } +} + +/// Controller manages task slots with admission policies. +/// +/// Each slot can run at most one task at a time. New submissions are handled +/// according to the configured admission policy. +pub struct Controller { + config: ControllerConfig, + supervisor: Weak, + bus: Bus, + + // Concurrent slots map. + slots: DashMap>>, + + // Submission queue. + tx: mpsc::Sender, + rx: RwLock>>, +} + +impl Controller { + /// Creates a new controller (must call .run() to start). + pub fn new(config: ControllerConfig, supervisor: &Arc, bus: Bus) -> Arc { + let (tx, rx) = mpsc::channel(config.queue_capacity); + + Arc::new(Self { + config, + supervisor: Arc::downgrade(supervisor), + bus, + slots: DashMap::new(), + tx, + rx: RwLock::new(Some(rx)), + }) + } + + /// Returns a handle for submitting tasks. + pub fn handle(&self) -> ControllerHandle { + ControllerHandle { + tx: self.tx.clone(), + } + } + + /// Starts the controller loop (spawns in background). + pub fn run(self: Arc, token: CancellationToken) { + tokio::spawn(async move { + if let Err(e) = self.run_inner(token).await { + eprintln!("[controller] error: {e:?}"); + } + }); + } + + async fn run_inner(&self, token: CancellationToken) -> anyhow::Result<()> { + let mut rx = self + .rx + .write() + .await + .take() + .ok_or_else(|| anyhow::anyhow!("controller already running"))?; + + let mut bus_rx = self.bus.subscribe(); + loop { + tokio::select! { + _ = token.cancelled() => break, + + Some(spec) = rx.recv() => { + self.handle_submission(spec).await; + } + Ok(event) = bus_rx.recv() => { + self.handle_event(event).await; + } + } + } + Ok(()) + } + + /// Handles a new task submission. + /// + /// Replace semantics (latest-wins): + /// - Replace does NOT grow the queue; it replaces the head (the immediate successor). + /// - If slot was Running β†’ transition to Terminating and request remove once. + /// - If already Terminating β†’ do NOT call remove again; just replace the head. + /// - The next task actually starts in `on_task_finished` upon terminal event. + async fn handle_submission(&self, spec: ControllerSpec) { + let Some(sup) = self.supervisor.upgrade() else { + return; + }; + + let slot_name = spec.slot_name().to_string(); + let admission = spec.admission; + let task_spec = spec.task_spec; + + let slot_arc = self.get_or_create_slot(&slot_name); + let mut slot = slot_arc.lock().await; + + match (&slot.status, admission) { + (SlotStatus::Idle, _) => { + if let Err(e) = sup.add_task(task_spec) { + self.bus.publish( + Event::new(EventKind::ControllerRejected) + .with_task(slot_name.as_str()) + .with_reason(format!("add_failed: {}", e)), + ); + return; + } + slot.status = SlotStatus::Running { + started_at: Instant::now(), + }; + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!("admission={:?} status=running", admission)), + ); + } + (SlotStatus::Running { .. }, ControllerAdmission::Replace) => { + Self::replace_head_or_push(&mut slot, task_spec); + slot.status = SlotStatus::Terminating { + cancelled_at: Instant::now(), + }; + self.bus.publish( + Event::new(EventKind::ControllerSlotTransition) + .with_task(slot_name.as_str()) + .with_reason("runningβ†’terminating (replace)"), + ); + if let Err(e) = sup.remove_task(&slot_name) { + self.bus.publish( + Event::new(EventKind::ControllerRejected) + .with_task(slot_name.as_str()) + .with_reason(format!("remove_failed: {}", e)), + ); + } + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!("admission=Replace depth={}", slot.queue.len())), + ); + } + (SlotStatus::Running { .. }, ControllerAdmission::Queue) => { + if self.reject_if_full(&slot_name, slot.queue.len()) { + return; + } + slot.queue.push_back(task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!("admission=Queue depth={}", slot.queue.len())), + ); + } + (SlotStatus::Terminating { .. }, ControllerAdmission::Replace) => { + Self::replace_head_or_push(&mut slot, task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!( + "admission=Replace status=terminating depth={}", + slot.queue.len() + )), + ); + } + (SlotStatus::Terminating { .. }, ControllerAdmission::Queue) => { + if self.reject_if_full(&slot_name, slot.queue.len()) { + return; + } + slot.queue.push_back(task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!( + "admission=Queue status=terminating depth={}", + slot.queue.len() + )), + ); + } + (SlotStatus::Running { .. }, ControllerAdmission::DropIfRunning) => {} + _ => { + if self.reject_if_full(&slot_name, slot.queue.len()) { + return; + } + slot.queue.push_back(task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!( + "admission={:?} depth={}", + admission, + slot.queue.len() + )), + ); + } + } + } + + /// Handles bus events (terminal only). + async fn handle_event(&self, event: Arc) { + if event.kind == EventKind::TaskRemoved { + self.on_task_finished(&event).await + } + } + + /// Handles `TaskRemoved` for a task; frees the slot and optionally starts the queued next. + /// + /// Rationale: `ActorExhausted`/`ActorDead` may arrive before the actor is + /// deregistered in the Supervisor’s Registry. Starting the next task before + /// `TaskRemoved` can race and produce `task_already_exists`. By gating on + /// `TaskRemoved` we avoid double-adds and registry races. + /// TODO: maybe add `slot_name` with task_name as default. + async fn on_task_finished(&self, event: &Event) { + let Some(task_name) = event.task.as_deref() else { + return; + }; + let Some(sup) = self.supervisor.upgrade() else { + return; + }; + let Some(slot_arc) = self.slots.get(task_name).map(|e| e.clone()) else { + return; + }; + let mut slot = slot_arc.lock().await; + + if matches!(slot.status, SlotStatus::Idle) { + return; + } + slot.status = SlotStatus::Idle; + + if let Some(next_spec) = slot.queue.pop_front() { + if let Err(e) = sup.add_task(next_spec) { + eprintln!("[controller] failed to start next task '{task_name}': {e}"); + return; + } + slot.status = SlotStatus::Running { + started_at: Instant::now(), + }; + + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(task_name) + .with_reason(format!("started_from_queue depth={}", slot.queue.len())), + ); + } + if matches!(slot.status, SlotStatus::Idle) && slot.queue.is_empty() { + drop(slot); + self.slots.remove(task_name); + } + } + + #[inline] + fn get_or_create_slot(&self, slot_name: &str) -> Arc> { + self.slots + .entry(slot_name.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(SlotState::new()))) + .clone() + } + + #[inline] + fn reject_if_full(&self, slot_name: &str, slot_len: usize) -> bool { + if slot_len >= self.config.slot_capacity { + self.bus.publish( + Event::new(EventKind::ControllerRejected) + .with_task(slot_name) + .with_reason(format!( + "queue_full: {}/{}", + slot_len, self.config.slot_capacity + )), + ); + true + } else { + false + } + } + + #[inline] + fn replace_head_or_push(slot: &mut SlotState, task_spec: crate::TaskSpec) { + if let Some(head) = slot.queue.front_mut() { + *head = task_spec; + } else { + slot.queue.push_front(task_spec); + } + } +} diff --git a/src/controller/error.rs b/src/controller/error.rs new file mode 100644 index 0000000..8a95b5e --- /dev/null +++ b/src/controller/error.rs @@ -0,0 +1,17 @@ +use thiserror::Error; + +/// Error returned by [`Supervisor::submit`](crate::Supervisor::submit). +#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)] +pub enum ControllerError { + /// Controller is not configured (builder didn't call `with_controller`). + #[error("controller not configured")] + NotConfigured, + + /// Submission queue is full (try again later or use async `submit`). + #[error("submission queue full")] + Full, + + /// Controller channel is closed (controller task died). + #[error("controller channel closed")] + Closed, +} diff --git a/src/controller/mod.rs b/src/controller/mod.rs new file mode 100644 index 0000000..9c18b70 --- /dev/null +++ b/src/controller/mod.rs @@ -0,0 +1,124 @@ +//! # Controller: slot-based admission & start-on-`TaskRemoved` +//! +//! The **controller** is a thin policy layer (wrapper) over `TaskSpec` submission. +//! It enforces per-slot admission rules (`Queue`, `Replace`, `DropIfRunning`) and +//! starts the *next* task **only after** a terminal `TaskRemoved` event is observed +//! on the runtime bus. In this model, **one slot = one task name** +//! (`slot key = TaskSpec::name()`). +//! +//! --- +//! +//! ## Role in Taskvisor +//! +//! The controller accepts `ControllerSpec`, unwraps its `TaskSpec`, applies admission +//! rules (including Replace latest-wins), and delegates `add/remove` to the Supervisor. +//! It subscribes to the Bus and advances slots strictly on `TaskRemoved` to avoid +//! registry races and double starts. +//! +//! ```text +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ Application code β”‚ +//! β”‚ sup.submit(ControllerSpec) β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! β”‚ +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ Controller β”‚ (per-slot admission FSM) +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! unwraps & applies policy +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ TaskSpec β”‚ (from ControllerSpec) +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! add/remove +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ Supervisor β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! publishes runtime events +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” subscribes +//! β”‚ Bus │◄────────────── Controller +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! β–Ό +//! Task Actors +//! ``` +//! +//! **Flow summary** +//! 1. Application calls `sup.submit(ControllerSpec)`. +//! 2. Controller unwraps `TaskSpec` and applies `Admission` rules. +//! 3. If accepted, controller calls `Supervisor::add_task(TaskSpec)` or requests remove. +//! 4. On terminal `TaskRemoved` (via Bus), the slot becomes `Idle` and the next queued +//! task (if any) is started. +//! +//! --- +//! +//! ## Per-slot model +//! +//! - **Key**: `task_spec.name()` (exactly one running task per slot). +//! - **State**: `Idle | Running | Terminating`, with a FIFO queue per slot. +//! - **Replace (latest-wins)**: does **not** grow the queue; it **replaces the head** +//! (the immediate successor). The next task actually starts **only after `TaskRemoved`**. +//! +//! ```text +//! State machine (per task name) +//! +//! Idle ── submit β†’ start β†’ Running ── submit(Replace) β†’ Terminating +//! β–² (Supervisor.add) β”‚ +//! └─────────── on TaskRemoved β—„β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! β”œβ”€ if queue empty β†’ Idle +//! └─ if queue head exists β†’ start head β†’ Running +//! ``` +//! +//! Queue operations +//! - **Queue**: push to tail (FIFO). +//! - **Replace**: replace head (latest-wins), not increasing depth. +//! +//! ```text +//! Queue example (head on the left): +//! [ next, a, b ] --(Replace c)--> [ c, a, b ] --(Replace d)--> [ d, a, b ] +//! (head replaced) (head replaced) +//! ``` +//! +//! --- +//! +//! ## Why gate on `TaskRemoved` +//! +//! `ActorExhausted/ActorDead` may arrive **before** full deregistration of the actor. +//! Starting the next task on those signals can race the registry and cause +//! `task_already_exists`. Gating advancement on **`TaskRemoved`** prevents double-adds. +//! +//! --- +//! +//! ## Concurrency & scalability +//! +//! - `DashMap>>` avoids global map lock contention. +//! - Per-slot `Mutex` ensures updates to one slot don’t block others. +//! +//! --- +//! +//! ## Public surface +//! +//! - Configure via `Supervisor::builder(..).with_controller(ControllerConfig)`. +//! - Submit via `sup.submit(ControllerSpec::{queue, replace, drop_if_running}(...))`. +//! - Policies: [`ControllerAdmission`] = `Queue | Replace | DropIfRunning`. +//! - Controller emits `ControllerSubmitted`, `ControllerRejected`, and +//! `ControllerSlotTransition` (feature `"controller"`); readable with `"logging"`’s `LogWriter`. +//! +//! ## Invariants +//! - At most **one** running task per slot. +//! - Slots advance to next task **only** after `TaskRemoved`. +//! - `Replace` is **latest-wins** (head replace); `Queue` is FIFO. + +mod admission; +mod config; +mod core; +mod error; +mod slot; +mod spec; + +pub use admission::ControllerAdmission; +pub use config::ControllerConfig; +pub use core::Controller; +pub use error::ControllerError; +pub use spec::ControllerSpec; diff --git a/src/controller/slot.rs b/src/controller/slot.rs new file mode 100644 index 0000000..c17aee7 --- /dev/null +++ b/src/controller/slot.rs @@ -0,0 +1,41 @@ +use std::{collections::VecDeque, time::Instant}; + +use crate::TaskSpec; + +/// State of a single task slot. +pub(super) struct SlotState { + /// Current status (idle, running, or terminating). + pub status: SlotStatus, + + /// Queue of pending tasks (FIFO order). + pub queue: VecDeque, +} + +/// Status of a task slot. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum SlotStatus { + /// No task running, ready to accept new submissions. + Idle, + + /// Task currently running. + Running { + /// When the task started. + started_at: Instant, + }, + + /// Task is being cancelled (waiting for TaskRemoved event). + Terminating { + /// When cancellation was requested. + cancelled_at: Instant, + }, +} + +impl SlotState { + /// Creates a new idle slot. + pub fn new() -> Self { + Self { + status: SlotStatus::Idle, + queue: VecDeque::new(), + } + } +} diff --git a/src/controller/spec.rs b/src/controller/spec.rs new file mode 100644 index 0000000..76ad7d3 --- /dev/null +++ b/src/controller/spec.rs @@ -0,0 +1,49 @@ +use super::admission::ControllerAdmission; +use crate::TaskSpec; + +/// Request to submit a task to the controller. +/// +/// Combines a slot name, admission policy, and the actual task specification. +#[derive(Clone)] +pub struct ControllerSpec { + /// Admission policy. + pub admission: ControllerAdmission, + + /// Task specification to run. + pub task_spec: TaskSpec, +} + +impl ControllerSpec { + /// Creates a new controller submission specification. + /// + /// ## Parameters + /// - `admission`: How to handle concurrent submissions + /// - `task_spec`: The task to execute + pub fn new(admission: ControllerAdmission, task_spec: TaskSpec) -> Self { + Self { + admission, + task_spec, + } + } + + /// Returns the slot name. + pub fn slot_name(&self) -> &str { + self.task_spec.name() + } + + /// Convenience: Queue admission. + #[inline] + pub fn queue(task_spec: TaskSpec) -> Self { + Self::new(ControllerAdmission::Queue, task_spec) + } + + #[inline] + pub fn replace(task_spec: TaskSpec) -> Self { + Self::new(ControllerAdmission::Replace, task_spec) + } + + #[inline] + pub fn drop_if_running(task_spec: TaskSpec) -> Self { + Self::new(ControllerAdmission::DropIfRunning, task_spec) + } +} diff --git a/src/core/builder.rs b/src/core/builder.rs new file mode 100644 index 0000000..1e1509a --- /dev/null +++ b/src/core/builder.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; +use tokio::sync; + +use crate::{ + config::Config, + events::Bus, + subscribers::{Subscribe, SubscriberSet}, +}; + +use super::{alive::AliveTracker, registry::Registry, supervisor::Supervisor}; + +/// Builder for constructing a Supervisor with optional features. +pub struct SupervisorBuilder { + cfg: Config, + subscribers: Vec>, + + #[cfg(feature = "controller")] + controller_config: Option, +} + +impl SupervisorBuilder { + /// Creates a new builder with the given configuration. + pub fn new(cfg: Config) -> Self { + Self { + cfg, + subscribers: Vec::new(), + + #[cfg(feature = "controller")] + controller_config: None, + } + } + + /// Sets event subscribers for observability. + /// + /// Subscribers receive runtime events (task lifecycle, failures, etc.) + /// through dedicated workers with bounded queues. + pub fn with_subscribers(mut self, subscribers: Vec>) -> Self { + self.subscribers = subscribers; + self + } + + /// Enables the controller with the given configuration. + /// + /// The controller manages task slots with admission policies + /// (Queue, Replace, DropIfRunning). + /// + /// Requires the `controller` feature flag. + #[cfg(feature = "controller")] + #[cfg_attr(docsrs, doc(cfg(feature = "controller")))] + pub fn with_controller(mut self, config: crate::controller::ControllerConfig) -> Self { + self.controller_config = Some(config); + self + } + + /// Builds and returns the Supervisor instance. + /// + /// This consumes the builder and initializes all runtime components: + /// - Event bus for broadcasting + /// - Registry for task lifecycle management + /// - Subscriber workers + /// - Optional controller (if configured) + // src/core/builder.rs + pub fn build(self) -> Arc { + let bus = Bus::new(self.cfg.bus_capacity_clamped()); + let subs = Arc::new(SubscriberSet::new(self.subscribers, bus.clone())); + let runtime_token = tokio_util::sync::CancellationToken::new(); + + let semaphore = self + .cfg + .concurrency_limit() + .map(sync::Semaphore::new) + .map(Arc::new); + + let registry = Registry::new(bus.clone(), runtime_token.clone(), semaphore); + let alive = Arc::new(AliveTracker::new()); + + let sup = Arc::new(Supervisor::new_internal( + self.cfg, + bus.clone(), + subs, + alive, + registry, + runtime_token.clone(), + )); + + #[cfg(feature = "controller")] + if let Some(ctrl_cfg) = self.controller_config { + let controller = crate::controller::Controller::new(ctrl_cfg, &sup, bus.clone()); + + let _ = sup.controller.set(Arc::clone(&controller)); + controller.run(runtime_token.clone()); + } + sup + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs index 924b7b4..feb2836 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -101,6 +101,7 @@ mod actor; mod alive; +mod builder; mod registry; mod runner; mod shutdown; diff --git a/src/core/supervisor.rs b/src/core/supervisor.rs index 92e6338..3edc512 100644 --- a/src/core/supervisor.rs +++ b/src/core/supervisor.rs @@ -91,10 +91,10 @@ //! ``` use std::{sync::Arc, time::Duration}; -use tokio::{sync::Semaphore, sync::broadcast, time::timeout}; +use tokio::{sync::OnceCell, sync::broadcast, time::timeout}; use tokio_util::sync::CancellationToken; -use crate::core::{alive::AliveTracker, registry::Registry}; +use crate::core::{alive::AliveTracker, builder::SupervisorBuilder, registry::Registry}; use crate::{ config::Config, error::RuntimeError, @@ -118,28 +118,53 @@ pub struct Supervisor { alive: Arc, registry: Arc, runtime_token: CancellationToken, + + #[cfg(feature = "controller")] + pub(super) controller: OnceCell>, } impl Supervisor { - /// Creates a new supervisor with the given config and subscribers (maybe empty). - pub fn new(cfg: Config, subscribers: Vec>) -> Self { - let bus = Bus::new(cfg.bus_capacity_clamped()); - let subs = Arc::new(SubscriberSet::new(subscribers, bus.clone())); - let runtime_token = CancellationToken::new(); - let semaphore = Self::build_semaphore_static(&cfg); - - let registry = Registry::new(bus.clone(), runtime_token.clone(), semaphore); - + /// Internal constructor used by builder (not public API). + pub(crate) fn new_internal( + cfg: Config, + bus: Bus, + subs: Arc, + alive: Arc, + registry: Arc, + runtime_token: CancellationToken, + ) -> Self { Self { cfg, bus, subs, - alive: Arc::new(AliveTracker::new()), + alive, registry, runtime_token, + + #[cfg(feature = "controller")] + controller: OnceCell::new(), } } + /// Creates a new supervisor with the given config and subscribers (maybe empty). + pub fn new(cfg: Config, subscribers: Vec>) -> Arc { + Self::builder(cfg).with_subscribers(subscribers).build() + } + + /// Creates a builder for constructing a Supervisor. + /// + /// ## Example + /// ```rust + /// use taskvisor::{Config, Supervisor}; + /// + /// let sup = Supervisor::builder(Config::default()) + /// .with_subscribers(vec![]) + /// .build(); + /// ``` + pub fn builder(cfg: Config) -> SupervisorBuilder { + SupervisorBuilder::new(cfg) + } + /// Adds a new task to the supervisor at runtime. /// /// Publishes `TaskAddRequested` with the spec to the bus. @@ -229,6 +254,43 @@ impl Supervisor { self.wait_task_removed(&mut rx, name, wait_for).await } + /// Submits a task to the controller (if enabled). + /// + /// Returns an error if: + /// - Controller feature is disabled + /// - Controller is not configured + /// - Submission queue is full (use `try_submit` for non-blocking) + /// + /// Requires the `controller` feature flag. + #[cfg(feature = "controller")] + #[cfg_attr(docsrs, doc(cfg(feature = "controller")))] + pub async fn submit( + &self, + spec: crate::controller::ControllerSpec, + ) -> Result<(), crate::controller::ControllerError> { + match self.controller.get() { + Some(ctrl) => ctrl.handle().submit(spec).await, + None => Err(crate::controller::ControllerError::NotConfigured), + } + } + + /// Tries to submit a task without blocking. + /// + /// Returns `TrySubmitError::Full` if the queue is full. + /// + /// Requires the `controller` feature flag. + #[cfg(feature = "controller")] + #[cfg_attr(docsrs, doc(cfg(feature = "controller")))] + pub fn try_submit( + &self, + spec: crate::controller::ControllerSpec, + ) -> Result<(), crate::controller::ControllerError> { + match self.controller.get() { + Some(ctrl) => ctrl.handle().try_submit(spec), + None => Err(crate::controller::ControllerError::NotConfigured), + } + } + /// Listens to the internal event bus and fans out each received [`Event`] to all active subscribers. /// /// The listener also updates the [`AliveTracker`] before fan-out to keep @@ -268,13 +330,6 @@ impl Supervisor { }); } - /// Builds global semaphore for concurrency limiting. - /// - /// Returns `None` if unlimited. - fn build_semaphore_static(cfg: &Config) -> Option> { - cfg.concurrency_limit().map(Semaphore::new).map(Arc::new) - } - /// Waits for either shutdown signal or natural completion of all tasks. async fn drive_shutdown(&self) -> Result<(), RuntimeError> { tokio::select! { diff --git a/src/events/event.rs b/src/events/event.rs index 4b27e02..1c1ccd7 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -191,6 +191,30 @@ pub enum EventKind { /// - `at`: wall-clock timestamp /// - `seq`: global sequence ActorDead, + + #[cfg(feature = "controller")] + /// Controller submission rejected (queue full, add failed, etc). + /// + /// Sets: + /// - `task`: slot name + /// - `reason`: rejection reason ("queue_full", "add_failed: ...", etc) + ControllerRejected, + + #[cfg(feature = "controller")] + /// Task submitted successfully to controller slot. + /// + /// Sets: + /// - `task`: slot name + /// - `reason`: "admission={admission} status={status} depth={N}" + ControllerSubmitted, + + #[cfg(feature = "controller")] + /// Slot transitioned state (Running β†’ Terminating, etc). + /// + /// Sets: + /// - `task`: slot name + /// - `reason`: "runningβ†’terminating" (Replace), "terminatingβ†’idle", etc + ControllerSlotTransition, } /// Reason for schedule the next run/backoff. diff --git a/src/lib.rs b/src/lib.rs index d36f6c1..8250c46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,7 @@ //! β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ //! β–Ό β–Ό β–Ό //! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -//! β”‚ Supervisor (runtime orchestrator) β”‚ +//! β”‚ Supervisor (runtime orchestrator) β”‚ //! β”‚ - Bus (broadcast events) β”‚ //! β”‚ - AliveTracker (tracks task state with sequence numbers) β”‚ //! β”‚ - SubscriberSet (fans out to user subscribers) β”‚ @@ -104,7 +104,7 @@ //! //! ## Optional features //! - `logging`: exports a simple built-in [`LogWriter`] _(demo/reference only)_. -//! - `events`: exports [`Event`] and [`EventKind`] for advanced integrations. +//! - `controller`: exposes controller runtime and admission types. //! //! ## Example //! ```rust @@ -122,13 +122,16 @@ //! #[cfg(feature = "logging")] //! let subs: Vec> = { //! use taskvisor::LogWriter; -//! vec![Arc::new(LogWriter)] +//! vec![Arc::new(LogWriter::default())] //! }; //! #[cfg(not(feature = "logging"))] //! let subs: Vec> = Vec::new(); //! //! // Create supervisor -//! let sup = Supervisor::new(cfg, subs); +//! let sup = taskvisor::Supervisor::builder(cfg) +//! .with_subscribers(subs) +//! // .with_controller(ControllerConfig { ... }) // if feature = "controller" +//! .build(); //! //! // Define a simple task that runs once and exits //! let hello: TaskRef = TaskFn::arc("hello", |ctx: CancellationToken| async move { @@ -168,6 +171,13 @@ pub use policies::{BackoffPolicy, JitterPolicy, RestartPolicy}; pub use subscribers::{Subscribe, SubscriberSet}; pub use tasks::{Task, TaskFn, TaskRef, TaskSpec}; +// Optional: expose a controller object. +// Enable with: `--features controller` +#[cfg(feature = "controller")] +mod controller; +#[cfg(feature = "controller")] +pub use controller::{ControllerAdmission, ControllerConfig, ControllerError, ControllerSpec}; + // Optional: expose a simple built-in logger subscriber (demo/reference). // Enable with: `--features logging` #[cfg(feature = "logging")] diff --git a/src/subscribers/embedded/log.rs b/src/subscribers/embedded/log.rs index d0db55d..a3d7660 100644 --- a/src/subscribers/embedded/log.rs +++ b/src/subscribers/embedded/log.rs @@ -180,6 +180,36 @@ impl Subscribe for LogWriter { or(e.reason.as_deref(), "fatal") ); } + + #[cfg(feature = "controller")] + EventKind::ControllerRejected => { + println!( + "{} [controller-rejected] slot={} reason=\"{}\"", + seq, + or(e.task.as_deref(), "none"), + or(e.reason.as_deref(), "unknown") + ); + } + + #[cfg(feature = "controller")] + EventKind::ControllerSubmitted => { + println!( + "{} [controller-submitted] slot={} {}", + seq, + or(e.task.as_deref(), "none"), + or(e.reason.as_deref(), "") + ); + } + + #[cfg(feature = "controller")] + EventKind::ControllerSlotTransition => { + println!( + "{} [controller-transition] slot={} transition=\"{}\"", + seq, + or(e.task.as_deref(), "none"), + or(e.reason.as_deref(), "unknown") + ); + } } }