Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions examples/rust_supervised_app/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ fn main_logic(args: &Args, stop: Arc<AtomicBool>) -> Result<(), Box<dyn std::err
.add_deadline_monitor(MonitorTag::from("mon1"), builder)
.with_supervisor_api_cycle(std::time::Duration::from_millis(50))
.with_internal_processing_cycle(std::time::Duration::from_millis(50))
.build();
.build()
.expect("Failed to build health monitor");

let mon = hm
.get_deadline_monitor(MonitorTag::from("mon1"))
.expect("Failed to get monitor");

hm.start();
hm.start().expect("Failed to start health monitor");

if !lifecycle_client_rs::report_execution_state_running() {
error!("Rust app FAILED to report execution state!");
Expand Down
27 changes: 24 additions & 3 deletions src/health_monitoring_lib/rust/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// SPDX-License-Identifier: Apache-2.0
// *******************************************************************************

use crate::deadline::DeadlineEvaluationError;
use crate::log::ScoreDebug;
use crate::tag::MonitorTag;
use core::hash::Hash;
use core::time::Duration;
Expand All @@ -30,11 +32,30 @@ impl TimeRange {
}
}

/// A monitor with an evaluation handle available.
pub(crate) trait Monitor {
/// Get an evaluation handle for this monitor.
///
/// # NOTE
///
/// This evaluation handle is intended to be called from a background thread periodically.
fn get_eval_handle(&self) -> MonitorEvalHandle;
}

/// Errors that can occur during monitor evaluation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, crate::log::ScoreDebug)]
/// Contains failing monitor type.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, ScoreDebug)]
#[allow(dead_code)]
pub(crate) enum MonitorEvaluationError {
TooEarly,
TooLate,
Deadline(DeadlineEvaluationError),
Heartbeat,
Logic,
}

impl From<DeadlineEvaluationError> for MonitorEvaluationError {
fn from(value: DeadlineEvaluationError) -> Self {
MonitorEvaluationError::Deadline(value)
}
}

/// Trait for evaluating monitors and reporting errors to be used by HealthMonitor.
Expand Down
187 changes: 97 additions & 90 deletions src/health_monitoring_lib/rust/deadline/deadline_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,26 @@
//
// SPDX-License-Identifier: Apache-2.0
// *******************************************************************************
use super::common::DeadlineTemplate;
use crate::common::{MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator, TimeRange};
use crate::common::{Monitor, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator};
use crate::deadline::common::{DeadlineTemplate, StateIndex};
use crate::deadline::deadline_state::{DeadlineState, DeadlineStateSnapshot};
use crate::log::{error, warn, ScoreDebug};
use crate::protected_memory::ProtectedMemoryAllocator;
use crate::tag::{DeadlineTag, MonitorTag};
use crate::{
deadline::{
common::StateIndex,
deadline_state::{DeadlineState, DeadlineStateSnapshot},
},
protected_memory::ProtectedMemoryAllocator,
};
use crate::TimeRange;
use core::hash::Hash;
use std::{collections::HashMap, sync::Arc, time::Instant};

use crate::log::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

/// Deadline evaluation errors.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, ScoreDebug)]
pub(crate) enum DeadlineEvaluationError {
/// Finished too early.
TooEarly,
/// Finished too late.
TooLate,
}

