From bc7a5059dbdd4906dbd4814cf2733ff7e85a2e94 Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 16:17:12 +0300 Subject: [PATCH 01/11] draft --- Cargo.toml | 2 +- examples/basic_controller.rs | 123 ++++++++++++++++++++++ src/controller/admission.rs | 42 ++++++++ src/controller/config.rs | 16 +++ src/controller/core.rs | 195 +++++++++++++++++++++++++++++++++++ src/controller/error.rs | 17 +++ src/controller/mod.rs | 13 +++ src/controller/slot.rs | 41 ++++++++ src/controller/spec.rs | 42 ++++++++ src/core/builder.rs | 118 +++++++++++++++++++++ src/core/mod.rs | 1 + src/core/supervisor.rs | 93 +++++++++++++---- src/lib.rs | 2 +- 13 files changed, 684 insertions(+), 21 deletions(-) create mode 100644 examples/basic_controller.rs create mode 100644 src/controller/admission.rs create mode 100644 src/controller/config.rs create mode 100644 src/controller/core.rs create mode 100644 src/controller/error.rs create mode 100644 src/controller/mod.rs create mode 100644 src/controller/slot.rs create mode 100644 src/controller/spec.rs create mode 100644 src/core/builder.rs diff --git a/Cargo.toml b/Cargo.toml index 68cbc43..3142f5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ categories = ["asynchronous", "concurrency", "development-tools"] [features] default = [] logging = [] -events = [] +controller = [] [package.metadata.docs.rs] features = ["logging", "events"] diff --git a/examples/basic_controller.rs b/examples/basic_controller.rs new file mode 100644 index 0000000..e0f13ed --- /dev/null +++ b/examples/basic_controller.rs @@ -0,0 +1,123 @@ +//! # Example: Controller Pipeline + +use std::{sync::Arc, time::Duration}; +use tokio_util::sync::CancellationToken; + +use taskvisor::{ + BackoffPolicy, Config, RestartPolicy, Supervisor, TaskError, TaskFn, TaskRef, TaskSpec, +}; + +#[cfg(feature = "controller")] +use taskvisor::{controller::Admission, controller::ControllerConfig, controller::ControllerSpec}; + +/// One-shot task factory. 
+fn make_task(name: &'static str, work_ms: u64) -> TaskSpec { + let n = name.to_string(); + let task: TaskRef = TaskFn::arc(name, move |_ctx: CancellationToken| { + let n = n.clone(); + async move { + println!("[{n}] start (work {work_ms}ms)"); + tokio::time::sleep(Duration::from_millis(work_ms)).await; + println!("[{n}] done"); + Ok::<(), TaskError>(()) + } + }); + + TaskSpec::new( + task, + RestartPolicy::Never, + BackoffPolicy::default(), + Some(Duration::from_secs(5)), + ) +} + +#[cfg(feature = "controller")] +fn make_producer() -> TaskSpec { + let task: TaskRef = TaskFn::arc("producer", |ctx: CancellationToken| async move { + // Give controller time to start + tokio::time::sleep(Duration::from_millis(200)).await; + + // Get supervisor handle (need to pass via closure) + // For now, we'll submit via a shared channel or just demo the API + + println!("[producer] submitting build (Queue)"); + // sup.submit(ControllerSpec::new("build", Admission::Queue, make_task("build", 800))).await?; + + println!("[producer] done"); + Ok::<(), TaskError>(()) + }); + + TaskSpec::new( + task, + RestartPolicy::Never, + BackoffPolicy::default(), + Some(Duration::from_secs(5)), + ) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + #[cfg(not(feature = "controller"))] + { + eprintln!("This example requires the 'controller' feature."); + eprintln!("Run with: cargo run --example basic_controller --features controller"); + return Ok(()); + } + + #[cfg(feature = "controller")] + { + let mut cfg = Config::default(); + cfg.grace = Duration::from_secs(5); + + let sup = Supervisor::builder(cfg) + .with_subscribers(vec![]) + .with_controller(ControllerConfig::default()) + .build(); + + // Spawn supervisor in background + let sup_clone = Arc::clone(&sup); + let sup_task = tokio::spawn(async move { + if let Err(e) = sup_clone.run(vec![]).await { + eprintln!("[supervisor] error: {e}"); + } + }); + + // Submit tasks via controller + 
tokio::time::sleep(Duration::from_millis(100)).await; + + println!("[main] submitting build (Queue)"); + sup.submit(ControllerSpec::new( + "build", + Admission::Queue, + make_task("build", 800), + )) + .await?; + + println!("[main] submitting test (DropIfRunning)"); + sup.submit(ControllerSpec::new( + "test", + Admission::DropIfRunning, + make_task("test", 600), + )) + .await?; + + println!("[main] submitting deploy (Replace)"); + sup.submit(ControllerSpec::new( + "deploy", + Admission::Replace, + make_task("deploy", 1000), + )) + .await?; + + // Wait a bit for tasks to complete + tokio::time::sleep(Duration::from_secs(3)).await; + + // Shutdown + drop(sup); + let _ = sup_task.await; + + println!("[main] finished"); + } + + Ok(()) +} diff --git a/src/controller/admission.rs b/src/controller/admission.rs new file mode 100644 index 0000000..5c9da22 --- /dev/null +++ b/src/controller/admission.rs @@ -0,0 +1,42 @@ +//! # Per-task admission policy +//! +//! Controller treats tasks as **slots** identified by `name`. +//! At any given time **one** task may run in a slot. +//! When a new request for the same slot arrives, the admission policy decides what to do. +//! +//! ## Variants +//! - `DropIfRunning`: If the slot is already running, **ignore** the new request. +//! - `Replace`: **Stop** the running task (cancel/remove) and start the new one. +//! - `Queue`: **Enqueue** the new request (FIFO). +//! +//! ## Invariants +//! - Tasks within the same slot never run in parallel (use dynamic names if you need parallel execution). +//! - Queued requests are executed strictly in submission order. + +/// Policy controlling how new submissions are handled when a slot is busy. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Admission { + /// Skip task if already running. 
+ /// + /// Use when: + /// - You only care about the latest state + /// - Redundant work should be avoided + /// - Example: periodic health checks + DropIfRunning, + + /// Stop current task and start new one immediately. + /// + /// Use when: + /// - New request invalidates old one + /// - Priority to latest submission + /// - Example: deployment pipeline (new commit cancels old build) + Replace, + + /// Queue the task (FIFO order). + /// + /// Use when: + /// - All submissions must execute + /// - Order matters + /// - Example: sequential processing pipeline + Queue, +} diff --git a/src/controller/config.rs b/src/controller/config.rs new file mode 100644 index 0000000..316330f --- /dev/null +++ b/src/controller/config.rs @@ -0,0 +1,16 @@ +/// Configuration for the controller. +#[derive(Clone, Debug)] +pub struct ControllerConfig { + /// Capacity of the submission queue. + /// + /// When full, `submit()` will wait and `try_submit()` will return `Full` error. + pub queue_capacity: usize, +} + +impl Default for ControllerConfig { + fn default() -> Self { + Self { + queue_capacity: 1024, + } + } +} diff --git a/src/controller/core.rs b/src/controller/core.rs new file mode 100644 index 0000000..de0606c --- /dev/null +++ b/src/controller/core.rs @@ -0,0 +1,195 @@ +use std::collections::HashMap; +use std::sync::{Arc, Weak}; +use std::time::Instant; + +use tokio::sync::{RwLock, mpsc}; +use tokio_util::sync::CancellationToken; + +use crate::{ + Supervisor, + events::{Bus, Event, EventKind}, +}; + +use super::{ + admission::Admission, + config::ControllerConfig, + error::SubmitError, + slot::{SlotState, SlotStatus}, + spec::ControllerSpec, +}; + +/// Handle for submitting tasks to the controller. +#[derive(Clone)] +pub struct ControllerHandle { + tx: mpsc::Sender, +} + +impl ControllerHandle { + /// Submit a task (async, waits if queue is full). 
+ pub async fn submit(&self, spec: ControllerSpec) -> Result<(), SubmitError> { + self.tx.send(spec).await.map_err(|_| SubmitError::Closed) + } + + /// Try to submit without blocking (fails if queue full). + pub fn try_submit(&self, spec: ControllerSpec) -> Result<(), SubmitError> { + self.tx.try_send(spec).map_err(|e| match e { + mpsc::error::TrySendError::Full(_) => SubmitError::Full, + mpsc::error::TrySendError::Closed(_) => SubmitError::Closed, + }) + } +} + +/// Controller manages task slots with admission policies. +/// +/// Each slot can run at most one task at a time. New submissions are handled +/// according to the configured admission policy. +pub struct Controller { + #[allow(dead_code)] + config: ControllerConfig, + supervisor: Weak, + bus: Bus, + + // Internal state (protected by RwLock). + slots: RwLock>, + + // Submission queue. + tx: mpsc::Sender, + rx: RwLock>>, +} + +impl Controller { + /// Creates a new controller (must call .run() to start). + pub fn new(config: ControllerConfig, supervisor: &Arc, bus: Bus) -> Arc { + let (tx, rx) = mpsc::channel(config.queue_capacity); + + Arc::new(Self { + config, + supervisor: Arc::downgrade(supervisor), + bus, + slots: RwLock::new(HashMap::new()), + tx, + rx: RwLock::new(Some(rx)), + }) + } + + /// Returns a handle for submitting tasks. + pub fn handle(&self) -> ControllerHandle { + ControllerHandle { + tx: self.tx.clone(), + } + } + + /// Starts the controller loop (spawns in background). + pub fn run(self: Arc, token: CancellationToken) { + tokio::spawn(async move { + if let Err(e) = self.run_inner(token).await { + eprintln!("[controller] error: {e:?}"); + } + }); + } + + async fn run_inner(&self, token: CancellationToken) -> anyhow::Result<()> { + let mut rx = self + .rx + .write() + .await + .take() + .ok_or_else(|| anyhow::anyhow!("controller already running"))?; + + let mut bus_rx = self.bus.subscribe(); + + loop { + tokio::select! 
{ + _ = token.cancelled() => break, + + Some(spec) = rx.recv() => { + self.handle_submission(spec).await; + } + Ok(event) = bus_rx.recv() => { + self.handle_event(event).await; + } + } + } + + Ok(()) + } + + /// Handles a new task submission. + async fn handle_submission(&self, spec: ControllerSpec) { + let Some(sup) = self.supervisor.upgrade() else { + return; + }; + + let slot_name = spec.slot_name().to_string(); + let admission = spec.admission; + let task_spec = spec.task_spec; + + let mut slots = self.slots.write().await; + let slot = slots + .entry(slot_name.clone()) + .or_insert_with(SlotState::new); + + match (&slot.status, admission) { + (SlotStatus::Idle, _) => { + if let Err(e) = sup.add_task(task_spec) { + eprintln!("[controller] failed to add task '{slot_name}': {e}"); + return; + } + slot.status = SlotStatus::Running { + started_at: Instant::now(), + }; + } + (SlotStatus::Running { .. }, Admission::Replace) => { + slot.queue.push_front(task_spec); + slot.status = SlotStatus::Terminating { + cancelled_at: Instant::now(), + }; + if let Err(e) = sup.remove_task(&slot_name) { + eprintln!("[controller] failed to cancel task '{slot_name}': {e}"); + } + } + (SlotStatus::Running { .. }, Admission::Queue) => { + slot.queue.push_back(task_spec); + } + (SlotStatus::Terminating { .. }, _) => { + slot.queue.push_back(task_spec); + } + (SlotStatus::Running { .. }, Admission::DropIfRunning) => {} + } + } + + /// Handles bus events (TaskStopped, TaskRemoved). + async fn handle_event(&self, event: Arc) { + match event.kind { + EventKind::TaskStopped | EventKind::TaskRemoved => { + self.on_task_finished(&event).await; + } + _ => {} + } + } + + /// Called when a task finishes (stopped or removed). 
+ async fn on_task_finished(&self, event: &Event) { + let Some(task_name) = event.task.as_deref() else { + return; + }; + let Some(sup) = self.supervisor.upgrade() else { + return; + }; + let mut slots = self.slots.write().await; + let Some(slot) = slots.get_mut(task_name) else { + return; + }; + + slot.status = SlotStatus::Idle; + if let Some(next_spec) = slot.queue.pop_front() { + if let Err(e) = sup.add_task(next_spec) { + eprintln!("[controller] failed to start next task '{task_name}': {e}"); + return; + } + slot.status = SlotStatus::Running { + started_at: Instant::now(), + }; + } + } +} diff --git a/src/controller/error.rs b/src/controller/error.rs new file mode 100644 index 0000000..369e9fd --- /dev/null +++ b/src/controller/error.rs @@ -0,0 +1,17 @@ +use thiserror::Error; + +/// Error returned by [`Supervisor::submit`](crate::Supervisor::submit). +#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)] +pub enum SubmitError { + /// Controller is not configured (builder didn't call `with_controller`). + #[error("controller not configured")] + NotConfigured, + + /// Submission queue is full (try again later or use async `submit`). + #[error("submission queue full")] + Full, + + /// Controller channel is closed (controller task died). 
+ #[error("controller channel closed")] + Closed, +} diff --git a/src/controller/mod.rs b/src/controller/mod.rs new file mode 100644 index 0000000..7f21b68 --- /dev/null +++ b/src/controller/mod.rs @@ -0,0 +1,13 @@ +pub mod admission; +pub mod config; +pub mod error; +pub mod spec; + +mod core; +mod slot; + +pub use admission::Admission; +pub use config::ControllerConfig; +pub use core::{Controller, ControllerHandle}; +pub use error::SubmitError; +pub use spec::ControllerSpec; diff --git a/src/controller/slot.rs b/src/controller/slot.rs new file mode 100644 index 0000000..c17aee7 --- /dev/null +++ b/src/controller/slot.rs @@ -0,0 +1,41 @@ +use std::{collections::VecDeque, time::Instant}; + +use crate::TaskSpec; + +/// State of a single task slot. +pub(super) struct SlotState { + /// Current status (idle, running, or terminating). + pub status: SlotStatus, + + /// Queue of pending tasks (FIFO order). + pub queue: VecDeque, +} + +/// Status of a task slot. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum SlotStatus { + /// No task running, ready to accept new submissions. + Idle, + + /// Task currently running. + Running { + /// When the task started. + started_at: Instant, + }, + + /// Task is being cancelled (waiting for TaskRemoved event). + Terminating { + /// When cancellation was requested. + cancelled_at: Instant, + }, +} + +impl SlotState { + /// Creates a new idle slot. + pub fn new() -> Self { + Self { + status: SlotStatus::Idle, + queue: VecDeque::new(), + } + } +} diff --git a/src/controller/spec.rs b/src/controller/spec.rs new file mode 100644 index 0000000..3b6bc4b --- /dev/null +++ b/src/controller/spec.rs @@ -0,0 +1,42 @@ +use super::admission::Admission; +use crate::TaskSpec; + +/// Request to submit a task to the controller. +/// +/// Combines a slot name, admission policy, and the actual task specification. +#[derive(Clone)] +pub struct ControllerSpec { + /// Logical slot name (tasks with same name share a slot). 
+ slot_name: String, + + /// Admission policy. + pub admission: Admission, + + /// Task specification to run. + pub task_spec: TaskSpec, +} + +impl ControllerSpec { + /// Creates a new controller submission specification. + /// + /// ## Parameters + /// - `slot_name`: Logical slot identifier (tasks with same name share a slot) + /// - `admission`: How to handle concurrent submissions + /// - `task_spec`: The task to execute + /// + /// ## Note + /// `slot_name` is independent of `task_spec.name()` — you can run the same + /// task in different slots by using different slot names. + pub fn new(slot_name: impl Into, admission: Admission, task_spec: TaskSpec) -> Self { + Self { + slot_name: slot_name.into(), + admission, + task_spec, + } + } + + /// Returns the slot name. + pub fn slot_name(&self) -> &str { + &self.slot_name + } +} diff --git a/src/core/builder.rs b/src/core/builder.rs new file mode 100644 index 0000000..9a30158 --- /dev/null +++ b/src/core/builder.rs @@ -0,0 +1,118 @@ +use std::sync::Arc; + +use crate::{ + config::Config, + events::Bus, + subscribers::{Subscribe, SubscriberSet}, +}; + +use super::{alive::AliveTracker, registry::Registry, supervisor::Supervisor}; + +/// Builder for constructing a Supervisor with optional features. 
+/// +/// ## Example +/// ```rust +/// use taskvisor::{Config, Supervisor}; +/// +/// // Without controller +/// let sup = Supervisor::builder(Config::default()) +/// .with_subscribers(vec![]) +/// .build(); +/// +/// // With controller (requires "controller" feature) +/// #[cfg(feature = "controller")] +/// { +/// use taskvisor::controller::ControllerConfig; +/// +/// let sup = Supervisor::builder(Config::default()) +/// .with_subscribers(vec![]) +/// .with_controller(ControllerConfig::default()) +/// .build(); +/// } +/// ``` +pub struct SupervisorBuilder { + cfg: Config, + subscribers: Vec>, + + #[cfg(feature = "controller")] + controller_config: Option, +} + +impl SupervisorBuilder { + /// Creates a new builder with the given configuration. + pub fn new(cfg: Config) -> Self { + Self { + cfg, + subscribers: Vec::new(), + + #[cfg(feature = "controller")] + controller_config: None, + } + } + + /// Sets event subscribers for observability. + /// + /// Subscribers receive runtime events (task lifecycle, failures, etc.) + /// through dedicated workers with bounded queues. + pub fn with_subscribers(mut self, subscribers: Vec>) -> Self { + self.subscribers = subscribers; + self + } + + /// Enables the controller with the given configuration. + /// + /// The controller manages task slots with admission policies + /// (Queue, Replace, DropIfRunning). + /// + /// Requires the `controller` feature flag. + #[cfg(feature = "controller")] + #[cfg_attr(docsrs, doc(cfg(feature = "controller")))] + pub fn with_controller(mut self, config: crate::controller::ControllerConfig) -> Self { + self.controller_config = Some(config); + self + } + + /// Builds and returns the Supervisor instance. 
+ /// + /// This consumes the builder and initializes all runtime components: + /// - Event bus for broadcasting + /// - Registry for task lifecycle management + /// - Subscriber workers + /// - Optional controller (if configured) + // src/core/builder.rs + pub fn build(self) -> Arc { + let bus = Bus::new(self.cfg.bus_capacity_clamped()); + let subs = Arc::new(SubscriberSet::new(self.subscribers, bus.clone())); + let runtime_token = tokio_util::sync::CancellationToken::new(); + + let semaphore = self + .cfg + .concurrency_limit() + .map(tokio::sync::Semaphore::new) + .map(Arc::new); + + let registry = Registry::new(bus.clone(), runtime_token.clone(), semaphore); + + let alive = Arc::new(AliveTracker::new()); + + // Use internal constructor + let sup = Arc::new(Supervisor::new_internal( + self.cfg, + bus.clone(), + subs, + alive, + registry, + runtime_token.clone(), + )); + + // Initialize controller if configured + #[cfg(feature = "controller")] + if let Some(ctrl_cfg) = self.controller_config { + let controller = crate::controller::Controller::new(ctrl_cfg, &sup, bus); + + let _ = sup.controller.set(controller); + } + + sup + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs index 924b7b4..feb2836 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -101,6 +101,7 @@ mod actor; mod alive; +mod builder; mod registry; mod runner; mod shutdown; diff --git a/src/core/supervisor.rs b/src/core/supervisor.rs index 92e6338..83ce021 100644 --- a/src/core/supervisor.rs +++ b/src/core/supervisor.rs @@ -91,10 +91,10 @@ //! 
``` use std::{sync::Arc, time::Duration}; -use tokio::{sync::Semaphore, sync::broadcast, time::timeout}; +use tokio::{sync::OnceCell, sync::broadcast, time::timeout}; use tokio_util::sync::CancellationToken; -use crate::core::{alive::AliveTracker, registry::Registry}; +use crate::core::{alive::AliveTracker, builder::SupervisorBuilder, registry::Registry}; use crate::{ config::Config, error::RuntimeError, @@ -118,28 +118,53 @@ pub struct Supervisor { alive: Arc, registry: Arc, runtime_token: CancellationToken, + + #[cfg(feature = "controller")] + pub(super) controller: OnceCell>, } impl Supervisor { - /// Creates a new supervisor with the given config and subscribers (maybe empty). - pub fn new(cfg: Config, subscribers: Vec>) -> Self { - let bus = Bus::new(cfg.bus_capacity_clamped()); - let subs = Arc::new(SubscriberSet::new(subscribers, bus.clone())); - let runtime_token = CancellationToken::new(); - let semaphore = Self::build_semaphore_static(&cfg); - - let registry = Registry::new(bus.clone(), runtime_token.clone(), semaphore); - + /// Internal constructor used by builder (not public API). + pub(crate) fn new_internal( + cfg: Config, + bus: Bus, + subs: Arc, + alive: Arc, + registry: Arc, + runtime_token: CancellationToken, + ) -> Self { Self { cfg, bus, subs, - alive: Arc::new(AliveTracker::new()), + alive, registry, runtime_token, + + #[cfg(feature = "controller")] + controller: OnceCell::new(), } } + /// Creates a new supervisor with the given config and subscribers (maybe empty). + pub fn new(cfg: Config, subscribers: Vec>) -> Arc { + Self::builder(cfg).with_subscribers(subscribers).build() + } + + /// Creates a builder for constructing a Supervisor. 
+ /// + /// ## Example + /// ```rust + /// use taskvisor::{Config, Supervisor}; + /// + /// let sup = Supervisor::builder(Config::default()) + /// .with_subscribers(vec![]) + /// .build(); + /// ``` + pub fn builder(cfg: Config) -> SupervisorBuilder { + SupervisorBuilder::new(cfg) + } + /// Adds a new task to the supervisor at runtime. /// /// Publishes `TaskAddRequested` with the spec to the bus. @@ -229,6 +254,43 @@ impl Supervisor { self.wait_task_removed(&mut rx, name, wait_for).await } + /// Submits a task to the controller (if enabled). + /// + /// Waits while the submission queue is full (use `try_submit` for non-blocking). + /// Returns an error if: + /// - Controller is not configured (`SubmitError::NotConfigured`) + /// - Controller channel is closed (`SubmitError::Closed`) + /// + /// Requires the `controller` feature flag. + #[cfg(feature = "controller")] + #[cfg_attr(docsrs, doc(cfg(feature = "controller")))] + pub async fn submit( + &self, + spec: crate::controller::ControllerSpec, + ) -> Result<(), crate::controller::SubmitError> { + match self.controller.get() { + Some(ctrl) => ctrl.handle().submit(spec).await, + None => Err(crate::controller::SubmitError::NotConfigured), + } + } + + /// Tries to submit a task without blocking. + /// + /// Returns `SubmitError::Full` if the queue is full. + /// + /// Requires the `controller` feature flag. + #[cfg(feature = "controller")] + #[cfg_attr(docsrs, doc(cfg(feature = "controller")))] + pub fn try_submit( + &self, + spec: crate::controller::ControllerSpec, + ) -> Result<(), crate::controller::SubmitError> { + match self.controller.get() { + Some(ctrl) => ctrl.handle().try_submit(spec), + None => Err(crate::controller::SubmitError::NotConfigured), + } + } + /// Listens to the internal event bus and fans out each received [`Event`] to all active subscribers. /// /// The listener also updates the [`AliveTracker`] before fan-out to keep @@ -268,13 +330,6 @@ impl Supervisor { }); } - /// Builds global semaphore for concurrency limiting. 
- /// - /// Returns `None` if unlimited. - fn build_semaphore_static(cfg: &Config) -> Option> { - cfg.concurrency_limit().map(Semaphore::new).map(Arc::new) - } - /// Waits for either shutdown signal or natural completion of all tasks. async fn drive_shutdown(&self) -> Result<(), RuntimeError> { tokio::select! { diff --git a/src/lib.rs b/src/lib.rs index d36f6c1..ea198ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,13 +151,13 @@ //! } //! ``` mod config; +pub mod controller; mod core; mod error; mod events; mod policies; mod subscribers; mod tasks; - // ---- Public re-exports ---- pub use config::Config; From b0fa1a2814e71f70a3cb654e79df259d1760f992 Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 17:52:40 +0300 Subject: [PATCH 02/11] remove slot name from ControllerSpec - the same as TaskName --- examples/basic_controller.rs | 3 --- src/controller/core.rs | 1 - src/controller/spec.rs | 13 ++----------- 3 files changed, 2 insertions(+), 15 deletions(-) diff --git a/examples/basic_controller.rs b/examples/basic_controller.rs index e0f13ed..15b0b88 100644 --- a/examples/basic_controller.rs +++ b/examples/basic_controller.rs @@ -87,7 +87,6 @@ async fn main() -> anyhow::Result<()> { println!("[main] submitting build (Queue)"); sup.submit(ControllerSpec::new( - "build", Admission::Queue, make_task("build", 800), )) @@ -95,7 +94,6 @@ async fn main() -> anyhow::Result<()> { println!("[main] submitting test (DropIfRunning)"); sup.submit(ControllerSpec::new( - "test", Admission::DropIfRunning, make_task("test", 600), )) @@ -103,7 +101,6 @@ async fn main() -> anyhow::Result<()> { println!("[main] submitting deploy (Replace)"); sup.submit(ControllerSpec::new( - "deploy", Admission::Replace, make_task("deploy", 1000), )) diff --git a/src/controller/core.rs b/src/controller/core.rs index de0606c..dae21cf 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -44,7 +44,6 @@ impl ControllerHandle { /// Each slot can run at most one task at a 
time. New submissions are handled /// according to the configured admission policy. pub struct Controller { - #[allow(dead_code)] config: ControllerConfig, supervisor: Weak, bus: Bus, diff --git a/src/controller/spec.rs b/src/controller/spec.rs index 3b6bc4b..1e09bcb 100644 --- a/src/controller/spec.rs +++ b/src/controller/spec.rs @@ -6,9 +6,6 @@ use crate::TaskSpec; /// Combines a slot name, admission policy, and the actual task specification. #[derive(Clone)] pub struct ControllerSpec { - /// Logical slot name (tasks with same name share a slot). - slot_name: String, - /// Admission policy. pub admission: Admission, @@ -20,16 +17,10 @@ impl ControllerSpec { /// Creates a new controller submission specification. /// /// ## Parameters - /// - `slot_name`: Logical slot identifier (tasks with same name share a slot) /// - `admission`: How to handle concurrent submissions /// - `task_spec`: The task to execute - /// - /// ## Note - /// `slot_name` is independent of `task_spec.name()` — you can run the same - /// task in different slots by using different slot names. - pub fn new(slot_name: impl Into, admission: Admission, task_spec: TaskSpec) -> Self { + pub fn new(admission: Admission, task_spec: TaskSpec) -> Self { Self { - slot_name: slot_name.into(), admission, task_spec, } @@ -37,6 +28,6 @@ impl ControllerSpec { /// Returns the slot name. pub fn slot_name(&self) -> &str { - &self.slot_name + self.task_spec.name() } } From 78d604045ce2b6e39e8365d436ded75fbb3b4625 Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 19:06:20 +0300 Subject: [PATCH 03/11] fix controller logic --- src/controller/core.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/controller/core.rs b/src/controller/core.rs index dae21cf..2c80356 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -160,25 +160,28 @@ impl Controller { /// Handles bus events (TaskStopped, TaskRemoved). 
async fn handle_event(&self, event: Arc) { match event.kind { - EventKind::TaskStopped | EventKind::TaskRemoved => { + EventKind::ActorExhausted + | EventKind::ActorDead + | EventKind::TaskRemoved => { self.on_task_finished(&event).await; } _ => {} } } - /// Called when a task finishes (stopped or removed). + /// Handles a terminal event (`ActorExhausted`, `ActorDead`, or `TaskRemoved`). + /// + /// IMPORTANT: Each slot is keyed by task name. async fn on_task_finished(&self, event: &Event) { - let Some(task_name) = event.task.as_deref() else { - return; - }; - let Some(sup) = self.supervisor.upgrade() else { - return; - }; + let Some(task_name) = event.task.as_deref() else { return }; + let Some(sup) = self.supervisor.upgrade() else { return }; + let mut slots = self.slots.write().await; - let Some(slot) = slots.get_mut(task_name) else { + let Some(slot) = slots.get_mut(task_name) else { return }; + + if matches!(slot.status, SlotStatus::Idle) { return; - }; + } slot.status = SlotStatus::Idle; if let Some(next_spec) = slot.queue.pop_front() { @@ -186,9 +189,7 @@ impl Controller { eprintln!("[controller] failed to start next task '{task_name}': {e}"); return; } - slot.status = SlotStatus::Running { - started_at: Instant::now(), - }; + slot.status = SlotStatus::Running { started_at: Instant::now() }; } } } From ed3db8fd73372bddcbfdc842ee41dde44b510ea3 Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 19:24:29 +0300 Subject: [PATCH 04/11] add future TODO --- src/controller/core.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/controller/core.rs b/src/controller/core.rs index 2c80356..b9e0d2c 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -172,6 +172,7 @@ impl Controller { /// Handles a terminal event (`ActorExhausted`, `ActorDead`, or `TaskRemoved`). /// /// IMPORTANT: Each slot is keyed by task name. + /// TODO: maybe add `slot_name` with task_name as default. 
async fn on_task_finished(&self, event: &Event) { let Some(task_name) = event.task.as_deref() else { return }; let Some(sup) = self.supervisor.upgrade() else { return }; From 7fe08d7a29e488ebaadebbe15823a5d52a3be32b Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 19:29:55 +0300 Subject: [PATCH 05/11] change logic --- src/controller/core.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/controller/core.rs b/src/controller/core.rs index b9e0d2c..e71edc9 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -114,6 +114,12 @@ impl Controller { } /// Handles a new task submission. + /// + // Replace semantics: + // - Enqueue the new spec at the head (latest-wins). + // - If slot was Running -> transition to Terminating and request remove once. + // - If already Terminating -> do NOT call remove again; just update the head. + // - The next task actually starts in `on_task_finished` upon terminal event. async fn handle_submission(&self, spec: ControllerSpec) { let Some(sup) = self.supervisor.upgrade() else { return; @@ -140,16 +146,18 @@ impl Controller { } (SlotStatus::Running { .. }, Admission::Replace) => { slot.queue.push_front(task_spec); - slot.status = SlotStatus::Terminating { - cancelled_at: Instant::now(), - }; + + slot.status = SlotStatus::Terminating { cancelled_at: Instant::now() }; if let Err(e) = sup.remove_task(&slot_name) { eprintln!("[controller] failed to cancel task '{slot_name}': {e}"); - } + }; } (SlotStatus::Running { .. }, Admission::Queue) => { slot.queue.push_back(task_spec); } + (SlotStatus::Terminating { .. }, Admission::Replace) => { + slot.queue.push_front(task_spec); + } (SlotStatus::Terminating { .. 
}, _) => { slot.queue.push_back(task_spec); } @@ -183,8 +191,8 @@ impl Controller { if matches!(slot.status, SlotStatus::Idle) { return; } - slot.status = SlotStatus::Idle; + if let Some(next_spec) = slot.queue.pop_front() { if let Err(e) = sup.add_task(next_spec) { eprintln!("[controller] failed to start next task '{task_name}': {e}"); From 23853e96f982ebba5fcdf82e308479e94373b0ab Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 19:33:07 +0300 Subject: [PATCH 06/11] fmt --- src/controller/core.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/controller/core.rs b/src/controller/core.rs index e71edc9..f1d0353 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -147,7 +147,9 @@ impl Controller { (SlotStatus::Running { .. }, Admission::Replace) => { slot.queue.push_front(task_spec); - slot.status = SlotStatus::Terminating { cancelled_at: Instant::now() }; + slot.status = SlotStatus::Terminating { + cancelled_at: Instant::now(), + }; if let Err(e) = sup.remove_task(&slot_name) { eprintln!("[controller] failed to cancel task '{slot_name}': {e}"); }; @@ -168,9 +170,7 @@ impl Controller { /// Handles bus events (TaskStopped, TaskRemoved). async fn handle_event(&self, event: Arc) { match event.kind { - EventKind::ActorExhausted - | EventKind::ActorDead - | EventKind::TaskRemoved => { + EventKind::ActorExhausted | EventKind::ActorDead | EventKind::TaskRemoved => { self.on_task_finished(&event).await; } _ => {} @@ -182,11 +182,17 @@ impl Controller { /// IMPORTANT: Each slot is keyed by task name. /// TODO: maybe add `slot_name` with task_name as default. 
async fn on_task_finished(&self, event: &Event) { - let Some(task_name) = event.task.as_deref() else { return }; - let Some(sup) = self.supervisor.upgrade() else { return }; + let Some(task_name) = event.task.as_deref() else { + return; + }; + let Some(sup) = self.supervisor.upgrade() else { + return; + }; let mut slots = self.slots.write().await; - let Some(slot) = slots.get_mut(task_name) else { return }; + let Some(slot) = slots.get_mut(task_name) else { + return; + }; if matches!(slot.status, SlotStatus::Idle) { return; @@ -198,7 +204,9 @@ impl Controller { eprintln!("[controller] failed to start next task '{task_name}': {e}"); return; } - slot.status = SlotStatus::Running { started_at: Instant::now() }; + slot.status = SlotStatus::Running { + started_at: Instant::now(), + }; } } } From 8e6c3dda2ec9819327b21a5cde671b2c810fb21c Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 19:51:01 +0300 Subject: [PATCH 07/11] move slots to DashMap --- Cargo.toml | 1 + src/controller/core.rs | 29 +++++++++++++++++------------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3142f5a..caa4669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,4 @@ thiserror = "2.0.16" anyhow = "1.0.100" futures = "0.3.31" rand = "0.9.2" +dashmap = "6.1.0" diff --git a/src/controller/core.rs b/src/controller/core.rs index f1d0353..7a372b3 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -1,8 +1,8 @@ -use std::collections::HashMap; +use dashmap::DashMap; use std::sync::{Arc, Weak}; use std::time::Instant; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::{Mutex, RwLock, mpsc}; use tokio_util::sync::CancellationToken; use crate::{ @@ -48,8 +48,8 @@ pub struct Controller { supervisor: Weak, bus: Bus, - // Internal state (protected by RwLock). - slots: RwLock>, + // Concurrent slots map. + slots: DashMap>>, // Submission queue. 
tx: mpsc::Sender, @@ -65,7 +65,7 @@ impl Controller { config, supervisor: Arc::downgrade(supervisor), bus, - slots: RwLock::new(HashMap::new()), + slots: DashMap::new(), tx, rx: RwLock::new(Some(rx)), }) @@ -129,10 +129,8 @@ impl Controller { let admission = spec.admission; let task_spec = spec.task_spec; - let mut slots = self.slots.write().await; - let slot = slots - .entry(slot_name.clone()) - .or_insert_with(SlotState::new); + let slot_arc = self.get_or_create_slot(&slot_name); + let mut slot = slot_arc.lock().await; match (&slot.status, admission) { (SlotStatus::Idle, _) => { @@ -188,11 +186,10 @@ impl Controller { let Some(sup) = self.supervisor.upgrade() else { return; }; - - let mut slots = self.slots.write().await; - let Some(slot) = slots.get_mut(task_name) else { + let Some(slot_arc) = self.slots.get(task_name).map(|e| e.clone()) else { return; }; + let mut slot = slot_arc.lock().await; if matches!(slot.status, SlotStatus::Idle) { return; @@ -209,4 +206,12 @@ impl Controller { }; } } + + #[inline] + fn get_or_create_slot(&self, slot_name: &str) -> Arc> { + self.slots + .entry(slot_name.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(SlotState::new()))) + .clone() + } } From 0b9a85b14e9b4bee3b365042eacfc4568c94724e Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Tue, 21 Oct 2025 22:14:17 +0300 Subject: [PATCH 08/11] add slots limit --- src/controller/config.rs | 4 ++++ src/controller/core.rs | 30 ++++++++++++++++++++++++------ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/controller/config.rs b/src/controller/config.rs index 316330f..0687639 100644 --- a/src/controller/config.rs +++ b/src/controller/config.rs @@ -5,12 +5,16 @@ pub struct ControllerConfig { /// /// When full, `submit()` will wait and `try_submit()` will return `Full` error. pub queue_capacity: usize, + + /// Capacity of the slots. 
+ pub slot_capacity: usize } impl Default for ControllerConfig { fn default() -> Self { Self { queue_capacity: 1024, + slot_capacity: 100, } } } diff --git a/src/controller/core.rs b/src/controller/core.rs index 7a372b3..d314960 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -132,6 +132,14 @@ impl Controller { let slot_arc = self.get_or_create_slot(&slot_name); let mut slot = slot_arc.lock().await; + if slot.queue.len() >= self.config.slot_capacity { + eprintln!( + "[controller] slot '{}' queue full ({}/{}), dropping submission", + slot_name, slot.queue.len(), self.config.slot_capacity + ); + return; + } + match (&slot.status, admission) { (SlotStatus::Idle, _) => { if let Err(e) = sup.add_task(task_spec) { @@ -143,20 +151,26 @@ impl Controller { }; } (SlotStatus::Running { .. }, Admission::Replace) => { - slot.queue.push_front(task_spec); + if let Some(head) = slot.queue.front_mut() { + *head = task_spec; + } else { + slot.queue.push_front(task_spec); + } - slot.status = SlotStatus::Terminating { - cancelled_at: Instant::now(), - }; + slot.status = SlotStatus::Terminating { cancelled_at: Instant::now() }; if let Err(e) = sup.remove_task(&slot_name) { eprintln!("[controller] failed to cancel task '{slot_name}': {e}"); - }; + } } (SlotStatus::Running { .. }, Admission::Queue) => { slot.queue.push_back(task_spec); } (SlotStatus::Terminating { .. }, Admission::Replace) => { - slot.queue.push_front(task_spec); + if let Some(head) = slot.queue.front_mut() { + *head = task_spec; + } else { + slot.queue.push_front(task_spec); + } } (SlotStatus::Terminating { .. 
}, _) => { slot.queue.push_back(task_spec); @@ -205,6 +219,10 @@ impl Controller { started_at: Instant::now(), }; } + if matches!(slot.status, SlotStatus::Idle) && slot.queue.is_empty() { + drop(slot); + self.slots.remove(task_name); + } } #[inline] From 7e90febc89ecbfeb0205b8ee2964255a8fdff7e0 Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Wed, 22 Oct 2025 14:05:10 +0300 Subject: [PATCH 09/11] update --- README.md | 5 +- examples/basic_controller.rs | 120 --------------------- examples/controller.rs | 169 +++++++++++++++++++++++++++++ examples/custom_subscriber.rs | 5 +- src/controller/admission.rs | 2 +- src/controller/config.rs | 2 +- src/controller/core.rs | 184 +++++++++++++++++++++++--------- src/controller/error.rs | 2 +- src/controller/mod.rs | 127 ++++++++++++++++++++-- src/controller/spec.rs | 22 +++- src/core/builder.rs | 12 +-- src/core/supervisor.rs | 8 +- src/events/event.rs | 24 +++++ src/lib.rs | 20 +++- src/subscribers/embedded/log.rs | 30 ++++++ 15 files changed, 528 insertions(+), 204 deletions(-) delete mode 100644 examples/basic_controller.rs create mode 100644 examples/controller.rs diff --git a/README.md b/README.md index 8a699a0..800061f 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ taskvisor = "0.0.7" > Optional features: > - `logging` enables the built-in [`LogWriter`], (demo logger); +> - `controller` enables the slot-based [`Controller`] with admission policies. 
```toml [dependencies] @@ -189,7 +190,8 @@ Check out the [examples](./examples) directory for: - [retry_with_backoff.rs](examples/retry_with_backoff.rs): retry loop with exponential backoff and jitter - [dynamic_add_remove.rs](examples/dynamic_add_remove.rs): add/remove tasks at runtime via API - [custom_subscriber.rs](examples/custom_subscriber.rs): custom subscriber reacting to events -- [task_cancel.rs](examples/task_cancel.rs): task cancellation from outside +- [task_cancel.rs](examples/task_cancel.rs): task cancellation from outside +- [controller.rs](examples/controller.rs): examples with `controller` feature ```bash # basic / retry / dynamic do not require extra features @@ -198,6 +200,7 @@ cargo run --example retry_with_backoff cargo run --example dynamic_add_remove cargo run --example custom_subscriber cargo run --example task_cancel --features logging +cargo run --example controller --features controller ``` ## 🀝 Contributing diff --git a/examples/basic_controller.rs b/examples/basic_controller.rs deleted file mode 100644 index 15b0b88..0000000 --- a/examples/basic_controller.rs +++ /dev/null @@ -1,120 +0,0 @@ -//! # Example: Controller Pipeline - -use std::{sync::Arc, time::Duration}; -use tokio_util::sync::CancellationToken; - -use taskvisor::{ - BackoffPolicy, Config, RestartPolicy, Supervisor, TaskError, TaskFn, TaskRef, TaskSpec, -}; - -#[cfg(feature = "controller")] -use taskvisor::{controller::Admission, controller::ControllerConfig, controller::ControllerSpec}; - -/// One-shot task factory. 
-fn make_task(name: &'static str, work_ms: u64) -> TaskSpec { - let n = name.to_string(); - let task: TaskRef = TaskFn::arc(name, move |_ctx: CancellationToken| { - let n = n.clone(); - async move { - println!("[{n}] start (work {work_ms}ms)"); - tokio::time::sleep(Duration::from_millis(work_ms)).await; - println!("[{n}] done"); - Ok::<(), TaskError>(()) - } - }); - - TaskSpec::new( - task, - RestartPolicy::Never, - BackoffPolicy::default(), - Some(Duration::from_secs(5)), - ) -} - -#[cfg(feature = "controller")] -fn make_producer() -> TaskSpec { - let task: TaskRef = TaskFn::arc("producer", |ctx: CancellationToken| async move { - // Give controller time to start - tokio::time::sleep(Duration::from_millis(200)).await; - - // Get supervisor handle (need to pass via closure) - // For now, we'll submit via a shared channel or just demo the API - - println!("[producer] submitting build (Queue)"); - // sup.submit(ControllerSpec::new("build", Admission::Queue, make_task("build", 800))).await?; - - println!("[producer] done"); - Ok::<(), TaskError>(()) - }); - - TaskSpec::new( - task, - RestartPolicy::Never, - BackoffPolicy::default(), - Some(Duration::from_secs(5)), - ) -} - -#[tokio::main(flavor = "current_thread")] -async fn main() -> anyhow::Result<()> { - #[cfg(not(feature = "controller"))] - { - eprintln!("This example requires the 'controller' feature."); - eprintln!("Run with: cargo run --example basic_controller --features controller"); - return Ok(()); - } - - #[cfg(feature = "controller")] - { - let mut cfg = Config::default(); - cfg.grace = Duration::from_secs(5); - - let sup = Supervisor::builder(cfg) - .with_subscribers(vec![]) - .with_controller(ControllerConfig::default()) - .build(); - - // Spawn supervisor in background - let sup_clone = Arc::clone(&sup); - let sup_task = tokio::spawn(async move { - if let Err(e) = sup_clone.run(vec![]).await { - eprintln!("[supervisor] error: {e}"); - } - }); - - // Submit tasks via controller - 
tokio::time::sleep(Duration::from_millis(100)).await; - - println!("[main] submitting build (Queue)"); - sup.submit(ControllerSpec::new( - Admission::Queue, - make_task("build", 800), - )) - .await?; - - println!("[main] submitting test (DropIfRunning)"); - sup.submit(ControllerSpec::new( - Admission::DropIfRunning, - make_task("test", 600), - )) - .await?; - - println!("[main] submitting deploy (Replace)"); - sup.submit(ControllerSpec::new( - Admission::Replace, - make_task("deploy", 1000), - )) - .await?; - - // Wait a bit for tasks to complete - tokio::time::sleep(Duration::from_secs(3)).await; - - // Shutdown - drop(sup); - let _ = sup_task.await; - - println!("[main] finished"); - } - - Ok(()) -} diff --git a/examples/controller.rs b/examples/controller.rs new file mode 100644 index 0000000..1109780 --- /dev/null +++ b/examples/controller.rs @@ -0,0 +1,169 @@ +//! # Example: Controller Demo +//! Visual demonstration of the controller’s slot-based admission model. +//! +//! ```text +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ application β”‚ +//! β”‚ (user submits)β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ +//! submit(...) +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ controller β”‚ +//! β”‚ (admission logic) β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! publishes events +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ supervisor β”‚ +//! β”‚ (orchestrator) β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! spawns actors +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ task actor β”‚ +//! β”‚ (run / retry loop)β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! ``` +//! +//! Demonstrates the controller's admission policies: +//! - Queue: tasks execute sequentially (same slot name) +//! 
- Replace: new submission cancels running task (latest wins) +//! - DropIfRunning: new submission ignored if slot busy +//! +//! Shows how controller events are published and can be observed via `LogWriter`. +//! +//! ## Run +//! ```bash +//! cargo run --example basic_controller --features "controller,logging" +//! ``` +#[cfg(not(feature = "controller"))] +compile_error!( + "This example requires the 'controller' feature. Run with: --features controller,logging" +); + +#[cfg(not(feature = "logging"))] +compile_error!( + "This example requires the 'logging' feature. Run with: --features controller,logging" +); + +use std::{sync::Arc, time::Duration}; +use tokio_util::sync::CancellationToken; + +use taskvisor::LogWriter; +use taskvisor::{ + BackoffPolicy, Config, RestartPolicy, Supervisor, TaskError, TaskFn, TaskRef, TaskSpec, +}; +use taskvisor::{ControllerConfig, ControllerSpec}; + +/// Creates a task that simulates work. +fn make_worker(name: &'static str, work_ms: u64) -> TaskSpec { + let task_name = name.to_string(); + + let task: TaskRef = TaskFn::arc(name, move |ctx: CancellationToken| { + let name = task_name.clone(); + + async move { + println!("[{name}] started (work={work_ms}ms)"); + let start = tokio::time::Instant::now(); + + loop { + if ctx.is_cancelled() { + println!("[{name}] cancelled after {:?}", start.elapsed()); + return Ok::<(), TaskError>(()); + } + if start.elapsed().as_millis() >= work_ms as u128 { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + println!("[{name}] completed in {:?}", start.elapsed()); + Ok(()) + } + }); + TaskSpec::new( + task, + RestartPolicy::Never, + BackoffPolicy::default(), + Some(Duration::from_secs(10)), + ) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + println!("=== Controller Demo ===\n"); + + let sup = Supervisor::builder(Config::default()) + .with_subscribers(vec![Arc::new(LogWriter::default())]) + .with_controller(ControllerConfig { + 
queue_capacity: 100, + slot_capacity: 10, + }) + .build(); + + // Spawn supervisor in background + let sup_clone = Arc::clone(&sup); + let sup_task = tokio::spawn(async move { + if let Err(e) = sup_clone.run(vec![]).await { + eprintln!("Supervisor error: {e}"); + } + }); + tokio::time::sleep(Duration::from_millis(200)).await; + + // === Demo 1: Queue Policy (sequential execution in the SAME slot) === + println!("\n--- Demo 1: Queue Policy (sequential execution) ---"); + sup.submit(ControllerSpec::queue(make_worker("build", 500))) + .await?; + sup.submit(ControllerSpec::queue(make_worker("build", 500))) + .await?; + sup.submit(ControllerSpec::queue(make_worker("build", 500))) + .await?; + tokio::time::sleep(Duration::from_millis(2000)).await; + + // === Demo 2: Replace Policy (cancel current and start latest on terminal) === + println!("\n--- Demo 2: Replace Policy (cancel and restart) ---"); + sup.submit(ControllerSpec::replace(make_worker("deploy", 1000))) + .await?; + tokio::time::sleep(Duration::from_millis(300)).await; + + sup.submit(ControllerSpec::replace(make_worker("deploy", 1000))) + .await?; + tokio::time::sleep(Duration::from_millis(300)).await; + + sup.submit(ControllerSpec::replace(make_worker("deploy", 1000))) + .await?; + tokio::time::sleep(Duration::from_millis(1500)).await; + + // === Demo 3: DropIfRunning Policy (ignore if busy) === + println!("\n--- Demo 3: DropIfRunning Policy (ignore if busy) ---"); + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + tokio::time::sleep(Duration::from_millis(200)).await; + + // These will be ignored (same slot is running) + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + tokio::time::sleep(Duration::from_millis(1000)).await; + + // After slot is idle, this will execute + sup.submit(ControllerSpec::drop_if_running(make_worker("health", 800))) + .await?; + 
tokio::time::sleep(Duration::from_millis(1000)).await; + + // === Demo 4: Queue Full (rejections) === + println!("\n--- Demo 4: Queue Full (rejection) ---"); + // Same slot "batch": slot_capacity = 10 β†’ 2 submissions will be rejected. + for _ in 1..=12 { + sup.submit(ControllerSpec::queue(make_worker("batch", 200))) + .await?; + } + tokio::time::sleep(Duration::from_secs(4)).await; + + println!("\n--- Demo Complete ---"); + drop(sup); + let _ = sup_task.await; + Ok(()) +} diff --git a/examples/custom_subscriber.rs b/examples/custom_subscriber.rs index fa3a6f0..d0fd569 100644 --- a/examples/custom_subscriber.rs +++ b/examples/custom_subscriber.rs @@ -133,10 +133,7 @@ impl Subscribe for ConsoleSubscriber { } // === Ignored === - EventKind::SubscriberPanicked - | EventKind::SubscriberOverflow - | EventKind::TaskAddRequested - | EventKind::TaskRemoveRequested => {} + _ => {} } } diff --git a/src/controller/admission.rs b/src/controller/admission.rs index 5c9da22..8f84f65 100644 --- a/src/controller/admission.rs +++ b/src/controller/admission.rs @@ -15,7 +15,7 @@ /// Policy controlling how new submissions are handled when a slot is busy. #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Admission { +pub enum ControllerAdmission { /// Skip task if already running. /// /// Use when: diff --git a/src/controller/config.rs b/src/controller/config.rs index 0687639..d1c8482 100644 --- a/src/controller/config.rs +++ b/src/controller/config.rs @@ -7,7 +7,7 @@ pub struct ControllerConfig { pub queue_capacity: usize, /// Capacity of the slots. 
- pub slot_capacity: usize + pub slot_capacity: usize, } impl Default for ControllerConfig { diff --git a/src/controller/core.rs b/src/controller/core.rs index d314960..24ab120 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -1,7 +1,9 @@ -use dashmap::DashMap; -use std::sync::{Arc, Weak}; -use std::time::Instant; +use std::{ + sync::{Arc, Weak}, + time::Instant, +}; +use dashmap::DashMap; use tokio::sync::{Mutex, RwLock, mpsc}; use tokio_util::sync::CancellationToken; @@ -11,9 +13,9 @@ use crate::{ }; use super::{ - admission::Admission, + admission::ControllerAdmission, config::ControllerConfig, - error::SubmitError, + error::ControllerError, slot::{SlotState, SlotStatus}, spec::ControllerSpec, }; @@ -26,15 +28,18 @@ pub struct ControllerHandle { impl ControllerHandle { /// Submit a task (async, waits if queue is full). - pub async fn submit(&self, spec: ControllerSpec) -> Result<(), SubmitError> { - self.tx.send(spec).await.map_err(|_| SubmitError::Closed) + pub async fn submit(&self, spec: ControllerSpec) -> Result<(), ControllerError> { + self.tx + .send(spec) + .await + .map_err(|_| ControllerError::Closed) } /// Try to submit without blocking (fails if queue full). - pub fn try_submit(&self, spec: ControllerSpec) -> Result<(), SubmitError> { + pub fn try_submit(&self, spec: ControllerSpec) -> Result<(), ControllerError> { self.tx.try_send(spec).map_err(|e| match e { - mpsc::error::TrySendError::Full(_) => SubmitError::Full, - mpsc::error::TrySendError::Closed(_) => SubmitError::Closed, + mpsc::error::TrySendError::Full(_) => ControllerError::Full, + mpsc::error::TrySendError::Closed(_) => ControllerError::Closed, }) } } @@ -96,7 +101,6 @@ impl Controller { .ok_or_else(|| anyhow::anyhow!("controller already running"))?; let mut bus_rx = self.bus.subscribe(); - loop { tokio::select! { _ = token.cancelled() => break, @@ -109,17 +113,16 @@ impl Controller { } } } - Ok(()) } /// Handles a new task submission. 
/// - // Replace semantics: - // - Enqueue the new spec at the head (latest-wins). - // - If slot was Running -> transition to Terminating and request remove once. - // - If already Terminating -> do NOT call remove again; just update the head. - // - The next task actually starts in `on_task_finished` upon terminal event. + /// Replace semantics (latest-wins): + /// - Replace does NOT grow the queue; it replaces the head (the immediate successor). + /// - If slot was Running β†’ transition to Terminating and request remove once. + /// - If already Terminating β†’ do NOT call remove again; just replace the head. + /// - The next task actually starts in `on_task_finished` upon terminal event. async fn handle_submission(&self, spec: ControllerSpec) { let Some(sup) = self.supervisor.upgrade() else { return; @@ -132,66 +135,117 @@ impl Controller { let slot_arc = self.get_or_create_slot(&slot_name); let mut slot = slot_arc.lock().await; - if slot.queue.len() >= self.config.slot_capacity { - eprintln!( - "[controller] slot '{}' queue full ({}/{}), dropping submission", - slot_name, slot.queue.len(), self.config.slot_capacity - ); - return; - } - match (&slot.status, admission) { (SlotStatus::Idle, _) => { if let Err(e) = sup.add_task(task_spec) { - eprintln!("[controller] failed to add task '{slot_name}': {e}"); + self.bus.publish( + Event::new(EventKind::ControllerRejected) + .with_task(slot_name.as_str()) + .with_reason(format!("add_failed: {}", e)), + ); return; } slot.status = SlotStatus::Running { started_at: Instant::now(), }; + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!("admission={:?} status=running", admission)), + ); } - (SlotStatus::Running { .. 
}, Admission::Replace) => { - if let Some(head) = slot.queue.front_mut() { - *head = task_spec; - } else { - slot.queue.push_front(task_spec); - } - - slot.status = SlotStatus::Terminating { cancelled_at: Instant::now() }; + (SlotStatus::Running { .. }, ControllerAdmission::Replace) => { + Self::replace_head_or_push(&mut slot, task_spec); + slot.status = SlotStatus::Terminating { + cancelled_at: Instant::now(), + }; + self.bus.publish( + Event::new(EventKind::ControllerSlotTransition) + .with_task(slot_name.as_str()) + .with_reason("runningβ†’terminating (replace)"), + ); if let Err(e) = sup.remove_task(&slot_name) { - eprintln!("[controller] failed to cancel task '{slot_name}': {e}"); + self.bus.publish( + Event::new(EventKind::ControllerRejected) + .with_task(slot_name.as_str()) + .with_reason(format!("remove_failed: {}", e)), + ); } + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!("admission=Replace depth={}", slot.queue.len())), + ); } - (SlotStatus::Running { .. }, Admission::Queue) => { + (SlotStatus::Running { .. }, ControllerAdmission::Queue) => { + if self.reject_if_full(&slot_name, slot.queue.len()) { + return; + } slot.queue.push_back(task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!("admission=Queue depth={}", slot.queue.len())), + ); + } + (SlotStatus::Terminating { .. }, ControllerAdmission::Replace) => { + Self::replace_head_or_push(&mut slot, task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!( + "admission=Replace status=terminating depth={}", + slot.queue.len() + )), + ); } - (SlotStatus::Terminating { .. }, Admission::Replace) => { - if let Some(head) = slot.queue.front_mut() { - *head = task_spec; - } else { - slot.queue.push_front(task_spec); + (SlotStatus::Terminating { .. 
}, ControllerAdmission::Queue) => { + if self.reject_if_full(&slot_name, slot.queue.len()) { + return; } + slot.queue.push_back(task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!( + "admission=Queue status=terminating depth={}", + slot.queue.len() + )), + ); } - (SlotStatus::Terminating { .. }, _) => { + (SlotStatus::Running { .. }, ControllerAdmission::DropIfRunning) => {} + _ => { + if self.reject_if_full(&slot_name, slot.queue.len()) { + return; + } slot.queue.push_back(task_spec); + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(slot_name.as_str()) + .with_reason(format!( + "admission={:?} depth={}", + admission, + slot.queue.len() + )), + ); } - (SlotStatus::Running { .. }, Admission::DropIfRunning) => {} } } - /// Handles bus events (TaskStopped, TaskRemoved). + /// Handles bus events (terminal only). async fn handle_event(&self, event: Arc) { match event.kind { - EventKind::ActorExhausted | EventKind::ActorDead | EventKind::TaskRemoved => { - self.on_task_finished(&event).await; - } + EventKind::TaskRemoved => self.on_task_finished(&event).await, _ => {} } } - /// Handles a terminal event (`ActorExhausted`, `ActorDead`, or `TaskRemoved`). + /// Handles `TaskRemoved` for a task; frees the slot and optionally starts the queued next. /// - /// IMPORTANT: Each slot is keyed by task name. + /// Rationale: `ActorExhausted`/`ActorDead` may arrive before the actor is + /// deregistered in the Supervisor’s Registry. Starting the next task before + /// `TaskRemoved` can race and produce `task_already_exists`. By gating on + /// `TaskRemoved` we avoid double-adds and registry races. /// TODO: maybe add `slot_name` with task_name as default. 
async fn on_task_finished(&self, event: &Event) { let Some(task_name) = event.task.as_deref() else { @@ -218,6 +272,12 @@ impl Controller { slot.status = SlotStatus::Running { started_at: Instant::now(), }; + + self.bus.publish( + Event::new(EventKind::ControllerSubmitted) + .with_task(task_name) + .with_reason(format!("started_from_queue depth={}", slot.queue.len())), + ); } if matches!(slot.status, SlotStatus::Idle) && slot.queue.is_empty() { drop(slot); @@ -232,4 +292,30 @@ impl Controller { .or_insert_with(|| Arc::new(Mutex::new(SlotState::new()))) .clone() } + + #[inline] + fn reject_if_full(&self, slot_name: &str, slot_len: usize) -> bool { + if slot_len >= self.config.slot_capacity { + self.bus.publish( + Event::new(EventKind::ControllerRejected) + .with_task(slot_name) + .with_reason(format!( + "queue_full: {}/{}", + slot_len, self.config.slot_capacity + )), + ); + true + } else { + false + } + } + + #[inline] + fn replace_head_or_push(slot: &mut SlotState, task_spec: crate::TaskSpec) { + if let Some(head) = slot.queue.front_mut() { + *head = task_spec; + } else { + slot.queue.push_front(task_spec); + } + } } diff --git a/src/controller/error.rs b/src/controller/error.rs index 369e9fd..8a95b5e 100644 --- a/src/controller/error.rs +++ b/src/controller/error.rs @@ -2,7 +2,7 @@ use thiserror::Error; /// Error returned by [`Supervisor::submit`](crate::Supervisor::submit). #[derive(Error, Debug, Clone, Copy, PartialEq, Eq)] -pub enum SubmitError { +pub enum ControllerError { /// Controller is not configured (builder didn't call `with_controller`). #[error("controller not configured")] NotConfigured, diff --git a/src/controller/mod.rs b/src/controller/mod.rs index 7f21b68..e2107de 100644 --- a/src/controller/mod.rs +++ b/src/controller/mod.rs @@ -1,13 +1,124 @@ -pub mod admission; -pub mod config; -pub mod error; -pub mod spec; +//! # Controller: slot-based admission & start-on-`TaskRemoved` +//! +//! 
The **controller** is a thin policy layer (wrapper) over `TaskSpec` submission. +//! It enforces per-slot admission rules (`Queue`, `Replace`, `DropIfRunning`) and +//! starts the *next* task **only after** a terminal `TaskRemoved` event is observed +//! on the runtime bus. In this model, **one slot = one task name** +//! (`slot key = TaskSpec::name()`). +//! +//! --- +//! +//! ## Role in Taskvisor +//! +//! The controller accepts `ControllerSpec`, unwraps its `TaskSpec`, applies admission +//! rules (including Replace latest-wins), and delegates `add/remove` to the Supervisor. +//! It subscribes to the Bus and advances slots strictly on `TaskRemoved` to avoid +//! registry races and double starts. +//! +//! ```text +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ Application code β”‚ +//! β”‚ sup.submit(ControllerSpec) β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! β”‚ +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ Controller β”‚ (per-slot admission FSM) +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! unwraps & applies policy +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ TaskSpec β”‚ (from ControllerSpec) +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! add/remove +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +//! β”‚ Supervisor β”‚ +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! publishes runtime events +//! β–Ό +//! β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” subscribes +//! β”‚ Bus │◄────────────── Controller +//! β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! β–Ό +//! Task Actors +//! ``` +//! +//! **Flow summary** +//! 1. Application calls `sup.submit(ControllerSpec)`. +//! 2. Controller unwraps `TaskSpec` and applies `Admission` rules. +//! 
3. If accepted, controller calls `Supervisor::add_task(TaskSpec)` or requests remove. +//! 4. On terminal `TaskRemoved` (via Bus), the slot becomes `Idle` and the next queued +//! task (if any) is started. +//! +//! --- +//! +//! ## Per-slot model +//! +//! - **Key**: `task_spec.name()` (exactly one running task per slot). +//! - **State**: `Idle | Running | Terminating`, with a FIFO queue per slot. +//! - **Replace (latest-wins)**: does **not** grow the queue; it **replaces the head** +//! (the immediate successor). The next task actually starts **only after `TaskRemoved`**. +//! +//! ```text +//! State machine (per task name) +//! +//! Idle ── submit β†’ start β†’ Running ── submit(Replace) β†’ Terminating +//! β–² (Supervisor.add) β”‚ +//! └─────────── on TaskRemoved β—„β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +//! β”œβ”€ if queue empty β†’ Idle +//! └─ if queue head exists β†’ start head β†’ Running +//! ``` +//! +//! Queue operations +//! - **Queue**: push to tail (FIFO). +//! - **Replace**: replace head (latest-wins), not increasing depth. +//! +//! ```text +//! Queue example (head on the left): +//! [ next, a, b ] --(Replace c)--> [ c, a, b ] --(Replace d)--> [ d, a, b ] +//! (head replaced) (head replaced) +//! ``` +//! +//! --- +//! +//! ## Why gate on `TaskRemoved` +//! +//! `ActorExhausted/ActorDead` may arrive **before** full deregistration of the actor. +//! Starting the next task on those signals can race the registry and cause +//! `task_already_exists`. Gating advancement on **`TaskRemoved`** prevents double-adds. +//! +//! --- +//! +//! ## Concurrency & scalability +//! +//! - `DashMap>>` avoids global map lock contention. +//! - Per-slot `Mutex` ensures updates to one slot don’t block others. +//! +//! --- +//! +//! ## Public surface +//! +//! - Configure via `Supervisor::builder(..).with_controller(ControllerConfig)`. +//! - Submit via `sup.submit(ControllerSpec::{queue, replace, drop_if_running}(...))`. +//! 
- Policies: [`ControllerAdmission`] = `Queue | Replace | DropIfRunning`. +//! - Controller emits `ControllerSubmitted`, `ControllerRejected`, and +//! `ControllerSlotTransition` (feature `"controller"`); readable with `"logging"`’s `LogWriter`. +//! +//! ## Invariants +//! - At most **one** running task per slot. +//! - Slots advance to next task **only** after `TaskRemoved`. +//! - `Replace` is **latest-wins** (head replace); `Queue` is FIFO. +mod admission; +mod config; +mod error; +mod spec; mod core; mod slot; -pub use admission::Admission; +pub use admission::ControllerAdmission; pub use config::ControllerConfig; -pub use core::{Controller, ControllerHandle}; -pub use error::SubmitError; -pub use spec::ControllerSpec; +pub use core::Controller; +pub use error::ControllerError; +pub use spec::ControllerSpec; \ No newline at end of file diff --git a/src/controller/spec.rs b/src/controller/spec.rs index 1e09bcb..76ad7d3 100644 --- a/src/controller/spec.rs +++ b/src/controller/spec.rs @@ -1,4 +1,4 @@ -use super::admission::Admission; +use super::admission::ControllerAdmission; use crate::TaskSpec; /// Request to submit a task to the controller. @@ -7,7 +7,7 @@ use crate::TaskSpec; #[derive(Clone)] pub struct ControllerSpec { /// Admission policy. - pub admission: Admission, + pub admission: ControllerAdmission, /// Task specification to run. pub task_spec: TaskSpec, @@ -19,7 +19,7 @@ impl ControllerSpec { /// ## Parameters /// - `admission`: How to handle concurrent submissions /// - `task_spec`: The task to execute - pub fn new(admission: Admission, task_spec: TaskSpec) -> Self { + pub fn new(admission: ControllerAdmission, task_spec: TaskSpec) -> Self { Self { admission, task_spec, @@ -30,4 +30,20 @@ impl ControllerSpec { pub fn slot_name(&self) -> &str { self.task_spec.name() } + + /// Convenience: Queue admission. 
+ #[inline] + pub fn queue(task_spec: TaskSpec) -> Self { + Self::new(ControllerAdmission::Queue, task_spec) + } + + #[inline] + pub fn replace(task_spec: TaskSpec) -> Self { + Self::new(ControllerAdmission::Replace, task_spec) + } + + #[inline] + pub fn drop_if_running(task_spec: TaskSpec) -> Self { + Self::new(ControllerAdmission::DropIfRunning, task_spec) + } } diff --git a/src/core/builder.rs b/src/core/builder.rs index 9a30158..34e2323 100644 --- a/src/core/builder.rs +++ b/src/core/builder.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use tokio::sync; use crate::{ config::Config, @@ -88,14 +89,12 @@ impl SupervisorBuilder { let semaphore = self .cfg .concurrency_limit() - .map(tokio::sync::Semaphore::new) + .map(sync::Semaphore::new) .map(Arc::new); let registry = Registry::new(bus.clone(), runtime_token.clone(), semaphore); - let alive = Arc::new(AliveTracker::new()); - // Use internal constructor let sup = Arc::new(Supervisor::new_internal( self.cfg, bus.clone(), @@ -105,14 +104,13 @@ impl SupervisorBuilder { runtime_token.clone(), )); - // Initialize controller if configured #[cfg(feature = "controller")] if let Some(ctrl_cfg) = self.controller_config { - let controller = crate::controller::Controller::new(ctrl_cfg, &sup, bus); + let controller = crate::controller::Controller::new(ctrl_cfg, &sup, bus.clone()); - let _ = sup.controller.set(controller); + let _ = sup.controller.set(Arc::clone(&controller)); + controller.run(runtime_token.clone()); } - sup } } diff --git a/src/core/supervisor.rs b/src/core/supervisor.rs index 83ce021..3edc512 100644 --- a/src/core/supervisor.rs +++ b/src/core/supervisor.rs @@ -267,10 +267,10 @@ impl Supervisor { pub async fn submit( &self, spec: crate::controller::ControllerSpec, - ) -> Result<(), crate::controller::SubmitError> { + ) -> Result<(), crate::controller::ControllerError> { match self.controller.get() { Some(ctrl) => ctrl.handle().submit(spec).await, - None => Err(crate::controller::SubmitError::NotConfigured), + None 
=> Err(crate::controller::ControllerError::NotConfigured), } } @@ -284,10 +284,10 @@ impl Supervisor { pub fn try_submit( &self, spec: crate::controller::ControllerSpec, - ) -> Result<(), crate::controller::SubmitError> { + ) -> Result<(), crate::controller::ControllerError> { match self.controller.get() { Some(ctrl) => ctrl.handle().try_submit(spec), - None => Err(crate::controller::SubmitError::NotConfigured), + None => Err(crate::controller::ControllerError::NotConfigured), } } diff --git a/src/events/event.rs b/src/events/event.rs index 4b27e02..1c1ccd7 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -191,6 +191,30 @@ pub enum EventKind { /// - `at`: wall-clock timestamp /// - `seq`: global sequence ActorDead, + + #[cfg(feature = "controller")] + /// Controller submission rejected (queue full, add failed, etc). + /// + /// Sets: + /// - `task`: slot name + /// - `reason`: rejection reason ("queue_full", "add_failed: ...", etc) + ControllerRejected, + + #[cfg(feature = "controller")] + /// Task submitted successfully to controller slot. + /// + /// Sets: + /// - `task`: slot name + /// - `reason`: "admission={admission} status={status} depth={N}" + ControllerSubmitted, + + #[cfg(feature = "controller")] + /// Slot transitioned state (Running → Terminating, etc). + /// + /// Sets: + /// - `task`: slot name + /// - `reason`: "running→terminating" (Replace), "terminating→idle", etc + ControllerSlotTransition, } /// Reason for schedule the next run/backoff. diff --git a/src/lib.rs b/src/lib.rs index ea198ce..8250c46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,7 @@ //! └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ //! ▼ ▼ ▼ //! 
┌───────────────────────────────────────────────────────────────────┐ -//! │ Supervisor (runtime orchestrator) │ +//! │ Supervisor (runtime orchestrator) │ //! │ - Bus (broadcast events) │ //! │ - AliveTracker (tracks task state with sequence numbers) │ //! │ - SubscriberSet (fans out to user subscribers) │ @@ -104,7 +104,7 @@ //! //! ## Optional features //! - `logging`: exports a simple built-in [`LogWriter`] _(demo/reference only)_. -//! - `events`: exports [`Event`] and [`EventKind`] for advanced integrations. +//! - `controller`: exposes controller runtime and admission types. //! //! ## Example //! ```rust @@ -122,13 +122,16 @@ //! #[cfg(feature = "logging")] //! let subs: Vec<Arc<dyn Subscribe>> = { //! use taskvisor::LogWriter; -//! vec![Arc::new(LogWriter)] +//! vec![Arc::new(LogWriter::default())] //! }; //! #[cfg(not(feature = "logging"))] //! let subs: Vec<Arc<dyn Subscribe>> = Vec::new(); //! //! // Create supervisor -//! let sup = Supervisor::new(cfg, subs); +//! let sup = taskvisor::Supervisor::builder(cfg) +//! .with_subscribers(subs) +//! // .with_controller(ControllerConfig { ... }) // if feature = "controller" +//! .build(); //! //! // Define a simple task that runs once and exits //! let hello: TaskRef = TaskFn::arc("hello", |ctx: CancellationToken| async move { @@ -151,13 +154,13 @@ //! } //! ``` mod config; -pub mod controller; mod core; mod error; mod events; mod policies; mod subscribers; mod tasks; + // ---- Public re-exports ---- pub use config::Config; @@ -168,6 +171,13 @@ pub use policies::{BackoffPolicy, JitterPolicy, RestartPolicy}; pub use subscribers::{Subscribe, SubscriberSet}; pub use tasks::{Task, TaskFn, TaskRef, TaskSpec}; +// Optional: expose a controller object. 
+// Enable with: `--features controller` +#[cfg(feature = "controller")] +mod controller; +#[cfg(feature = "controller")] +pub use controller::{ControllerAdmission, ControllerConfig, ControllerError, ControllerSpec}; + // Optional: expose a simple built-in logger subscriber (demo/reference). // Enable with: `--features logging` #[cfg(feature = "logging")] diff --git a/src/subscribers/embedded/log.rs b/src/subscribers/embedded/log.rs index d0db55d..a3d7660 100644 --- a/src/subscribers/embedded/log.rs +++ b/src/subscribers/embedded/log.rs @@ -180,6 +180,36 @@ impl Subscribe for LogWriter { or(e.reason.as_deref(), "fatal") ); } + + #[cfg(feature = "controller")] + EventKind::ControllerRejected => { + println!( + "{} [controller-rejected] slot={} reason=\"{}\"", + seq, + or(e.task.as_deref(), "none"), + or(e.reason.as_deref(), "unknown") + ); + } + + #[cfg(feature = "controller")] + EventKind::ControllerSubmitted => { + println!( + "{} [controller-submitted] slot={} {}", + seq, + or(e.task.as_deref(), "none"), + or(e.reason.as_deref(), "") + ); + } + + #[cfg(feature = "controller")] + EventKind::ControllerSlotTransition => { + println!( + "{} [controller-transition] slot={} transition=\"{}\"", + seq, + or(e.task.as_deref(), "none"), + or(e.reason.as_deref(), "unknown") + ); + } } } From b79e20f5442558b358a683870dd7eff6472efe20 Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Wed, 22 Oct 2025 14:06:23 +0300 Subject: [PATCH 10/11] fix clippy --- src/controller/core.rs | 5 +---- src/controller/mod.rs | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/controller/core.rs b/src/controller/core.rs index 24ab120..331e53d 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -234,10 +234,7 @@ impl Controller { /// Handles bus events (terminal only). 
async fn handle_event(&self, event: Arc<Event>) { - match event.kind { - EventKind::TaskRemoved => self.on_task_finished(&event).await, - _ => {} - } + if event.kind == EventKind::TaskRemoved { self.on_task_finished(&event).await } } /// Handles `TaskRemoved` for a task; frees the slot and optionally starts the queued next. diff --git a/src/controller/mod.rs b/src/controller/mod.rs index e2107de..9c18b70 100644 --- a/src/controller/mod.rs +++ b/src/controller/mod.rs @@ -112,13 +112,13 @@ mod admission; mod config; -mod error; -mod spec; mod core; +mod error; mod slot; +mod spec; pub use admission::ControllerAdmission; pub use config::ControllerConfig; pub use core::Controller; pub use error::ControllerError; -pub use spec::ControllerSpec; \ No newline at end of file +pub use spec::ControllerSpec; From 7e226b5c4785d1fe355d7d4f5e4dfc1e38ed80ae Mon Sep 17 00:00:00 2001 From: mr-chelyshkin Date: Wed, 22 Oct 2025 14:09:46 +0300 Subject: [PATCH 11/11] fix checks --- src/controller/core.rs | 4 +++- src/core/builder.rs | 21 --------------------- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/src/controller/core.rs b/src/controller/core.rs index 331e53d..c304b94 100644 --- a/src/controller/core.rs +++ b/src/controller/core.rs @@ -234,7 +234,9 @@ impl Controller { /// Handles bus events (terminal only). async fn handle_event(&self, event: Arc<Event>) { - if event.kind == EventKind::TaskRemoved { self.on_task_finished(&event).await } + if event.kind == EventKind::TaskRemoved { + self.on_task_finished(&event).await + } } /// Handles `TaskRemoved` for a task; frees the slot and optionally starts the queued next. diff --git a/src/core/builder.rs b/src/core/builder.rs index 34e2323..1e1509a 100644 --- a/src/core/builder.rs +++ b/src/core/builder.rs @@ -10,27 +10,6 @@ use crate::{ use super::{alive::AliveTracker, registry::Registry, supervisor::Supervisor}; /// Builder for constructing a Supervisor with optional features. 
-/// -/// ## Example -/// ```rust -/// use taskvisor::{Config, Supervisor}; -/// -/// // Without controller -/// let sup = Supervisor::builder(Config::default()) -/// .with_subscribers(vec![]) -/// .build(); -/// -/// // With controller (requires "controller" feature) -/// #[cfg(feature = "controller")] -/// { -/// use taskvisor::controller::ControllerConfig; -/// -/// let sup = Supervisor::builder(Config::default()) -/// .with_subscribers(vec![]) -/// .with_controller(ControllerConfig::default()) -/// .build(); -/// } -/// ``` pub struct SupervisorBuilder { cfg: Config, subscribers: Vec<Arc<dyn Subscribe>>,