///
/// Errors that can occur when working with DeadlineMonitor
Expand Down Expand Up @@ -65,7 +71,8 @@ impl DeadlineMonitorBuilder {

/// Builds the DeadlineMonitor with the configured deadlines.
pub(crate) fn build(self, monitor_tag: MonitorTag, _allocator: &ProtectedMemoryAllocator) -> DeadlineMonitor {
DeadlineMonitor::new(monitor_tag, self.deadlines)
let inner = Arc::new(DeadlineMonitorInner::new(monitor_tag, self.deadlines));
DeadlineMonitor::new(inner)
}

// Used by FFI and config parsing code which prefer not to move builder instance
Expand All @@ -80,27 +87,9 @@ pub struct DeadlineMonitor {
}

impl DeadlineMonitor {
fn new(monitor_tag: MonitorTag, deadlines: HashMap<DeadlineTag, TimeRange>) -> Self {
let mut active_deadlines = vec![];

let deadlines = deadlines
.into_iter()
.enumerate()
.map(|(index, (deadline_tag, range))| {
active_deadlines.push((deadline_tag, DeadlineState::new()));
(deadline_tag, DeadlineTemplate::new(range, StateIndex::new(index)))
})
.collect();

Self {
#[allow(clippy::arc_with_non_send_sync)] // This will be fixed once we add background thread
inner: Arc::new(DeadlineMonitorInner {
monitor_tag,
deadlines,
active_deadlines: active_deadlines.into(),
monitor_starting_point: Instant::now(),
}),
}
/// Create a new [`DeadlineMonitor`] instance.
fn new(inner: Arc<DeadlineMonitorInner>) -> Self {
Self { inner }
}

/// Acquires a deadline instance for the given tag.
Expand All @@ -109,26 +98,12 @@ impl DeadlineMonitor {
/// - Err(DeadlineMonitorError::DeadlineInUse) - if the deadline is already in use
/// - Err(DeadlineMonitorError::DeadlineNotFound) - if the deadline tag is not registered
pub fn get_deadline(&self, deadline_tag: DeadlineTag) -> Result<Deadline, DeadlineMonitorError> {
if let Some(template) = self.inner.deadlines.get(&deadline_tag) {
match template.acquire_deadline() {
Some(range) => Ok(Deadline {
range,
deadline_tag,
monitor: Arc::clone(&self.inner),
state_index: template.assigned_state_index,
}),
None => Err(DeadlineMonitorError::DeadlineInUse),
}
} else {
Err(DeadlineMonitorError::DeadlineNotFound)
}
self.inner.get_deadline(deadline_tag)
}
}

/// Handle for evaluation of all active deadlines and reporting any missed deadlines or underruns.
///
/// # NOTE
/// This function is intended to be called from a background thread periodically.
pub(crate) fn get_eval_handle(&self) -> MonitorEvalHandle {
impl Monitor for DeadlineMonitor {
fn get_eval_handle(&self) -> MonitorEvalHandle {
MonitorEvalHandle::new(Arc::clone(&self.inner))
}
}
Expand Down Expand Up @@ -220,7 +195,7 @@ impl Deadline {

let expected = current.timestamp_ms();
if expected < now {
possible_err = (Some(MonitorEvaluationError::TooLate), now - expected);
possible_err = (Some(DeadlineEvaluationError::TooLate), now - expected);
return None; // Deadline missed, let state as is for BG thread to report
}

Expand All @@ -231,18 +206,18 @@ impl Deadline {
// Finished too early, leave it for reporting by BG thread

current.set_underrun();
possible_err = (Some(MonitorEvaluationError::TooEarly), earliest_time - now);
possible_err = (Some(DeadlineEvaluationError::TooEarly), earliest_time - now);
return Some(current);
}

Some(DeadlineStateSnapshot::default()) // Reset to stopped state as all fine
});

match possible_err {
(Some(MonitorEvaluationError::TooEarly), val) => {
(Some(DeadlineEvaluationError::TooEarly), val) => {
error!("Deadline {:?} stopped too early by {} ms", self.deadline_tag, val);
},
(Some(MonitorEvaluationError::TooLate), val) => {
(Some(DeadlineEvaluationError::TooLate), val) => {
error!("Deadline {:?} stopped too late by {} ms", self.deadline_tag, val);
},
(None, _) => {},
Expand Down Expand Up @@ -286,35 +261,14 @@ struct DeadlineMonitorInner {

impl MonitorEvaluator for DeadlineMonitorInner {
fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)) {
self.evaluate(on_error);
}
}

impl DeadlineMonitorInner {
fn release_deadline(&self, deadline_tag: DeadlineTag) {
if let Some(template) = self.deadlines.get(&deadline_tag) {
template.release_deadline();
} else {
unreachable!("Releasing unknown deadline tag: {:?}", deadline_tag);
}
}

fn now(&self) -> u32 {
let duration = self.monitor_starting_point.elapsed();
// As u32 can hold up to ~49 days in milliseconds, this should be sufficient for our use case
// We still have a room up to 60bits timestamp if needed in future
u32::try_from(duration.as_millis()).expect("Monitor running for too long")
}

fn evaluate(&self, mut on_failed: impl FnMut(&MonitorTag, MonitorEvaluationError)) {
for (deadline_tag, deadline) in self.active_deadlines.iter() {
let snapshot = deadline.snapshot();
if snapshot.is_underrun() {
// Deadline finished too early, report
warn!("Deadline ({:?}) finished too early!", deadline_tag);

// Here we would normally report the underrun to the monitoring system
on_failed(&self.monitor_tag, MonitorEvaluationError::TooEarly);
on_error(&self.monitor_tag, DeadlineEvaluationError::TooEarly.into());
} else if snapshot.is_running() {
debug_assert!(
snapshot.is_stopped(),
Expand All @@ -331,13 +285,66 @@ impl DeadlineMonitorInner {
);

// Here we would normally report the missed deadline to the monitoring system
on_failed(&self.monitor_tag, MonitorEvaluationError::TooLate);
on_error(&self.monitor_tag, DeadlineEvaluationError::TooLate.into());
}
}
}
}
}

impl DeadlineMonitorInner {
fn new(monitor_tag: MonitorTag, deadlines: HashMap<DeadlineTag, TimeRange>) -> Self {
let mut active_deadlines = vec![];

let deadlines = deadlines
.into_iter()
.enumerate()
.map(|(index, (deadline_tag, range))| {
active_deadlines.push((deadline_tag, DeadlineState::new()));
(deadline_tag, DeadlineTemplate::new(range, StateIndex::new(index)))
})
.collect();

Self {
monitor_tag,
deadlines,
active_deadlines: active_deadlines.into(),
monitor_starting_point: Instant::now(),
}
}

fn release_deadline(&self, deadline_tag: DeadlineTag) {
if let Some(template) = self.deadlines.get(&deadline_tag) {
template.release_deadline();
} else {
unreachable!("Releasing unknown deadline tag: {:?}", deadline_tag);
}
}

pub(crate) fn get_deadline(self: &Arc<Self>, deadline_tag: DeadlineTag) -> Result<Deadline, DeadlineMonitorError> {
if let Some(template) = self.deadlines.get(&deadline_tag) {
match template.acquire_deadline() {
Some(range) => Ok(Deadline {
range,
deadline_tag,
monitor: self.clone(),
state_index: template.assigned_state_index,
}),
None => Err(DeadlineMonitorError::DeadlineInUse),
}
} else {
Err(DeadlineMonitorError::DeadlineNotFound)
}
}

fn now(&self) -> u32 {
let duration = self.monitor_starting_point.elapsed();
// As u32 can hold up to ~49 days in milliseconds, this should be sufficient for our use case
// We still have a room up to 60bits timestamp if needed in future
u32::try_from(duration.as_millis()).expect("Monitor running for too long")
}
}

#[score_testing_macros::test_mod_with_log]
#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -410,7 +417,7 @@ mod tests {

drop(handle); // stop the deadline

monitor.inner.evaluate(|monitor_tag, deadline_failure| {
monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| {
panic!(
"Deadline {:?} should not have failed or underrun({:?})",
monitor_tag, deadline_failure
Expand All @@ -426,10 +433,10 @@ mod tests {

drop(handle); // stop the deadline

monitor.inner.evaluate(|monitor_tag, deadline_failure| {
monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| {
assert_eq!(
deadline_failure,
MonitorEvaluationError::TooEarly,
DeadlineEvaluationError::TooEarly.into(),
"Deadline {:?} should not have failed({:?})",
monitor_tag,
deadline_failure
Expand All @@ -444,10 +451,10 @@ mod tests {

// So deadline stop happens after evaluate, still it should be reported as failed

monitor.inner.evaluate(|monitor_tag, deadline_failure| {
monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| {
assert_eq!(
deadline_failure,
MonitorEvaluationError::TooEarly,
DeadlineEvaluationError::TooEarly.into(),
"Deadline {:?} should not have failed({:?})",
monitor_tag,
deadline_failure
Expand All @@ -470,10 +477,10 @@ mod tests {
let handle = deadline.start();
assert_eq!(handle.err(), Some(DeadlineError::DeadlineAlreadyFailed));

monitor.inner.evaluate(|monitor_tag, deadline_failure| {
monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| {
assert_eq!(
deadline_failure,
MonitorEvaluationError::TooEarly,
DeadlineEvaluationError::TooEarly.into(),
"Deadline {:?} should not have failed ({:?})",
monitor_tag,
deadline_failure
Expand All @@ -489,10 +496,10 @@ mod tests {

drop(handle); // stop the deadline

monitor.inner.evaluate(|monitor_tag, deadline_failure| {
monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| {
assert_eq!(
deadline_failure,
MonitorEvaluationError::TooLate,
DeadlineEvaluationError::TooLate.into(),
"Deadline {:?} should not have failed({:?})",
monitor_tag,
deadline_failure
Expand All @@ -517,11 +524,11 @@ mod tests {

let mut cnt = 0;

monitor.inner.evaluate(|monitor_tag, deadline_failure| {
monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| {
cnt += 1;
assert_eq!(
deadline_failure,
MonitorEvaluationError::TooLate,
DeadlineEvaluationError::TooLate.into(),
"Deadline {:?} should not have failed({:?})",
monitor_tag,
deadline_failure
Expand Down
1 change: 1 addition & 0 deletions src/health_monitoring_lib/rust/deadline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod common;
mod deadline_monitor;
mod deadline_state;

pub(crate) use deadline_monitor::DeadlineEvaluationError;
pub use deadline_monitor::{
DeadlineError, DeadlineHandle, DeadlineMonitor, DeadlineMonitorBuilder, DeadlineMonitorError,
};
Expand Down
Loading
Loading