From af3f38fd9e7b8f6f472b4fb86bb143ea0752d70b Mon Sep 17 00:00:00 2001 From: Artem Shein Date: Thu, 12 Mar 2026 09:06:58 +0100 Subject: [PATCH] feo-com: mw com integrated as a backend --- MODULE.bazel | 26 +- examples/rust/cycle-benchmark/BUILD.bazel | 4 +- .../cycle-benchmark/src/bin/cycle_bench.rs | 88 ----- examples/rust/mini-adas/BUILD.bazel | 76 +++-- examples/rust/mini-adas/README.md | 51 +-- .../rust/mini-adas/mini-adas-gen/BUILD.bazel | 21 +- examples/rust/mini-adas/mini-adas-gen/lib.rs | 149 +++++++++ ...ni_adas_gen.rs => mini_adas_gen_mw_com.rs} | 68 +--- .../mini-adas/src/activities/components.rs | 249 ++++++-------- .../rust/mini-adas/src/activities/input.rs | 99 ++++++ examples/rust/mini-adas/src/activities/mod.rs | 2 + .../rust/mini-adas/src/activities/output.rs | 71 ++++ .../rust/mini-adas/src/bin/adas_primary.rs | 16 + .../rust/mini-adas/src/bin/adas_secondary.rs | 41 +++ examples/rust/mini-adas/src/config.rs | 5 +- src/feo-com/BUILD.bazel | 33 +- src/feo-com/src/interface.rs | 191 +++++++++-- src/feo-com/src/lib.rs | 2 + src/feo-com/src/linux_shm/mod.rs | 45 ++- src/feo-com/src/mw_com/mod.rs | 188 +++++++++++ src/feo/BUILD.bazel | 79 ----- src/feo/src/lib.rs | 2 - src/feo/src/recording/mod.rs | 21 -- src/feo/src/recording/recorder.rs | 308 ------------------ src/feo/src/recording/registry.rs | 186 ----------- src/feo/src/recording/transcoder.rs | 104 ------ src/feo/src/topicspec.rs | 6 +- 27 files changed, 985 insertions(+), 1146 deletions(-) create mode 100644 examples/rust/mini-adas/mini-adas-gen/lib.rs rename examples/rust/mini-adas/mini-adas-gen/{mini_adas_gen.rs => mini_adas_gen_mw_com.rs} (87%) create mode 100644 examples/rust/mini-adas/src/activities/input.rs create mode 100644 examples/rust/mini-adas/src/activities/output.rs create mode 100644 src/feo-com/src/mw_com/mod.rs delete mode 100644 src/feo/src/recording/mod.rs delete mode 100644 src/feo/src/recording/recorder.rs delete mode 100644 src/feo/src/recording/registry.rs delete mode 100644 src/feo/src/recording/transcoder.rs diff --git a/MODULE.bazel b/MODULE.bazel index 97fb963..c1f873e 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -20,7 +20,13 @@ bazel_dep(name = "platforms", version = "1.0.0") bazel_dep(name = "score_bazel_platforms", version = "0.1.1") # SCORE bazel module dependencies -bazel_dep(name = "score_tooling", version = "1.1.0") +bazel_dep(name = "score_tooling", version = "1.1.2") +git_override( + module_name = "score_tooling", + commit = "58581abf75b321defbb53ffbad3b65e7f62b5082", + remote = "https://github.com/eclipse-score/tooling.git", +) + bazel_dep(name = "score_crates", version = "0.0.6") git_override( module_name = "score_crates", @@ -41,14 +47,14 @@ bazel_dep(name = "score_communication", version = "0.1.2") git_override( module_name = "score_communication", - commit = "dfc9b0f6fd2ae43cecdf11f4684ee2aa628faaf5", + commit = "ced8c0e09d136c5de430268fd2270878b6f297ee", remote = "https://github.com/eclipse-score/communication.git", ) -bazel_dep(name = "score_baselibs", version = "0.2.2") +bazel_dep(name = "score_baselibs", version = "0.2.4") git_override( module_name = "score_baselibs", - commit = "6951cd2a3045fb7a053acf203833a17704c22ccf", + commit = "052c2f271be4239f97182b164f4903b8c88d6c72", remote = "https://github.com/eclipse-score/baselibs.git", ) @@ -60,11 +66,11 @@ git_override( remote = "https://github.com/bmw-software-engineering/trlc.git", ) -bazel_dep(name = "score_logging", version = "0.0.5") - -single_version_override( - module_name = "score_docs_as_code", - version = "3.0.0", +bazel_dep(name = "score_logging", version = "0.1.2") +git_override( + module_name = "score_logging", + commit = "2833f5d6f28cbd8be1f3037dfa1217aa1ec94848", + remote = "https://github.com/eclipse-score/logging.git", ) bazel_dep(name = "rules_boost", repo_name = "com_github_nelhage_rules_boost") @@ -92,7 +98,7 @@ bazel_dep(name = "rules_proto", version = "7.1.0") bazel_dep(name = "aspect_rules_lint", version = "2.0.0") bazel_dep(name = "buildifier_prebuilt", version = "8.2.0.2") bazel_dep(name = "googletest", version = "1.17.0") -bazel_dep(name = "score_docs_as_code", version = "3.0.0") +bazel_dep(name = "score_docs_as_code", version = "3.0.1") # Toolchains bazel_dep(name = "rules_cc", version = "0.2.16") diff --git a/examples/rust/cycle-benchmark/BUILD.bazel b/examples/rust/cycle-benchmark/BUILD.bazel index 5e1bae2..da89933 100644 --- a/examples/rust/cycle-benchmark/BUILD.bazel +++ b/examples/rust/cycle-benchmark/BUILD.bazel @@ -29,7 +29,7 @@ rust_library( ], visibility = ["//visibility:public"], deps = [ - "//src/feo:libfeo_recording_rust", + "//src/feo:libfeo_rust", "//src/feo-com:libfeo_com_rust", "//src/feo-time:libfeo_time_rust", "//src/feo-tracing:libfeo_tracing_rust", @@ -54,7 +54,7 @@ rust_binary( visibility = ["//visibility:public"], deps = [ ":libcycle_benchmark_rust", - "//src/feo:libfeo_recording_rust", + "//src/feo:libfeo_rust", "//src/feo-time:libfeo_time_rust", "//src/feo-tracing:libfeo_tracing_rust", "@score_baselibs_rust//src/log/score_log", diff --git a/examples/rust/cycle-benchmark/src/bin/cycle_bench.rs b/examples/rust/cycle-benchmark/src/bin/cycle_bench.rs index 843f2e6..52431e4 100644 --- a/examples/rust/cycle-benchmark/src/bin/cycle_bench.rs +++ b/examples/rust/cycle-benchmark/src/bin/cycle_bench.rs @@ -13,8 +13,6 @@ use cycle_benchmark::config::{ApplicationConfig, SignallingType}; use feo::ids::AgentId; -use feo::recording::recorder::RecordingRules; -use feo::recording::registry::TypeRegistry; use feo_time::Duration; const DEFAULT_FEO_CYCLE_TIME: Duration = Duration::from_millis(5); @@ -31,8 +29,6 @@ fn main() { run_as_primary(params, app_config); } else if app_config.secondaries().contains(¶ms.agent_id) { run_as_secondary(params, app_config); - } else if app_config.recorders().contains(¶ms.agent_id) { - run_as_recorder(params, app_config); } else { eprintln!( "ERROR: Agent or recorder id {} not defined in system configuration", @@ -96,34 +92,6 @@ fn run_as_secondary(params: Params, app_config: ApplicationConfig) { } } -fn run_as_recorder(params: Params, app_config: ApplicationConfig) { - let signalling = app_config.signalling(); - println!( - "Starting recorder {} using signalling {:?}", - params.agent_id, signalling - ); - - // the benchmarking application does not exchange data, - // so have an empty type registry and an empty set of recording rules - let registry = TypeRegistry::default(); - let rules: RecordingRules = Default::default(); - - match signalling { - SignallingType::DirectMpsc => { - let config = direct_mpsc::make_recorder_config(params, app_config, ®istry, rules); - direct_mpsc::Recorder::new(config).run(); - }, - signalling @ SignallingType::DirectTcp | signalling @ SignallingType::DirectUnix => { - let config = direct_sockets::make_recorder_config(params, app_config, ®istry, rules, signalling); - direct_sockets::Recorder::new(config).run(); - }, - signalling @ SignallingType::RelayedTcp | signalling @ SignallingType::RelayedUnix => { - let config = relayed_sockets::make_recorder_config(params, app_config, ®istry, rules, signalling); - relayed_sockets::Recorder::new(config).run(); - }, - } -} - /// Parameters of the primary struct Params { /// Agent ID @@ -162,11 +130,8 @@ impl Params { mod direct_mpsc { use super::{Duration, Params}; use cycle_benchmark::config::ApplicationConfig; - use feo::recording::recorder::RecordingRules; - use feo::recording::registry::TypeRegistry; pub(super) use feo::agent::direct::primary_mpsc::{Primary, PrimaryConfig}; - pub(super) use feo::agent::direct::recorder::{Recorder, RecorderConfig}; pub(super) use feo::agent::direct::secondary::{Secondary, SecondaryConfig}; pub(super) fn make_primary_config(params: Params, app_config: ApplicationConfig) -> PrimaryConfig { @@ -195,26 +160,14 @@ mod direct_mpsc { pub(super) fn make_secondary_config(_: Params, _: ApplicationConfig) -> SecondaryConfig { panic!("direct mpsc signalling does not support secondary agents"); } - - pub(super) fn make_recorder_config( - _: Params, - _: ApplicationConfig, - _: &TypeRegistry, - _: RecordingRules, - ) -> RecorderConfig<'_> { - panic!("direct mpsc signalling does not support recorders"); - } } mod direct_sockets { use super::{Duration, Params}; use cycle_benchmark::config::{ApplicationConfig, SignallingType}; use feo::agent::NodeAddress; - use feo::recording::recorder::RecordingRules; - use feo::recording::registry::TypeRegistry; pub(super) use feo::agent::direct::primary::{Primary, PrimaryConfig}; - pub(super) use feo::agent::direct::recorder::{Recorder, RecorderConfig}; pub(super) use feo::agent::direct::secondary::{Secondary, SecondaryConfig}; fn endpoint(app_config: &ApplicationConfig, signalling: SignallingType) -> NodeAddress { @@ -261,35 +214,14 @@ mod direct_sockets { endpoint: endpoint(&app_config, signalling), } } - - pub(super) fn make_recorder_config( - params: Params, - app_config: ApplicationConfig, - type_registry: &TypeRegistry, - recording_rules: RecordingRules, - signalling: SignallingType, - ) -> RecorderConfig<'_> { - let agent_id = params.agent_id; - RecorderConfig { - id: agent_id, - record_file: "./rec.bin", - rules: recording_rules, - registry: type_registry, - receive_timeout: Duration::from_secs(10), - endpoint: endpoint(&app_config, signalling), - } - } } mod relayed_sockets { use super::{Duration, Params}; use cycle_benchmark::config::{ApplicationConfig, SignallingType}; use feo::agent::NodeAddress; - use feo::recording::recorder::RecordingRules; - use feo::recording::registry::TypeRegistry; pub(super) use feo::agent::relayed::primary::{Primary, PrimaryConfig}; - pub(super) use feo::agent::relayed::recorder::{Recorder, RecorderConfig}; pub(super) use feo::agent::relayed::secondary::{Secondary, SecondaryConfig}; fn endpoints(app_config: &ApplicationConfig, signalling: SignallingType) -> (NodeAddress, NodeAddress) { @@ -346,24 +278,4 @@ mod relayed_sockets { bind_address_receivers: endpoints.1, } } - - pub(super) fn make_recorder_config( - params: Params, - app_config: ApplicationConfig, - type_registry: &TypeRegistry, - recording_rules: RecordingRules, - signalling: SignallingType, - ) -> RecorderConfig<'_> { - let agent_id = params.agent_id; - let endpoints = endpoints(&app_config, signalling); - RecorderConfig { - id: agent_id, - record_file: "./rec.bin", - rules: recording_rules, - registry: type_registry, - receive_timeout: Duration::from_secs(10), - bind_address_senders: endpoints.0, - bind_address_receivers: endpoints.1, - } - } } diff --git a/examples/rust/mini-adas/BUILD.bazel b/examples/rust/mini-adas/BUILD.bazel index 4758eda..dbd911a 100644 --- a/examples/rust/mini-adas/BUILD.bazel +++ b/examples/rust/mini-adas/BUILD.bazel @@ -17,7 +17,9 @@ rust_library( name = "libmini_adas_rust", srcs = [ "src/activities/components.rs", + "src/activities/input.rs", "src/activities/mod.rs", + "src/activities/output.rs", "src/config.rs", "src/lib.rs", ], @@ -26,52 +28,42 @@ rust_library( "signalling_relayed_tcp", ], crate_name = "mini_adas", - proc_macro_deps = [ - "//src/feo-cpp-macros:feo_cpp_macros_rust", - ], visibility = ["//visibility:public"], deps = [ "//examples/rust/mini-adas/mini-adas-gen:mini_adas_gen_rs", "//src/feo:libfeo_rust", "//src/feo-com:libfeo_com_rust", - "//src/feo-cpp-build:libfeo_cpp_build_rust", "//src/feo-time:libfeo_time_rust", "//src/feo-tracing:libfeo_tracing_rust", "@score_baselibs_rust//src/log/score_log", - "@score_communication//score/mw/com/impl/rust/com-api/com-api", "@score_crates//:tracing", ], ) rust_library( - name = "libmini_adas_recording_rust", + name = "libmini_adas_rust_mw_com", srcs = [ "src/activities/components.rs", + "src/activities/input.rs", "src/activities/mod.rs", + "src/activities/output.rs", "src/config.rs", "src/lib.rs", ], crate_features = [ - "com_linux_shm", - "recording", - "signalling_direct_tcp", + "com_mw", + "signalling_relayed_tcp", ], crate_name = "mini_adas", - proc_macro_deps = [ - "//src/feo-cpp-macros:feo_cpp_macros_rust", - ], visibility = ["//visibility:public"], deps = [ - "//examples/rust/mini-adas/mini-adas-gen:mini_adas_gen_rs", - "//src/feo:libfeo_recording_rust", - "//src/feo-com:libfeo_com_rust", - "//src/feo-cpp-build:libfeo_cpp_build_rust", + "//examples/rust/mini-adas/mini-adas-gen:mini_adas_gen_rs_mw_com", + "//src/feo:libfeo_rust", + "//src/feo-com:libfeo_com_rust_mw_com", "//src/feo-time:libfeo_time_rust", "//src/feo-tracing:libfeo_tracing_rust", "@score_baselibs_rust//src/log/score_log", "@score_communication//score/mw/com/impl/rust/com-api/com-api", - "@score_crates//:postcard", - "@score_crates//:serde", "@score_crates//:tracing", ], ) @@ -82,6 +74,27 @@ rust_binary( "src/bin/adas_primary.rs", ], crate_features = ["signalling_relayed_tcp"], + visibility = ["//visibility:public"], + deps = [ + ":libmini_adas_rust", + "//src/feo:libfeo_rust", + "//src/feo-com:libfeo_com_rust", + "//src/feo-time:libfeo_time_rust", + "//src/feo-tracing:libfeo_tracing_rust", + "@score_baselibs_rust//src/log/score_log", + "@score_baselibs_rust//src/log/stdout_logger", + ], +) + +rust_binary( + name = "adas_primary_mw_com", + srcs = [ + "src/bin/adas_primary.rs", + ], + crate_features = [ + "signalling_relayed_tcp", + "com_mw", + ], data = [ "etc/logging.json", "etc/mw_com_config.json", @@ -96,8 +109,9 @@ rust_binary( ], visibility = ["//visibility:public"], deps = [ - ":libmini_adas_rust", + ":libmini_adas_rust_mw_com", "//src/feo:libfeo_rust", + "//src/feo-com:libfeo_com_rust_mw_com", "//src/feo-time:libfeo_time_rust", "//src/feo-tracing:libfeo_tracing_rust", "@score_baselibs_rust//src/log/score_log", @@ -112,6 +126,27 @@ rust_binary( "src/bin/adas_secondary.rs", ], crate_features = ["signalling_relayed_tcp"], + visibility = ["//visibility:public"], + deps = [ + ":libmini_adas_rust", + "//src/feo:libfeo_rust", + "//src/feo-com:libfeo_com_rust", + "//src/feo-time:libfeo_time_rust", + "//src/feo-tracing:libfeo_tracing_rust", + "@score_baselibs_rust//src/log/score_log", + "@score_baselibs_rust//src/log/stdout_logger", + ], +) + +rust_binary( + name = "adas_secondary_mw_com", + srcs = [ + "src/bin/adas_secondary.rs", + ], + crate_features = [ + "signalling_relayed_tcp", + "com_mw", + ], data = [ "etc/logging.json", "etc/mw_com_config.json", @@ -126,8 +161,9 @@ rust_binary( ], visibility = ["//visibility:public"], deps = [ - ":libmini_adas_rust", + ":libmini_adas_rust_mw_com", "//src/feo:libfeo_rust", + "//src/feo-com:libfeo_com_rust_mw_com", "//src/feo-time:libfeo_time_rust", "//src/feo-tracing:libfeo_tracing_rust", "@score_baselibs_rust//src/log/score_log", diff --git a/examples/rust/mini-adas/README.md b/examples/rust/mini-adas/README.md index 6352e8f..9358617 100644 --- a/examples/rust/mini-adas/README.md +++ b/examples/rust/mini-adas/README.md @@ -9,69 +9,34 @@ You need to run the following commands in separate terminals. ```sh # Use 400ms cycle time -bazel run //examples/rust/mini-adas:adas_primary -- 400 +bazelisk run //examples/rust/mini-adas:adas_primary -- 400 ``` ```sh -bazel run //examples/rust/mini-adas:adas_secondary -- 1 +bazelisk run //examples/rust/mini-adas:adas_secondary -- 1 ``` ```sh -bazel run //examples/rust/mini-adas:adas_secondary -- 2 +bazelisk run //examples/rust/mini-adas:adas_secondary -- 2 ``` -If you want to include recording, -you need to pass the recorder's agent ID to the primary -and start the recorder. - -```sh -# Use 400ms cycle time -# Wait for recorder with ID 900 -bazel run //examples/rust/mini-adas:adas_primary -- 400 900 -``` - -```sh -bazel run //examples/rust/mini-adas:adas_secondary -- 1 -``` - -```sh -bazel run //examples/rust/mini-adas:adas_secondary -- 2 -``` - -```sh -# Start recorder with ID 900 -bazel run //examples/rust/mini-adas:adas_recorder -- 900 -``` - -You may also use more than one recorder by specifying multiple recorder agent ids as a dot-separated -list to the primary and then start all the corresponding recorders. +## Using middleware COM for data exchange +In order to use mw com instead of feo-com for data exchange use the following bazel targets instead: ```sh # Use 400ms cycle time -# Wait for two recorders with IDs 900 and 901 -bazel run //examples/rust/mini-adas:adas_primary -- 400 900.901 -``` - -```sh -bazel run //examples/rust/mini-adas:adas_secondary -- 1 +bazelisk run //examples/rust/mini-adas:adas_primary_mw_com -- 400 ``` ```sh -bazel run //examples/rust/mini-adas:adas_secondary -- 2 +bazelisk run //examples/rust/mini-adas:adas_secondary_mw_com -- 1 ``` ```sh -# Start recorder with ID 900 -bazel run //examples/rust/mini-adas:adas_recorder -- 900 +bazelisk run //examples/rust/mini-adas:adas_secondary_mw_com -- 2 ``` -```sh -# Start recorder with ID 901 -bazel run //examples/rust/mini-adas:adas_recorder -- 901 -``` - - ## Different signalling layer The easiest way to switch the signalling layer is by changing the crate_features in the `BUILD.bazel`, diff --git a/examples/rust/mini-adas/mini-adas-gen/BUILD.bazel b/examples/rust/mini-adas/mini-adas-gen/BUILD.bazel index 1715a9f..7ef9f67 100644 --- a/examples/rust/mini-adas/mini-adas-gen/BUILD.bazel +++ b/examples/rust/mini-adas/mini-adas-gen/BUILD.bazel @@ -14,8 +14,12 @@ load("@rules_rust//rust:defs.bzl", "rust_library") load("@score_baselibs//score/language/safecpp:toolchain_features.bzl", "COMPILER_WARNING_FEATURES") rust_library( - name = "mini_adas_gen_rs", - srcs = ["mini_adas_gen.rs"], + name = "mini_adas_gen_rs_mw_com", + srcs = [ + "lib.rs", + "mini_adas_gen_mw_com.rs", + ], + crate_features = ["mw_com"], crate_name = "mini_adas_gen", features = ["link_std_cpp_lib"], visibility = [ @@ -29,6 +33,19 @@ rust_library( ], ) +rust_library( + name = "mini_adas_gen_rs", + srcs = ["lib.rs"], + crate_name = "mini_adas_gen", + visibility = [ + "//examples/rust/mini-adas:__subpackages__", + ], + deps = [ + "@score_baselibs_rust//src/log/score_log", + "@score_crates//:libc", + ], +) + cc_library( name = "mini_adas_gen_cpp", srcs = ["mini_adas_gen.cpp"], diff --git a/examples/rust/mini-adas/mini-adas-gen/lib.rs b/examples/rust/mini-adas/mini-adas-gen/lib.rs new file mode 100644 index 0000000..c5ba0ca --- /dev/null +++ b/examples/rust/mini-adas/mini-adas-gen/lib.rs @@ -0,0 +1,149 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +#[cfg(feature = "mw_com")] +mod mini_adas_gen_mw_com; + +#[cfg(feature = "mw_com")] +pub use mini_adas_gen_mw_com::*; + +#[cfg(feature = "mw_com")] +use com_api::{PlacementDefault, Reloc}; + +use score_log::ScoreDebug; + +/// Camera image +/// +/// A neural network could detect the number of people, +/// number of cars and the distance to the closest obstacle. +/// Given that we do not have a real neural network, +/// we already include information to be dummy inferred. +#[derive(Debug, Default, ScoreDebug)] +#[repr(C)] +#[cfg_attr(feature = "mw_com", derive(Reloc))] +pub struct CameraImage { + pub num_people: libc::size_t, + pub num_cars: libc::size_t, + pub distance_obstacle: f64, +} + +#[cfg(feature = "mw_com")] +// SAFETY: only writes via field access +unsafe impl PlacementDefault for CameraImage { + #[allow(clippy::not_unsafe_ptr_arg_deref)] // part of MW COM API + fn placement_default(s: *mut Self) { + unsafe { + (*s).num_people = 0; + (*s).num_cars = 0; + (*s).distance_obstacle = 0.; + } + } +} + +/// Radar scan +/// +/// With post-processing, we could detect the closest object +/// from a real radar scan. In this example, +/// the message type already carries the information to be dummy extracted. +#[derive(Debug, Default, ScoreDebug)] +#[repr(C)] +#[cfg_attr(feature = "mw_com", derive(Reloc))] +pub struct RadarScan { + pub distance_obstacle: f64, + pub error_margin: f64, +} + +#[cfg(feature = "mw_com")] +// SAFETY: only writes via field access +unsafe impl PlacementDefault for RadarScan { + #[allow(clippy::not_unsafe_ptr_arg_deref)] // part of MW COM API + fn placement_default(s: *mut Self) { + unsafe { + (*s).distance_obstacle = 0.; + (*s).error_margin = 0.; + } + } +} + +/// Scene +/// +/// The scene is the result of fusing the camera image and the radar scan +/// with a neural network. In our example, we just extract the information. +#[derive(Debug, Default, ScoreDebug)] +#[repr(C)] +#[cfg_attr(feature = "mw_com", derive(Reloc))] +pub struct Scene { + pub num_people: usize, + pub num_cars: usize, + pub distance_obstacle: f64, + pub distance_left_lane: f64, + pub distance_right_lane: f64, +} + +#[cfg(feature = "mw_com")] +// SAFETY: only writes via field access +unsafe impl PlacementDefault for Scene { + #[allow(clippy::not_unsafe_ptr_arg_deref)] // part of MW COM API + fn placement_default(s: *mut Self) { + unsafe { + (*s).num_people = 0; + (*s).num_cars = 0; + (*s).distance_obstacle = 0.; + (*s).distance_left_lane = 0.; + (*s).distance_right_lane = 0.; + } + } +} + +/// Brake instruction +/// +/// This is an instruction whether to engage the brakes and at which level. +#[derive(Debug, Default, ScoreDebug)] +#[repr(C)] +#[cfg_attr(feature = "mw_com", derive(Reloc))] +pub struct BrakeInstruction { + pub active: bool, + pub level: f64, +} + +#[cfg(feature = "mw_com")] +// SAFETY: only writes via field access +unsafe impl PlacementDefault for BrakeInstruction { + #[allow(clippy::not_unsafe_ptr_arg_deref)] // part of MW COM API + fn placement_default(s: *mut Self) { + unsafe { + (*s).active = false; + (*s).level = 0.; + } + } +} + +/// Steering +/// +/// This carries the angle of steering. +#[derive(Debug, Default, ScoreDebug)] +#[repr(C)] +#[cfg_attr(feature = "mw_com", derive(Reloc))] +pub struct Steering { + pub angle: f64, +} + +#[cfg(feature = "mw_com")] +// SAFETY: only writes via field access +unsafe impl PlacementDefault for Steering { + #[allow(clippy::not_unsafe_ptr_arg_deref)] // part of MW COM API + fn placement_default(s: *mut Self) { + unsafe { + (*s).angle = 0.; + } + } +} diff --git a/examples/rust/mini-adas/mini-adas-gen/mini_adas_gen.rs b/examples/rust/mini-adas/mini-adas-gen/mini_adas_gen_mw_com.rs similarity index 87% rename from examples/rust/mini-adas/mini-adas-gen/mini_adas_gen.rs rename to examples/rust/mini-adas/mini-adas-gen/mini_adas_gen_mw_com.rs index a8cd474..86a72c1 100644 --- a/examples/rust/mini-adas/mini-adas-gen/mini_adas_gen.rs +++ b/examples/rust/mini-adas/mini-adas-gen/mini_adas_gen_mw_com.rs @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2025 Contributors to the Eclipse Foundation + * Copyright (c) 2026 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -11,24 +11,9 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use com_api::{ - CommData, Consumer, Interface, OfferedProducer, Producer, ProviderInfo, Publisher, Reloc, Runtime, Subscriber, -}; -use score_log::ScoreDebug; - -/// Camera image -/// -/// A neural network could detect the number of people, -/// number of cars and the distance to the closest obstacle. -/// Given that we do not have a real neural network, -/// we already include information to be dummy inferred. -#[derive(Debug, Default, Reloc, ScoreDebug)] -#[repr(C)] -pub struct CameraImage { - pub num_people: libc::size_t, - pub num_cars: libc::size_t, - pub distance_obstacle: f64, -} +use crate::*; + +use com_api::{CommData, Consumer, Interface, OfferedProducer, Producer, ProviderInfo, Publisher, Runtime, Subscriber}; impl CommData for CameraImage { const ID: &'static str = "CameraImage"; @@ -102,18 +87,6 @@ impl OfferedProducer for CameraOfferedProducer { } } -/// Radar scan -/// -/// With post-processing, we could detect the closest object -/// from a real radar scan. In this example, -/// the message type already carries the information to be dummy extracted. -#[derive(Debug, Default, Reloc, ScoreDebug)] -#[repr(C)] -pub struct RadarScan { - pub distance_obstacle: f64, - pub error_margin: f64, -} - impl CommData for RadarScan { const ID: &'static str = "RadarScan"; } @@ -186,20 +159,6 @@ impl OfferedProducer for RadarOfferedProducer { } } -/// Scene -/// -/// The scene is the result of fusing the camera image and the radar scan -/// with a neural network. In our example, we just extract the information. -#[derive(Debug, Default, Reloc, ScoreDebug)] -#[repr(C)] -pub struct Scene { - pub num_people: usize, - pub num_cars: usize, - pub distance_obstacle: f64, - pub distance_left_lane: f64, - pub distance_right_lane: f64, -} - impl CommData for Scene { const ID: &'static str = "Scene"; } @@ -272,16 +231,6 @@ impl OfferedProducer for NeuralNetOfferedProducer { } } -/// Brake instruction -/// -/// This is an instruction whether to engage the brakes and at which level. -#[derive(Debug, Default, Reloc, ScoreDebug)] -#[repr(C)] -pub struct BrakeInstruction { - pub active: bool, - pub level: f64, -} - impl CommData for BrakeInstruction { const ID: &'static str = "BrakeInstruction"; } @@ -356,15 +305,6 @@ impl OfferedProducer for BrakeControllerOfferedProducer< } } -/// Steering -/// -/// This carries the angle of steering. -#[derive(Debug, Default, Reloc, ScoreDebug)] -#[repr(C)] -pub struct Steering { - pub angle: f64, -} - impl CommData for Steering { const ID: &'static str = "Steering"; } diff --git a/examples/rust/mini-adas/src/activities/components.rs b/examples/rust/mini-adas/src/activities/components.rs index 394dcbe..6c18242 100644 --- a/examples/rust/mini-adas/src/activities/components.rs +++ b/examples/rust/mini-adas/src/activities/components.rs @@ -11,57 +11,36 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use crate::config::mw_com_runtime; -use com_api::{ - Builder, CommData, FindServiceSpecifier, InstanceSpecifier, Interface, LolaRuntimeImpl, Producer, Publisher, - Runtime, SampleContainer, SampleMaybeUninit, SampleMut, ServiceDiscovery, Subscriber, Subscription, -}; +use crate::activities::input::input; +use crate::activities::output::output; +#[cfg(feature = "com_mw")] +use com_api::{LolaRuntimeImpl, Runtime, SampleContainer, Subscriber}; use core::hash::{BuildHasher as _, Hasher as _}; use core::mem::MaybeUninit; -use core::ops::{Deref, DerefMut, Range}; +use core::ops::DerefMut; +use core::ops::{Deref, Range}; use core::time::Duration; use feo::activity::Activity; use feo::error::ActivityError; use feo::ids::ActivityId; +use feo_com::interface::{ActivityInput, ActivityOutput}; use feo_tracing::instrument; +#[cfg(feature = "com_mw")] use mini_adas_gen::{ - BrakeControllerConsumer, BrakeControllerInterface, CameraConsumer, CameraInterface, NeuralNetConsumer, - NeuralNetInterface, RadarConsumer, RadarInterface, SteeringControllerConsumer, SteeringControllerInterface, + BrakeControllerConsumer, BrakeControllerInterface, BrakeControllerOfferedProducer, CameraConsumer, CameraInterface, + CameraOfferedProducer, NeuralNetConsumer, NeuralNetInterface, NeuralNetOfferedProducer, RadarConsumer, + RadarInterface, RadarOfferedProducer, SteeringControllerConsumer, SteeringControllerInterface, + SteeringControllerOfferedProducer, }; use mini_adas_gen::{BrakeInstruction, CameraImage, RadarScan, Scene, Steering}; use score_log::debug; use std::hash::RandomState; use std::thread; -use std::thread::sleep; const SLEEP_RANGE: Range = 10..45; -pub struct DebugWrapper(pub T); - -impl core::fmt::Debug for DebugWrapper { - fn fmt(&self, _f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - Ok(()) - } -} - -impl core::ops::Deref for DebugWrapper { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for DebugWrapper { - fn deref_mut(&mut self) -> &mut ::Target { - &mut self.0 - } -} - -type MwComPublisher = ::Publisher; -type MwComSubscription = - <::Subscriber as Subscriber>::Subscription; -type MwComSample<'a, T> = as Subscription>::Sample<'a>; +#[cfg(feature = "com_mw")] +type MwComSubscriber = ::Subscriber; /// Camera activity /// @@ -71,8 +50,7 @@ pub struct Camera { /// ID of the activity activity_id: ActivityId, /// Image output - output_image: DebugWrapper>, - + output_image: Box>, // Local state for pseudo-random output generation num_people: usize, num_cars: usize, @@ -81,9 +59,10 @@ pub struct Camera { impl Camera { pub fn build(activity_id: ActivityId, image_topic: &str) -> Box { + let output_image = output!(CameraInterface, image_topic, |i: CameraOfferedProducer<_>| i.image); Box::new(Self { activity_id, - output_image: DebugWrapper(create_producer::(image_topic).image), + output_image, num_people: 4, num_cars: 10, distance_obstacle: 40.0, @@ -125,7 +104,12 @@ impl Activity for Camera { let image = self.get_image(); debug!("Sending image: {:?}", image); - self.output_image.send(image).unwrap(); + self.output_image + .write_uninit() + .unwrap() + .write_payload(image) + .send() + .unwrap(); Ok(()) } @@ -144,7 +128,7 @@ pub struct Radar { /// ID of the activity activity_id: ActivityId, /// Radar scan output - output_scan: DebugWrapper<::Publisher>, + output_scan: Box>, // Local state for pseudo-random output generation distance_obstacle: f64, @@ -152,9 +136,10 @@ pub struct Radar { impl Radar { pub fn build(activity_id: ActivityId, radar_topic: &str) -> Box { + let output_scan = output!(RadarInterface, radar_topic, |r: RadarOfferedProducer<_>| r.scan); Box::new(Self { activity_id, - output_scan: DebugWrapper(create_producer::(radar_topic).scan), + output_scan, distance_obstacle: 40.0, }) } @@ -191,7 +176,12 @@ impl Activity for Radar { let scan = self.get_scan(); debug!("Sending scan: {:?}", scan); - self.output_scan.send(scan).unwrap(); + self.output_scan + .write_uninit() + .unwrap() + .write_payload(scan) + .send() + .unwrap(); Ok(()) } @@ -202,48 +192,6 @@ impl Activity for Radar { } } -struct MwComInput { - count: usize, - /// Subscription - subscription: MwComSubscription, - /// Sample container - sample_container: SampleContainer>, -} - -impl core::fmt::Debug for MwComInput { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - write!(f, "MwComInput") - } -} - -impl MwComInput { - fn new = C>>( - topic: &str, - f: impl FnOnce(C) -> ::Subscriber, - ) -> Self { - Self { - count: 0, - subscription: f(create_consumer::(topic)).subscribe(1).unwrap(), - sample_container: SampleContainer::new(1), - } - } - - fn read(&mut self) -> MwComSample<'_, T> { - debug!("Starting to read {}", self.count); - self.count += 1; - assert_eq!( - 1, - self.subscription - .try_receive(&mut self.sample_container, 1,) - .expect("receive failed"), - "mw com read failed" - ); - let result = self.sample_container.pop_front().expect("pop_front failed"); - debug!("Reading done"); - result - } -} - /// Neural network activity /// /// This component emulates a neural network @@ -254,20 +202,29 @@ pub struct NeuralNet { /// ID of the activity activity_id: ActivityId, /// Image input - input_image: MwComInput, + input_image: Box>, /// Radar scan input - input_scan: MwComInput, + input_scan: Box>, /// Scene output - output_scene: DebugWrapper>, + output_scene: Box>, } impl NeuralNet { pub fn build(activity_id: ActivityId, image_topic: &str, scan_topic: &str, scene_topic: &str) -> Box { - let output_scene = DebugWrapper(create_producer::(scene_topic).scene); + let output_scene = output!(NeuralNetInterface, scene_topic, |s: NeuralNetOfferedProducer<_>| s + .scene); Box::new(Self { activity_id, - input_image: MwComInput::new::<_, CameraInterface>(image_topic, |i: CameraConsumer<_>| i.image), - input_scan: MwComInput::new::<_, RadarInterface>(scan_topic, |r: RadarConsumer<_>| r.scan), + input_image: input!( + CameraInterface, + image_topic, + |i: CameraConsumer<_>| -> MwComSubscriber { i.image } + ), + input_scan: input!( + RadarInterface, + scan_topic, + |r: RadarConsumer<_>| -> MwComSubscriber { r.scan } + ), output_scene, }) } @@ -313,13 +270,13 @@ impl Activity for NeuralNet { debug!("Stepping NeuralNet"); sleep_random(); - let camera = self.input_image.read(); - let radar = self.input_scan.read(); + let camera = self.input_image.read().unwrap(); + let radar = self.input_scan.read().unwrap(); debug!("Inferring scene with neural network"); - let mut scene = self.output_scene.allocate().unwrap(); - Self::infer(&camera, &radar, scene.as_mut()); + let mut scene = self.output_scene.write_uninit().unwrap(); + Self::infer(&camera, &radar, scene.deref_mut()); // Safety: `Scene` has `repr(C)` and was fully initialized by `Self::infer` above. let scene = unsafe { scene.assume_init() }; debug!("Sending Scene {:?}", scene.deref()); @@ -345,18 +302,25 @@ pub struct EmergencyBraking { /// ID of the activity activity_id: ActivityId, /// Scene input - input_scene: MwComInput, + input_scene: Box>, /// Brake instruction output - output_brake_instruction: DebugWrapper<::Publisher>, + output_brake_instruction: Box>, } impl EmergencyBraking { pub fn build(activity_id: ActivityId, scene_topic: &str, brake_instruction_topic: &str) -> Box { - let output_brake_instruction = - DebugWrapper(create_producer::(brake_instruction_topic).brake_instruction); + let output_brake_instruction = output!( + BrakeControllerInterface, + brake_instruction_topic, + |b: BrakeControllerOfferedProducer<_>| b.brake_instruction + ); Box::new(Self { activity_id, - input_scene: MwComInput::new::<_, NeuralNetInterface>(scene_topic, |n: NeuralNetConsumer<_>| n.scene), + input_scene: input!( + NeuralNetInterface, + scene_topic, + |n: NeuralNetConsumer<_>| -> MwComSubscriber { n.scene } + ), output_brake_instruction, }) } @@ -377,8 +341,8 @@ impl Activity for EmergencyBraking { debug!("Stepping EmergencyBraking"); sleep_random(); - let scene = self.input_scene.read(); - let brake_instruction = self.output_brake_instruction.allocate().unwrap(); + let scene = self.input_scene.read().unwrap(); + let brake_instruction = self.output_brake_instruction.write_uninit().unwrap(); const ENGAGE_DISTANCE: f64 = 30.0; const MAX_BRAKE_DISTANCE: f64 = 15.0; @@ -390,10 +354,10 @@ impl Activity for EmergencyBraking { (ENGAGE_DISTANCE - scene.distance_obstacle) / (ENGAGE_DISTANCE - MAX_BRAKE_DISTANCE), ); - let brake_instruction = brake_instruction.write(BrakeInstruction { active: true, level }); + let brake_instruction = brake_instruction.write_payload(BrakeInstruction { active: true, level }); brake_instruction.send().unwrap(); } else { - let brake_instruction = brake_instruction.write(BrakeInstruction { + let brake_instruction = brake_instruction.write_payload(BrakeInstruction { active: false, level: 0.0, }); @@ -420,16 +384,17 @@ pub struct BrakeController { /// ID of the activity activity_id: ActivityId, /// Brake instruction input - input_brake_instruction: MwComInput, + input_brake_instruction: Box>, } impl BrakeController { pub fn build(activity_id: ActivityId, brake_instruction_topic: &str) -> Box { Box::new(Self { activity_id, - input_brake_instruction: MwComInput::new::<_, BrakeControllerInterface>( + input_brake_instruction: input!( + BrakeControllerInterface, brake_instruction_topic, - |b: BrakeControllerConsumer<_>| b.brake_instruction, + |b: BrakeControllerConsumer<_>| -> MwComSubscriber { b.brake_instruction } ), }) } @@ -450,7 +415,7 @@ impl Activity for BrakeController { debug!("Stepping BrakeController"); sleep_random(); - let brake_instruction = self.input_brake_instruction.read(); + let brake_instruction = self.input_brake_instruction.read().unwrap(); if brake_instruction.active { debug!( "BrakeController activating brakes with level {:.3}", @@ -477,14 +442,18 @@ pub struct EnvironmentRenderer { /// ID of the activity activity_id: ActivityId, /// Scene input - input_scene: MwComInput, + input_scene: Box>, } impl EnvironmentRenderer { pub fn build(activity_id: ActivityId, scene_topic: &str) -> Box { Box::new(Self { activity_id, - input_scene: MwComInput::new::<_, NeuralNetInterface>(scene_topic, |n: NeuralNetConsumer<_>| n.scene), + input_scene: input!( + NeuralNetInterface, + scene_topic, + |n: NeuralNetConsumer<_>| -> MwComSubscriber { n.scene } + ), }) } } @@ -527,16 +496,17 @@ pub struct SteeringController { /// ID of the activity activity_id: ActivityId, /// Steering input - input_steering: MwComInput, + input_steering: Box>, } impl SteeringController { pub fn build(activity_id: ActivityId, steering_topic: &str) -> Box { Box::new(Self { activity_id, - input_steering: MwComInput::new::<_, SteeringControllerInterface>( + input_steering: input!( + SteeringControllerInterface, steering_topic, - |s: SteeringControllerConsumer<_>| s.steering, + |s: SteeringControllerConsumer<_>| -> MwComSubscriber { s.steering } ), }) } @@ -557,7 +527,7 @@ impl Activity for SteeringController { debug!("Stepping SteeringController"); sleep_random(); - let steering = self.input_steering.read(); + let steering = self.input_steering.read().unwrap(); debug!("SteeringController adjusting angle to {:.3}", steering.angle); Ok(()) } @@ -576,14 +546,19 @@ pub struct LaneAssist { /// ID of the activity activity_id: ActivityId, /// Brake instruction output - steering_controller: DebugWrapper<::Publisher>, + steering_controller: Box>, } impl LaneAssist { pub fn build(activity_id: ActivityId, steering_topic: &str) -> Box { + let steering_controller = output!( + SteeringControllerInterface, + steering_topic, + |s: SteeringControllerOfferedProducer<_>| s.steering + ); Box::new(Self { activity_id, - steering_controller: DebugWrapper(create_producer::(steering_topic).steering), + steering_controller, }) } } @@ -602,7 +577,12 @@ impl Activity for LaneAssist { fn step(&mut self) -> Result<(), ActivityError> { debug!("Stepping LaneAssist"); sleep_random(); - self.steering_controller.send(Steering { angle: 2.34 }).unwrap(); + self.steering_controller + .write_uninit() + .unwrap() + .write_payload(Steering { angle: 2.34 }) + .send() + .unwrap(); Ok(()) } @@ -688,40 +668,3 @@ fn random_walk_integer(previous: usize, change_prop: f64, max_delta: usize) -> u fn sleep_random() { thread::sleep(Duration::from_millis(gen_random_in_range(SLEEP_RANGE) as u64)); } - -fn create_producer( - topic: &str, -) -> <::Producer as Producer>::OfferedProducer { - debug!("Creating MW COM Producer for {}...", topic); - let service_id = InstanceSpecifier::new(topic).expect("Failed to create InstanceSpecifier"); - let producer_builder = mw_com_runtime().producer_builder::(service_id); - let producer = producer_builder.build().unwrap(); - producer.offer().expect("can't offer a producer") -} - -fn create_consumer(topic: &str) -> ::Consumer { - debug!("Creating MW COM Consumer for {}...", topic); - let mut tries = 0; - loop { - let service_id = InstanceSpecifier::new(topic).expect("Failed to create InstanceSpecifier"); - let consumer_discovery = mw_com_runtime().find_service::(FindServiceSpecifier::Specific(service_id)); - let available_service_instances = consumer_discovery.get_available_instances().unwrap(); - - if available_service_instances.is_empty() { - debug!("No producer yet available for {}, waiting...", topic); - sleep(Duration::from_millis(500)); - tries += 1; - if tries > 100 { - break; - } - continue; - } - - // Select service instance at specific handle_index - let handle_index = 0; // or any index you need from vector of instances - let consumer_builder = available_service_instances.into_iter().nth(handle_index).unwrap(); - - return consumer_builder.build().unwrap(); - } - panic!("can't create consumer for {topic}") -} diff --git a/examples/rust/mini-adas/src/activities/input.rs b/examples/rust/mini-adas/src/activities/input.rs new file mode 100644 index 0000000..d9820d4 --- /dev/null +++ b/examples/rust/mini-adas/src/activities/input.rs @@ -0,0 +1,99 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#[cfg(not(feature = "com_mw"))] +use feo_com::interface::ActivityInput; + +#[cfg(feature = "com_mw")] +use crate::config::mw_com_runtime; + +#[cfg(not(feature = "com_mw"))] +use score_log::fmt::ScoreDebug; + +#[cfg(not(feature = "com_mw"))] +use core::fmt::Debug; + +#[cfg(feature = "com_mw")] +use score_log::debug; + +#[cfg(feature = "com_mw")] +use com_api::{ + Builder, FindServiceSpecifier, InstanceSpecifier, Interface, LolaRuntimeImpl, Runtime, ServiceDiscovery, +}; + +#[cfg(feature = "com_mw")] +macro_rules! input { + ($interface:ident, $topic:ident, $mapping_fn:expr) => { + Box::new(feo_com::mw_com::MwComInput::new( + ($mapping_fn)($crate::activities::input::create_consumer::<$interface>($topic)) + .subscribe(1) + .unwrap(), + feo_com::interface::DebugWrapper(core::cell::RefCell::new(SampleContainer::new(1))), + )) + }; +} + +#[cfg(not(feature = "com_mw"))] +macro_rules! input { + ($interface:ident, $topic:ident, $mapping_fn:expr) => { + $crate::activities::input::activity_input($topic) + }; +} + +pub(crate) use input; + +#[cfg(feature = "com_mw")] +pub fn create_consumer(topic: &str) -> ::Consumer { + use feo_time::Duration; + use std::thread::sleep; + + debug!("Creating MW COM Consumer for {}...", topic); + let mut tries = 0; + loop { + let service_id = InstanceSpecifier::new(topic).expect("Failed to create InstanceSpecifier"); + let consumer_discovery = mw_com_runtime().find_service::(FindServiceSpecifier::Specific(service_id)); + let available_service_instances = consumer_discovery.get_available_instances().unwrap(); + + if available_service_instances.is_empty() { + debug!("No producer yet available for {}, waiting...", topic); + sleep(Duration::from_millis(500).into()); + tries += 1; + if tries > 100 { + break; + } + continue; + } + + // Select service instance at specific handle_index + let handle_index = 0; // or any index you need from vector of instances + let consumer_builder = available_service_instances.into_iter().nth(handle_index).unwrap(); + + return consumer_builder.build().unwrap(); + } + panic!("can't create consumer for {topic}") +} + +/// Create an activity input. +#[cfg(not(feature = "com_mw"))] +pub fn activity_input(topic: &str) -> Box> +where + T: Debug + ScoreDebug + 'static, +{ + #[cfg(feature = "com_linux_shm")] + use feo_com::linux_shm::LinuxShmInput; + + #[cfg(feature = "com_iox2")] + return Box::new(Iox2Input::new(topic)); + #[cfg(feature = "com_linux_shm")] + return Box::new(LinuxShmInput::new(topic)); +} diff --git a/examples/rust/mini-adas/src/activities/mod.rs b/examples/rust/mini-adas/src/activities/mod.rs index 6e2518a..d4c183d 100644 --- a/examples/rust/mini-adas/src/activities/mod.rs +++ b/examples/rust/mini-adas/src/activities/mod.rs @@ -12,3 +12,5 @@ ********************************************************************************/ pub mod components; +pub mod input; +pub mod output; diff --git a/examples/rust/mini-adas/src/activities/output.rs b/examples/rust/mini-adas/src/activities/output.rs new file mode 100644 index 0000000..8fc919d --- /dev/null +++ b/examples/rust/mini-adas/src/activities/output.rs @@ -0,0 +1,71 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#[cfg(feature = "com_mw")] +use crate::config::mw_com_runtime; +#[cfg(feature = "com_mw")] +use com_api::{Builder, InstanceSpecifier, Interface, LolaRuntimeImpl, Producer, Runtime}; +#[cfg(not(feature = "com_mw"))] +use core::fmt::Debug; +#[cfg(not(feature = "com_mw"))] +use feo_com::interface::ActivityOutput; +#[cfg(feature = "com_mw")] +use score_log::debug; +#[cfg(not(feature = "com_mw"))] +use score_log::fmt::ScoreDebug; + +#[cfg(feature = "com_iox2")] +use feo_com::iox2::Iox2Output; +#[cfg(feature = "com_linux_shm")] +use feo_com::linux_shm::LinuxShmOutput; + +#[cfg(feature = "com_mw")] +macro_rules! output { + ($interface:ident, $topic:ident, $mapping_fn:expr) => { + Box::new(feo_com::mw_com::MwComOutput::new(feo_com::interface::DebugWrapper( + ($mapping_fn)($crate::activities::output::create_producer::<$interface>($topic)), + ))) + }; +} + +#[cfg(not(feature = "com_mw"))] +macro_rules! output { + ($interface:ident, $topic:ident, $mapping_fn:expr) => { + $crate::activities::output::activity_output($topic) + }; +} + +pub(crate) use output; + +#[cfg(feature = "com_mw")] +pub fn create_producer( + topic: &str, +) -> <::Producer as Producer>::OfferedProducer { + debug!("Creating MW COM Producer for {}...", topic); + let service_id = InstanceSpecifier::new(topic).expect("Failed to create InstanceSpecifier"); + let producer_builder = mw_com_runtime().producer_builder::(service_id); + let producer = producer_builder.build().unwrap(); + producer.offer().expect("can't offer a producer") +} + +/// Create an activity output. +#[cfg(not(feature = "com_mw"))] +pub fn activity_output(topic: &str) -> Box> +where + T: Debug + ScoreDebug + 'static, +{ + #[cfg(feature = "com_iox2")] + return Box::new(Iox2Output::new(topic)); + #[cfg(feature = "com_linux_shm")] + return Box::new(LinuxShmOutput::new(topic)); +} diff --git a/examples/rust/mini-adas/src/bin/adas_primary.rs b/examples/rust/mini-adas/src/bin/adas_primary.rs index dcb6f36..0221929 100644 --- a/examples/rust/mini-adas/src/bin/adas_primary.rs +++ b/examples/rust/mini-adas/src/bin/adas_primary.rs @@ -11,9 +11,14 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ +#[cfg(not(feature = "com_mw"))] +use feo::agent::com_init::initialize_com_primary; use feo::ids::AgentId; use feo_time::Duration; +#[cfg(feature = "com_mw")] use mini_adas::config::mw_com_runtime; +#[cfg(not(feature = "com_mw"))] +use mini_adas::config::{agent_assignments_ids, topic_dependencies, COM_BACKEND, MAX_ADDITIONAL_SUBSCRIBERS}; use score_log::{error, info, LevelFilter}; use std::collections::HashSet; use stdout_logger::StdoutLoggerBuilder; @@ -36,7 +41,18 @@ fn main() { let config = cfg::make_config(params); + // Initialize topics. Do not drop. + #[cfg(not(feature = "com_mw"))] + let _topic_guards = initialize_com_primary( + COM_BACKEND, + AGENT_ID, + topic_dependencies(), + &agent_assignments_ids(), + MAX_ADDITIONAL_SUBSCRIBERS, + ); + // Initialize MW COM + #[cfg(feature = "com_mw")] mw_com_runtime(); // Setup and run primary diff --git a/examples/rust/mini-adas/src/bin/adas_secondary.rs b/examples/rust/mini-adas/src/bin/adas_secondary.rs index 245c734..6a0d4be 100644 --- a/examples/rust/mini-adas/src/bin/adas_secondary.rs +++ b/examples/rust/mini-adas/src/bin/adas_secondary.rs @@ -11,7 +11,15 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ +#[cfg(not(feature = "com_mw"))] +use feo::agent::com_init::initialize_com_secondary; +#[cfg(not(feature = "com_mw"))] +use feo::ids::ActivityId; +#[cfg(not(feature = "com_mw"))] +use mini_adas::config::{agent_assignments_ids, topic_dependencies, COM_BACKEND}; use score_log::{info, LevelFilter}; +#[cfg(not(feature = "com_mw"))] +use std::collections::HashSet; use stdout_logger::StdoutLoggerBuilder; #[cfg(any(feature = "signalling_direct_tcp", feature = "signalling_direct_unix"))] @@ -48,6 +56,7 @@ fn main() { }; // determine set of activity ids belonging to this agent + #[cfg(not(feature = "com_mw"))] let local_activities: HashSet = agent_assignments_ids() .remove(¶ms.agent_id) .unwrap() @@ -56,6 +65,10 @@ fn main() { .copied() .collect(); + // Initialize topics. Do not drop. + #[cfg(not(feature = "com_mw"))] + let _topic_guards = initialize_com_secondary(COM_BACKEND, topic_dependencies(), &local_activities); + let secondary = Secondary::new(config); secondary.run(); } @@ -84,6 +97,20 @@ fn main() { bind_address_receivers: NodeAddress::Tcp(BIND_ADDR2), }; + // determine set of activity ids belonging to this agent + #[cfg(not(feature = "com_mw"))] + let local_activities: HashSet = agent_assignments_ids() + .remove(¶ms.agent_id) + .unwrap() + .iter() + .flat_map(|(_, acts)| acts.iter()) + .copied() + .collect(); + + // Initialize topics. Do not drop. + #[cfg(not(feature = "com_mw"))] + let _topic_guards = initialize_com_secondary(COM_BACKEND, topic_dependencies(), &local_activities); + let secondary = Secondary::new(config); secondary.run(); } @@ -115,6 +142,20 @@ fn main() { bind_address_receivers: NodeAddress::UnixSocket(socket_paths().1), }; + // determine set of activity ids belonging to this agent + #[cfg(not(feature = "com_mw"))] + let local_activities: HashSet = agent_assignments_ids() + .remove(¶ms.agent_id) + .unwrap() + .iter() + .flat_map(|(_, acts)| acts.iter()) + .copied() + .collect(); + + // Initialize topics. Do not drop. + #[cfg(not(feature = "com_mw"))] + let _topic_guards = initialize_com_secondary(COM_BACKEND, topic_dependencies(), &local_activities); + let secondary = Secondary::new(config); secondary.run(); } diff --git a/examples/rust/mini-adas/src/config.rs b/examples/rust/mini-adas/src/config.rs index fe0e57a..99e09a3 100644 --- a/examples/rust/mini-adas/src/config.rs +++ b/examples/rust/mini-adas/src/config.rs @@ -15,16 +15,17 @@ use crate::activities::components::{ BrakeController, Camera, EmergencyBraking, EnvironmentRenderer, LaneAssist, NeuralNet, Radar, SteeringController, TrajectoryVisualizer, }; +#[cfg(feature = "com_mw")] use com_api::{Builder, LolaRuntimeBuilderImpl, LolaRuntimeImpl, RuntimeBuilder}; use core::net::{IpAddr, Ipv4Addr, SocketAddr}; use feo::activity::{ActivityBuilder, ActivityIdAndBuilder}; use feo::ids::{ActivityId, AgentId, WorkerId}; use feo::topicspec::{Direction, TopicSpecification}; +#[cfg(not(feature = "com_mw"))] use feo_com::interface::ComBackend; use mini_adas_gen::{BrakeInstruction, CameraImage, RadarScan, Scene, Steering}; use std::collections::HashMap; use std::path::{Path, PathBuf}; -use std::sync::LazyLock; pub type WorkerAssignment = (WorkerId, Vec<(ActivityId, Box)>); @@ -48,7 +49,9 @@ pub const TOPIC_RADAR_FRONT: &str = "/feo/com/MiniAdasRadar"; /// Allow up to two recorder processes (that potentially need to subscribe to every topic) pub const MAX_ADDITIONAL_SUBSCRIBERS: usize = 2; +#[cfg(feature = "com_mw")] pub fn mw_com_runtime() -> &'static LolaRuntimeImpl { + use std::sync::LazyLock; static RUNTIME: LazyLock = LazyLock::new(|| { let mut lola_runtime_builder = LolaRuntimeBuilderImpl::new(); lola_runtime_builder.load_config(&PathBuf::from("./examples/rust/mini-adas/etc/mw_com_config.json")); diff --git a/src/feo-com/BUILD.bazel b/src/feo-com/BUILD.bazel index 729dca0..69f747a 100644 --- a/src/feo-com/BUILD.bazel +++ b/src/feo-com/BUILD.bazel @@ -13,6 +13,35 @@ load("@rules_rust//rust:defs.bzl", "rust_library") +rust_library( + name = "libfeo_com_rust_mw_com", + srcs = [ + "src/interface.rs", + # Disabled due to compilation error of iceoryx2 + # "src/iox2/mod.rs", + "src/lib.rs", + "src/linux_shm/mod.rs", + "src/linux_shm/shared_memory.rs", + "src/mw_com/mod.rs", + ], + crate_features = [ + # Disabled due to compilation error of iceoryx2 + # "ipc_iceoryx2", + "ipc_linux_shm", + "ipc_mw_com", + ], + crate_name = "feo_com", + visibility = ["//visibility:public"], + deps = [ + # Disabled due to compilation error of iceoryx2 + # "@score_crates//:iceoryx2", + "@score_crates//:nix", + "@score_crates//:rand", + "@score_baselibs_rust//src/log/score_log", + "@score_communication//score/mw/com/impl/rust/com-api/com-api", + ], +) + rust_library( name = "libfeo_com_rust", srcs = [ @@ -24,8 +53,6 @@ rust_library( "src/linux_shm/shared_memory.rs", ], crate_features = [ - # bazel has a different concept (select) for optional dependencies than cargo, - # so we activate all features until we refactor to select # Disabled due to compilation error of iceoryx2 # "ipc_iceoryx2", "ipc_linux_shm", @@ -33,8 +60,6 @@ rust_library( crate_name = "feo_com", visibility = ["//visibility:public"], deps = [ - # all_crate_deps only contains optional dependencies which are default-activated, - # so we add all optional dependencies here to be independent of cargo default features # Disabled due to compilation error of iceoryx2 # "@score_crates//:iceoryx2", "@score_crates//:nix", diff --git a/src/feo-com/src/interface.rs b/src/feo-com/src/interface.rs index 595bca1..d5af86d 100644 --- a/src/feo-com/src/interface.rs +++ b/src/feo-com/src/interface.rs @@ -34,15 +34,74 @@ use crate::linux_shm; use crate::linux_shm::shared_memory::{MappingMode, TopicInitializationAgentRole}; #[cfg(feature = "ipc_linux_shm")] use crate::linux_shm::{LinuxShmInputGuard, LinuxShmOutputGuard, LinuxShmOutputUninitGuard}; +#[cfg(feature = "ipc_mw_com")] +use crate::mw_com::{MwComInputGuard, MwComOutputGuard, MwComOutputUninitGuard}; use alloc::boxed::Box; use core::any::Any; use core::fmt; +use core::fmt::Debug; use core::mem::MaybeUninit; use core::ops::{Deref, DerefMut}; use score_log::fmt::ScoreDebug; pub type Topic<'a> = &'a str; +#[cfg(feature = "ipc_mw_com")] +pub trait FeoComData: Debug + ScoreDebug + com_api::CommData {} + +#[cfg(not(feature = "ipc_mw_com"))] +pub trait FeoComData: Debug + ScoreDebug {} + +#[cfg(feature = "ipc_mw_com")] +pub trait FeoComDefault: Default + com_api::PlacementDefault {} + +#[cfg(not(feature = "ipc_mw_com"))] +pub trait FeoComDefault: Default {} + +#[cfg(feature = "ipc_mw_com")] +impl FeoComData for T {} + +#[cfg(not(feature = "ipc_mw_com"))] +impl FeoComData for T {} + +#[cfg(feature = "ipc_mw_com")] +impl FeoComDefault for T {} + +#[cfg(not(feature = "ipc_mw_com"))] +impl FeoComDefault for T {} + +pub struct DebugWrapper(pub T); + +impl core::fmt::Debug for DebugWrapper { + fn fmt(&self, _f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + Ok(()) + } +} + +impl score_log::fmt::ScoreDebug for DebugWrapper { + fn fmt( + &self, + _w: &mut dyn score_log::fmt::ScoreWrite, + _spec: &score_log::fmt::FormatSpec, + ) -> score_log::fmt::Result { + Ok(()) + } +} + +impl core::ops::Deref for DebugWrapper { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DebugWrapper { + fn deref_mut(&mut self) -> &mut ::Target { + &mut self.0 + } +} + // COM backend runtime switch. #[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] pub enum ComBackend { @@ -50,6 +109,8 @@ pub enum ComBackend { Iox2, #[cfg(feature = "ipc_linux_shm")] LinuxShm, + #[cfg(feature = "ipc_mw_com")] + MwCom, } /// Error type of communication module @@ -59,13 +120,21 @@ pub enum Error { SendFailed, } +#[cfg(feature = "ipc_mw_com")] +impl From for Error { + fn from(_e: com_api::Error) -> Self { + // TODO + Self::SendFailed + } +} + /// A trait for structs which can provide handles to input buffers pub trait ActivityInput: fmt::Debug where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { /// Get a handle to an input buffer - fn read(&self) -> Result, Error>; + fn read(&self) -> Result, Error>; } /// Handle to an input buffer @@ -73,19 +142,23 @@ where /// This handle wraps buffers of specific com implementations /// and thereby provides references to the buffer with a lifetime. /// It is an enum so that it has a size known at compile-time. -pub enum InputGuard +pub enum InputGuard<'a, T> where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { #[cfg(feature = "ipc_iceoryx2")] Iox2(Iox2InputGuard), #[cfg(feature = "ipc_linux_shm")] LinuxShm(LinuxShmInputGuard), + #[cfg(feature = "ipc_mw_com")] + MwCom(MwComInputGuard<'a, T>), + #[cfg(not(feature = "ipc_mw_com"))] + _Placeholder(core::marker::PhantomData<&'a T>), } -impl Deref for InputGuard +impl Deref for InputGuard<'_, T> where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { type Target = T; @@ -95,26 +168,30 @@ where Self::Iox2(guard) => guard, #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => guard, + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => guard, + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } /// A trait for structs which can provide handles to uninitialized output buffers -pub trait ActivityOutput: fmt::Debug +pub trait ActivityOutput: Debug where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { /// Get a handle to an uninitialized output buffer - fn write_uninit(&mut self) -> Result, Error>; + fn write_uninit(&mut self) -> Result, Error>; } /// A trait for structs which can provide handles to default-initialized output buffers -pub trait ActivityOutputDefault: fmt::Debug +pub trait ActivityOutputDefault: Debug where - T: fmt::Debug + ScoreDebug + Default, + T: FeoComData + FeoComDefault, { /// Get a handle to a default initialized output buffer - fn write_init(&mut self) -> Result, Error>; + fn write_init(&mut self) -> Result, Error>; } /// Handle to an initialized output buffer @@ -138,16 +215,20 @@ where /// For the buffer to be receivable as input, it has to be [Self::send], /// consuming the handle. #[must_use = "buffer has to be sent to be observable"] -pub enum OutputGuard { +pub enum OutputGuard<'a, T: FeoComData> { #[cfg(feature = "ipc_iceoryx2")] Iox2(Iox2OutputGuard), #[cfg(feature = "ipc_linux_shm")] LinuxShm(LinuxShmOutputGuard), + #[cfg(feature = "ipc_mw_com")] + MwCom(MwComOutputGuard<'a, T>), + #[cfg(not(feature = "ipc_mw_com"))] + _Placeholder(core::marker::PhantomData<&'a T>), } -impl OutputGuard +impl OutputGuard<'_, T> where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { /// Send this buffer pub fn send(self) -> Result<(), Error> { @@ -156,13 +237,17 @@ where Self::Iox2(guard) => guard.send(), #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => guard.send(), + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => guard.send(), + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } -impl Deref for OutputGuard +impl Deref for OutputGuard<'_, T> where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { type Target = T; @@ -172,13 +257,17 @@ where Self::Iox2(guard) => guard, #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => guard, + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => guard.deref(), + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } -impl DerefMut for OutputGuard +impl DerefMut for OutputGuard<'_, T> where - T: fmt::Debug + ScoreDebug + Default, + T: FeoComData + Default, { fn deref_mut(&mut self) -> &mut Self::Target { match self { @@ -186,6 +275,10 @@ where Self::Iox2(guard) => guard, #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => guard, + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => guard.deref_mut(), + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } @@ -206,16 +299,20 @@ where /// - Writing directly to the uninitialized memory and call [Self::assume_init]. /// This is `unsafe` and the caller has to ensure that the buffer is initialized /// to a valid value before calling [Self::assume_init]. -pub enum OutputUninitGuard { +pub enum OutputUninitGuard<'a, T: FeoComData> { #[cfg(feature = "ipc_iceoryx2")] Iox2(Iox2OutputUninitGuard), #[cfg(feature = "ipc_linux_shm")] LinuxShm(LinuxShmOutputUninitGuard), + #[cfg(feature = "ipc_mw_com")] + MwCom(MwComOutputUninitGuard<'a, T>), + #[cfg(not(feature = "ipc_mw_com"))] + _Placeholder(core::marker::PhantomData<&'a T>), } -impl OutputUninitGuard +impl<'a, T> OutputUninitGuard<'a, T> where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { /// Assume the backing buffer is initialized /// @@ -223,44 +320,56 @@ where /// /// The caller has to ensure that the uninitialized memory /// was completely initialized with a valid value. - pub unsafe fn assume_init(self) -> OutputGuard { + pub unsafe fn assume_init(self) -> OutputGuard<'a, T> { match self { #[cfg(feature = "ipc_iceoryx2")] Self::Iox2(guard) => unsafe { OutputGuard::Iox2(guard.assume_init()) }, #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => OutputGuard::LinuxShm(guard.assume_init()), + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => OutputGuard::MwCom(guard.assume_init()), + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } /// Write a complete valid type into the uninitialized buffer, initializing it in the process - pub fn write_payload(self, value: T) -> OutputGuard { + pub fn write_payload(self, value: T) -> OutputGuard<'a, T> { match self { #[cfg(feature = "ipc_iceoryx2")] Self::Iox2(guard) => OutputGuard::Iox2(guard.write_payload(value)), #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => OutputGuard::LinuxShm(guard.write_payload(value)), + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => OutputGuard::MwCom(guard.write_payload(value)), + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } -impl OutputUninitGuard +impl<'a, T> OutputUninitGuard<'a, T> where - T: fmt::Debug + ScoreDebug + Default, + T: FeoComData + FeoComDefault, { /// Initialize the uninitialized buffer with its [Default] trait - pub fn init(self) -> OutputGuard { + pub fn init(self) -> OutputGuard<'a, T> { match self { #[cfg(feature = "ipc_iceoryx2")] Self::Iox2(guard) => OutputGuard::Iox2(guard.init()), #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => OutputGuard::LinuxShm(guard.init()), + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => OutputGuard::MwCom(guard.init()), + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } -impl Deref for OutputUninitGuard +impl Deref for OutputUninitGuard<'_, T> where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { type Target = MaybeUninit; @@ -270,13 +379,17 @@ where Self::Iox2(guard) => guard, #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => guard, + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => guard, + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } -impl DerefMut for OutputUninitGuard +impl DerefMut for OutputUninitGuard<'_, T> where - T: fmt::Debug + ScoreDebug, + T: FeoComData, { fn deref_mut(&mut self) -> &mut Self::Target { match self { @@ -284,6 +397,10 @@ where Self::Iox2(guard) => guard, #[cfg(feature = "ipc_linux_shm")] Self::LinuxShm(guard) => guard, + #[cfg(feature = "ipc_mw_com")] + Self::MwCom(guard) => guard, + #[cfg(not(feature = "ipc_mw_com"))] + Self::_Placeholder(_) => unimplemented!(), } } } @@ -338,7 +455,7 @@ impl<'a> ComBackendTopicSecondaryInitialization<'a> { } } -pub fn init_topic_primary( +pub fn init_topic_primary( params: &ComBackendTopicPrimaryInitialization, ) -> TopicHandle { match params.backend { @@ -360,10 +477,13 @@ pub fn init_topic_primary( }; linux_shm::init_topic::(params.topic, mapping_mode, agent_role) }, + + #[cfg(feature = "ipc_mw_com")] + ComBackend::MwCom => Box::new(()).into(), } } -pub fn init_topic_secondary( +pub fn init_topic_secondary( params: &ComBackendTopicSecondaryInitialization, ) -> TopicHandle { match params.backend { @@ -383,6 +503,9 @@ pub fn init_topic_secondary( }; linux_shm::init_topic::(params.topic, mapping_mode, agent_role) }, + + #[cfg(feature = "ipc_mw_com")] + ComBackend::MwCom => Box::new(()).into(), } } @@ -409,5 +532,7 @@ pub fn run_backend(backend: ComBackend, _local_requests: usize, _remote_requests ComBackend::LinuxShm => { linux_shm::ComRuntime::run_service(_remote_requests); }, + #[cfg(feature = "ipc_mw_com")] + ComBackend::MwCom => {}, } } diff --git a/src/feo-com/src/lib.rs b/src/feo-com/src/lib.rs index a1aad03..6cea65d 100644 --- a/src/feo-com/src/lib.rs +++ b/src/feo-com/src/lib.rs @@ -28,3 +28,5 @@ pub mod interface; pub mod iox2; #[cfg(feature = "ipc_linux_shm")] pub mod linux_shm; +#[cfg(feature = "ipc_mw_com")] +pub mod mw_com; diff --git a/src/feo-com/src/linux_shm/mod.rs b/src/feo-com/src/linux_shm/mod.rs index a4b3767..9e5470b 100644 --- a/src/feo-com/src/linux_shm/mod.rs +++ b/src/feo-com/src/linux_shm/mod.rs @@ -33,8 +33,8 @@ pub(crate) mod shared_memory; use crate::interface::{ - ActivityInput, ActivityOutput, ActivityOutputDefault, Error, InputGuard, OutputGuard, OutputUninitGuard, Topic, - TopicHandle, + ActivityInput, ActivityOutput, ActivityOutputDefault, Error, FeoComData, FeoComDefault, InputGuard, OutputGuard, + OutputUninitGuard, Topic, TopicHandle, }; use crate::linux_shm::shared_memory::{ MappedPtrReadGuard, MappedPtrWriteGuard, MappingMode, ReadWriteAccessControlPtr, TopicInitializationAgentRole, @@ -55,7 +55,6 @@ use nix::fcntl::OFlag; use nix::sys::mman::{mmap, shm_open, MapFlags, ProtFlags}; use nix::sys::stat::Mode; use nix::unistd; -use score_log::fmt::ScoreDebug; use score_log::{debug, error, info}; use std::collections::HashMap; use std::io::{read_to_string, Write}; @@ -299,7 +298,7 @@ impl ComRuntime { } // Initialize the topic and register it in the global COM runtime -pub fn init_topic( +pub fn init_topic( topic: Topic, mapping_mode: MappingMode, agent_role: TopicInitializationAgentRole, @@ -308,9 +307,9 @@ pub fn init_topic( TopicHandle::from(Box::new(())) } -pub struct LinuxShmInputGuard(MappedPtrReadGuard); +pub struct LinuxShmInputGuard(MappedPtrReadGuard); -impl Deref for LinuxShmInputGuard { +impl Deref for LinuxShmInputGuard { type Target = T; fn deref(&self) -> &Self::Target { @@ -318,13 +317,13 @@ impl Deref for LinuxShmInputGuard { } } -pub struct LinuxShmOutputGuard { +pub struct LinuxShmOutputGuard { ptr: MappedPtrWriteGuard, } impl LinuxShmOutputGuard where - T: Debug + ScoreDebug, + T: FeoComData, { pub(crate) fn send(self) -> Result<(), Error> { self.ptr.send(); @@ -332,7 +331,7 @@ where } } -impl Deref for LinuxShmOutputGuard { +impl Deref for LinuxShmOutputGuard { type Target = T; fn deref(&self) -> &Self::Target { @@ -340,17 +339,17 @@ impl Deref for LinuxShmOutputGuard { } } -impl DerefMut for LinuxShmOutputGuard { +impl DerefMut for LinuxShmOutputGuard { fn deref_mut(&mut self) -> &mut Self::Target { DerefMut::deref_mut(&mut self.ptr) } } -pub struct LinuxShmOutputUninitGuard(MappedPtrWriteGuard); +pub struct LinuxShmOutputUninitGuard(MappedPtrWriteGuard); impl LinuxShmOutputUninitGuard where - T: Debug + ScoreDebug, + T: FeoComData, { // Value is initialized when allocated pub(crate) fn assume_init(self) -> LinuxShmOutputGuard { @@ -366,7 +365,7 @@ where impl LinuxShmOutputUninitGuard where - T: Debug + ScoreDebug + Default, + T: FeoComData + Default, { // Overwrites with [Default::default] pub(crate) fn init(mut self) -> LinuxShmOutputGuard { @@ -375,7 +374,7 @@ where } } -impl Deref for LinuxShmOutputUninitGuard { +impl Deref for LinuxShmOutputUninitGuard { type Target = MaybeUninit; fn deref(&self) -> &Self::Target { @@ -385,7 +384,7 @@ impl Deref for LinuxShmOutputUninitGuard { } } -impl DerefMut for LinuxShmOutputUninitGuard { +impl DerefMut for LinuxShmOutputUninitGuard { fn deref_mut(&mut self) -> &mut Self::Target { // Safety: MaybeUninit is guaranteed to have the same size, alignment, and ABI as T (according to Rust docs) // T is guarantied to be initialized @@ -399,7 +398,7 @@ pub struct LinuxShmInput { _type: PhantomData, } -impl LinuxShmInput { +impl LinuxShmInput { pub fn new(topic: Topic) -> Self { Self { ptr: ComRuntime::global_runtime().topic_mapping::(topic, MappingMode::Read), @@ -410,9 +409,9 @@ impl LinuxShmInput { impl ActivityInput for LinuxShmInput where - T: Debug + ScoreDebug + 'static, + T: FeoComData + 'static, { - fn read(&self) -> Result, Error> { + fn read(&self) -> Result, Error> { Ok(InputGuard::LinuxShm(LinuxShmInputGuard(self.ptr.get()))) } } @@ -423,7 +422,7 @@ pub struct LinuxShmOutput { _type: PhantomData, } -impl LinuxShmOutput { +impl LinuxShmOutput { pub fn new(topic: Topic) -> Self { Self { ptr: ComRuntime::global_runtime().topic_mapping::(topic, MappingMode::Write), @@ -434,10 +433,10 @@ impl LinuxShmOutput { impl ActivityOutput for LinuxShmOutput where - T: Debug + ScoreDebug + 'static, + T: FeoComData + 'static, { // Initialized when allocated - fn write_uninit(&mut self) -> Result, Error> { + fn write_uninit(&mut self) -> Result, Error> { Ok(OutputUninitGuard::LinuxShm(LinuxShmOutputUninitGuard( self.ptr.get_mut(), ))) @@ -446,10 +445,10 @@ where impl ActivityOutputDefault for LinuxShmOutput where - T: Debug + ScoreDebug + Default + 'static, + T: FeoComData + FeoComDefault + 'static, { // Overwrites with [Default::default] - fn write_init(&mut self) -> Result, Error> { + fn write_init(&mut self) -> Result, Error> { let mut ptr = self.ptr.get_mut(); *ptr = T::default(); Ok(OutputGuard::LinuxShm(LinuxShmOutputGuard { ptr })) diff --git a/src/feo-com/src/mw_com/mod.rs b/src/feo-com/src/mw_com/mod.rs new file mode 100644 index 0000000..dca5770 --- /dev/null +++ b/src/feo-com/src/mw_com/mod.rs @@ -0,0 +1,188 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::interface::{ + ActivityInput, ActivityOutput, ActivityOutputDefault, DebugWrapper, Error, FeoComData, FeoComDefault, InputGuard, + OutputGuard, OutputUninitGuard, +}; +use com_api::{ + LolaRuntimeImpl, PlacementDefault, Publisher, Runtime, SampleContainer, SampleMaybeUninit, SampleMut, Subscriber, + Subscription, +}; +use core::cell::RefCell; +use core::cell::UnsafeCell; +use core::fmt::Debug; +use core::mem::MaybeUninit; +use core::ops::Deref; +use core::ops::DerefMut; + +type MwComSubscription = + <::Subscriber as Subscriber>::Subscription; + +type MwComSample<'a, T> = as Subscription>::Sample<'a>; + +type MwComPublisher = ::Publisher; + +type MwComSampleMaybeUninit<'a, T> = as Publisher>::SampleMaybeUninit<'a>; + +type MwComSampleMut<'a, T> = as SampleMaybeUninit>::SampleMut; + +pub struct MwComInputGuard<'a, T: FeoComData>(MwComSample<'a, T>); + +impl Deref for MwComInputGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + Deref::deref(&self.0) + } +} + +pub struct MwComOutputGuard<'a, T: FeoComData>(MwComSampleMut<'a, T>); + +impl<'a, T> MwComOutputGuard<'a, T> +where + T: FeoComData, +{ + pub(crate) fn send(self) -> Result<(), Error> { + self.0.send()?; + Ok(()) + } +} + +impl Deref for MwComOutputGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + +impl DerefMut for MwComOutputGuard<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.deref_mut() + } +} + +pub struct MwComOutputUninitGuard<'a, T: FeoComData>( + UnsafeCell< as Publisher>::SampleMaybeUninit<'a>>, +); + +impl<'a, T> MwComOutputUninitGuard<'a, T> +where + T: FeoComData, +{ + // Value is initialized when allocated + pub(crate) unsafe fn assume_init(self) -> MwComOutputGuard<'a, T> { + MwComOutputGuard(self.0.into_inner().assume_init()) + } + + // Overwrites with given value + pub(crate) fn write_payload(self, value: T) -> MwComOutputGuard<'a, T> { + MwComOutputGuard(self.0.into_inner().write(value)) + } +} + +impl<'a, T> MwComOutputUninitGuard<'a, T> +where + T: FeoComData + PlacementDefault + Default + 'a, +{ + // Overwrites with [Default::default] + pub(crate) fn init(self) -> MwComOutputGuard<'a, T> { + MwComOutputGuard(self.0.into_inner().write_default()) + } +} + +impl Deref for MwComOutputUninitGuard<'_, T> { + type Target = MaybeUninit; + + fn deref(&self) -> &Self::Target { + // SAFETY: not safe, needs support from MW COM API to be fixed + AsMut::as_mut(unsafe { &mut *self.0.get() }) + } +} + +impl DerefMut for MwComOutputUninitGuard<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.get_mut().as_mut() + } +} + +#[derive(Debug)] +pub struct MwComInput { + subscription: MwComSubscription, + sample_container: DebugWrapper>>>, +} + +impl MwComInput { + pub fn new( + subscription: MwComSubscription, + sample_container: DebugWrapper>>>, + ) -> Self { + Self { + subscription, + sample_container, + } + } +} + +impl ActivityInput for MwComInput +where + T: FeoComData + 'static, +{ + fn read(&self) -> Result, Error> { + assert_eq!( + 1, + self.subscription + .try_receive(&mut self.sample_container.borrow_mut(), 1,) + .expect("receive failed"), + "mw com read failed" + ); + let result = self + .sample_container + .borrow_mut() + .pop_front() + .expect("pop_front failed"); + Ok(InputGuard::MwCom(MwComInputGuard(result))) + } +} + +#[derive(Debug)] +pub struct MwComOutput(DebugWrapper>); + +impl MwComOutput { + pub fn new(publisher: DebugWrapper>) -> Self { + Self(publisher) + } +} + +impl ActivityOutput for MwComOutput +where + T: FeoComData + 'static, +{ + // Initialized when allocated + fn write_uninit(&mut self) -> Result, Error> { + Ok(OutputUninitGuard::MwCom(MwComOutputUninitGuard(UnsafeCell::new( + self.0.allocate()?, + )))) + } +} + +impl ActivityOutputDefault for MwComOutput +where + T: FeoComData + FeoComDefault + 'static, +{ + // Overwrites with [Default::default] + fn write_init(&mut self) -> Result, Error> { + Ok(OutputGuard::MwCom(MwComOutputGuard(self.0.allocate()?.write_default()))) + } +} diff --git a/src/feo/BUILD.bazel b/src/feo/BUILD.bazel index 6e36cb1..efc7e87 100644 --- a/src/feo/BUILD.bazel +++ b/src/feo/BUILD.bazel @@ -33,10 +33,6 @@ rust_library( "src/error.rs", "src/ids.rs", "src/lib.rs", - "src/recording/mod.rs", - "src/recording/recorder.rs", - "src/recording/registry.rs", - "src/recording/transcoder.rs", "src/scheduler.rs", "src/signalling/common/interface.rs", "src/signalling/common/mod.rs", @@ -84,78 +80,3 @@ rust_library( "@score_crates//:mio", ], ) - -rust_library( - name = "libfeo_recording_rust", - srcs = [ - "src/activity.rs", - "src/agent/com_init.rs", - "src/agent/direct/mod.rs", - "src/agent/direct/primary.rs", - "src/agent/direct/primary_mpsc.rs", - "src/agent/direct/recorder.rs", - "src/agent/direct/secondary.rs", - "src/agent/mod.rs", - "src/agent/relayed/mod.rs", - "src/agent/relayed/primary.rs", - "src/agent/relayed/recorder.rs", - "src/agent/relayed/secondary.rs", - "src/cpp.rs", - "src/debug_fmt.rs", - "src/error.rs", - "src/ids.rs", - "src/lib.rs", - "src/recording/mod.rs", - "src/recording/recorder.rs", - "src/recording/registry.rs", - "src/recording/transcoder.rs", - "src/scheduler.rs", - "src/signalling/common/interface.rs", - "src/signalling/common/mod.rs", - "src/signalling/common/mpsc/endpoint.rs", - "src/signalling/common/mpsc/mod.rs", - "src/signalling/common/mpsc/primitives.rs", - "src/signalling/common/mpsc/worker.rs", - "src/signalling/common/signals.rs", - "src/signalling/common/socket/client.rs", - "src/signalling/common/socket/connection.rs", - "src/signalling/common/socket/mod.rs", - "src/signalling/common/socket/server.rs", - "src/signalling/direct/mod.rs", - "src/signalling/direct/mpsc/mod.rs", - "src/signalling/direct/mpsc/scheduler.rs", - "src/signalling/direct/recorder.rs", - "src/signalling/direct/scheduler.rs", - "src/signalling/direct/worker.rs", - "src/signalling/mod.rs", - "src/signalling/relayed/connectors/mod.rs", - "src/signalling/relayed/connectors/recorder.rs", - "src/signalling/relayed/connectors/relays.rs", - "src/signalling/relayed/connectors/scheduler.rs", - "src/signalling/relayed/connectors/secondary.rs", - "src/signalling/relayed/interface.rs", - "src/signalling/relayed/mod.rs", - "src/signalling/relayed/mpsc/endpoint.rs", - "src/signalling/relayed/mpsc/mod.rs", - "src/signalling/relayed/sockets/endpoint.rs", - "src/signalling/relayed/sockets/mod.rs", - "src/signalling/relayed/sockets_mpsc.rs", - "src/timestamp.rs", - "src/topicspec.rs", - "src/worker/mod.rs", - ], - crate_features = ["recording"], - crate_name = "feo", - visibility = ["//visibility:public"], - deps = [ - "//src/feo-com:libfeo_com_rust", - "//src/feo-time:libfeo_time_rust", - "//src/feo-tracing:libfeo_tracing_rust", - "@score_baselibs_rust//src/log/score_log", - "@score_crates//:ctrlc", - "@score_crates//:libc", - "@score_crates//:mio", - "@score_crates//:postcard", - "@score_crates//:serde", - ], -) diff --git a/src/feo/src/lib.rs b/src/feo/src/lib.rs index ad9f8e6..a3fbdf2 100644 --- a/src/feo/src/lib.rs +++ b/src/feo/src/lib.rs @@ -47,8 +47,6 @@ pub mod cpp; pub mod debug_fmt; pub mod error; pub mod ids; -#[cfg(feature = "recording")] -pub mod recording; pub mod scheduler; pub mod signalling; mod timestamp; diff --git a/src/feo/src/recording/mod.rs b/src/feo/src/recording/mod.rs deleted file mode 100644 index 2dd0415..0000000 --- a/src/feo/src/recording/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -// ******************************************************************************* -// Copyright (c) 2025 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache License Version 2.0 which is available at -// -// -// SPDX-License-Identifier: Apache-2.0 -// ******************************************************************************* - -#[cfg(feature = "recording")] -pub mod recorder; - -#[cfg(feature = "recording")] -pub mod registry; - -#[cfg(feature = "recording")] -mod transcoder; diff --git a/src/feo/src/recording/recorder.rs b/src/feo/src/recording/recorder.rs deleted file mode 100644 index bd3ec4a..0000000 --- a/src/feo/src/recording/recorder.rs +++ /dev/null @@ -1,308 +0,0 @@ -// ******************************************************************************* -// Copyright (c) 2025 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache License Version 2.0 which is available at -// -// -// SPDX-License-Identifier: Apache-2.0 -// ******************************************************************************* - -//! FEO data recorder. Records communication for debugging and development purposes - -use crate::ids::AgentId; -use crate::recording::registry::TypeRegistry; -use crate::recording::transcoder::ComRecTranscoder; -use crate::signalling::common::interface::ConnectRecorder; -use crate::signalling::common::signals::Signal; -use crate::timestamp; -use crate::timestamp::{timestamp, Timestamp}; -use alloc::boxed::Box; -use alloc::vec; -use alloc::vec::Vec; -use feo_time::Duration; -use feo_tracing::ScoreDebugIoError; -use io::Write; -use postcard::experimental::max_size::MaxSize; -use score_log::{debug, error, trace, ScoreDebug}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::io::BufWriter; -use std::thread; -use std::{fs, io}; - -/// Maximum allowed length of topics and type names in the recording -const TOPIC_TYPENAME_MAX_SIZE: usize = 256; - -/// The data recorder. -pub(crate) struct FileRecorder<'s> { - // ID of the recorder - id: AgentId, - - /// Connector to the primary - connector: Box, - - /// Receive timeout used on poll - receive_timeout: Duration, - - // A file writer receiving the data - writer: BufWriter, - - // Which topics with what types to record - rules: RecordingRules, - - // The type registry - registry: &'s TypeRegistry, - - // Transcoders reading and serializing com data - transcoders: Vec>, -} - -impl<'s> FileRecorder<'s> { - /// Create a new data recorder - pub(crate) fn new<'t: 's>( - id: AgentId, - connector: Box, - receive_timeout: Duration, - record_file: &'static str, - rules: RecordingRules, - registry: &'t TypeRegistry, - ) -> io::Result { - // Create the recording file - let file = fs::File::create(record_file)?; - let writer = BufWriter::new(file); - - Ok(Self { - id, - connector, - receive_timeout, - writer, - rules, - registry, - transcoders: vec![], - }) - } - - /// Run the recording - pub(crate) fn run(&mut self) { - // Create transcoders reading from the required topics - debug!("Creating transcoders"); - for (topic, type_name) in self.rules.iter() { - let info = self - .registry - .info_name(type_name) - .unwrap_or_else(|| panic!("type name {type_name} not in registry")); - let transcoder_builder = &info.comrec_builder; - let transcoder = transcoder_builder(topic); - debug!("Creating transcoder: {}, {}", topic, type_name); - self.transcoders.push(transcoder); - } - - debug!("Starting main loop"); - let msg_buf_size = self - .transcoders - .iter() - .map(|t| t.buffer_size()) - .max() - .unwrap_or_default(); - let mut msg_buf = vec![0; msg_buf_size]; - loop { - // Receive the next signal from the primary process - trace!("Waiting for next signal to record"); - - let Ok(received) = self.connector.receive(self.receive_timeout) else { - error!("Failed to receive signal, trying to continue"); - self.writer - .flush() - .unwrap_or_else(|_| error!("Failed to flush writer, trying to continue")); - continue; - }; - let Some(signal) = received else { - continue; - }; - debug!("Received signal {}", signal); - - match signal { - Signal::StartupSync(sync_info) => { - timestamp::initialize_from(sync_info); - }, - // If received a step signal, or an end-of-taskchain signal, - // record the current latest change of com data, then record the signal. - // Also, flush the recording file at whenever the end of the task chain is reached. - Signal::Step(_) => { - self.record_com_data(&mut msg_buf); - self.record_signal(signal); - }, - Signal::TaskChainEnd(_) => { - self.record_com_data(&mut msg_buf); - self.record_signal(signal); - self.flush(); - self.send_recorder_ready(); - }, - Signal::Terminate(_) => { - debug!( - "Recorder {} received Terminate signal. Acknowledging and exiting.", - self.id - ); - // Send TerminateAck to the primary. - if let Err(e) = self.connector.send_to_scheduler(&Signal::TerminateAck(self.id)) { - error!("Recorder {} failed to send TerminateAck: {:?}", self.id, e); - } - debug!("Recorder {} sent termination ack. Exiting.", self.id); - // Linger for a moment to ensure TerminateAck has time to be sent - // over the network before the thread exits and closes the socket. - thread::sleep(Duration::from_millis(100).into()); - return; // Graceful exit from the run loop - }, - - // Otherwise, only record the signal - _ => { - self.record_signal(signal); - }, - } - } - } - - /// Flush the recording file - fn flush(&mut self) { - if let Err(e) = self.writer.flush() { - panic!("failed to flush recording file: {e}"); - } - } - - // Record the latest changes of com data - fn record_com_data(&mut self, data_buffer: &mut [u8]) { - for transcoder in self.transcoders.iter() { - let data = transcoder.read_transcode(data_buffer); - if let Some(serialized_data) = data { - // create serialized data description record - assert!( - transcoder.type_name().len() <= TOPIC_TYPENAME_MAX_SIZE, - "serialized type name exceeds maximal size of {TOPIC_TYPENAME_MAX_SIZE}" - ); - assert!( - transcoder.topic().len() <= TOPIC_TYPENAME_MAX_SIZE, - "serialized type name exceeds maximal size of {TOPIC_TYPENAME_MAX_SIZE}" - ); - let description = DataDescriptionRecord { - timestamp: timestamp(), - type_name: transcoder.type_name(), - data_size: serialized_data.len(), - topic: transcoder.topic(), - }; - let data_desc_record = Record::DataDescription(description); - let mut buf = [0u8; Record::POSTCARD_MAX_SIZE]; - let serialized_header = postcard::to_slice(&data_desc_record, &mut buf).expect("serialization failed"); - - trace!("Writing data: {:?}", description); - - // Write description record and subsequent data block - // In case of failure, log an error message and continue - // (which may result in a corrupted file) - if let Err(e) = self - .writer - .write_all(serialized_header) - .and_then(|_| self.writer.write_all(serialized_data)) - { - error!("Failed to write data: {:?}", ScoreDebugIoError(e)); - } - } - } - } - - /// Record the given signal - fn record_signal(&mut self, signal: Signal) { - let signal_record = Record::Signal(SignalRecord { - signal, - timestamp: timestamp(), - }); - let mut buf = [0u8; Record::POSTCARD_MAX_SIZE]; - let serialized = postcard::to_slice(&signal_record, &mut buf).expect("serialization failed"); - if let Err(e) = self.writer.write_all(serialized) { - error!("Failed to write signal {:?}: {:?}", signal, ScoreDebugIoError(e)); - } - } - - // Send RecorderReady signal to the primary agent - fn send_recorder_ready(&mut self) { - let signal = Signal::RecorderReady((self.id, timestamp())); - self.connector - .send_to_scheduler(&signal) - .unwrap_or_else(|e| panic!("failed to send 'recorder_ready': {:?}", e)); - } -} - -impl Drop for FileRecorder<'_> { - fn drop(&mut self) { - // Try to flush pending data. - self.flush(); - } -} - -/// Set of recording rules -/// -/// Maps every topic to be recorded to a corresponding type name from the type registry -pub type RecordingRules = HashMap<&'static str, &'static str>; - -/// Possible records in the recording file -#[derive(Debug, Serialize, Deserialize, MaxSize)] -pub enum Record<'s> { - Signal(SignalRecord), - #[serde(borrow)] - DataDescription(DataDescriptionRecord<'s>), -} - -#[derive(Debug, Copy, Clone, Serialize, Deserialize, MaxSize)] -pub struct SignalRecord { - // The monotonic time at the moment of recording as duration since the epoch - pub timestamp: Timestamp, - // The recorded signal - pub signal: Signal, -} - -#[derive(Debug, Copy, Clone, Serialize, Deserialize, ScoreDebug)] -pub struct DataDescriptionRecord<'s> { - // The monotonic time at the moment of recording as duration since the epoch - pub timestamp: Timestamp, - /// size of the appended data - pub data_size: usize, - #[serde(borrow)] - /// restricted to 256 chars - pub type_name: &'s str, - #[serde(borrow)] - /// restricted to 256 chars - pub topic: &'s str, -} - -impl MaxSize for DataDescriptionRecord<'_> { - #[allow(clippy::identity_op)] - const POSTCARD_MAX_SIZE: usize = Timestamp::POSTCARD_MAX_SIZE + - usize::POSTCARD_MAX_SIZE + // data_size - 2*( // type_name, topic - usize::POSTCARD_MAX_SIZE + // len - TOPIC_TYPENAME_MAX_SIZE * u8::POSTCARD_MAX_SIZE // restrict to 256 bytes - ); -} - -#[cfg(test)] -mod test { - use super::*; - use alloc::string::String; - use core::time::Duration; - - #[test] - fn test_max_size_for_data_description_record() { - let s = String::from_utf8(vec![b'a'; TOPIC_TYPENAME_MAX_SIZE]).expect("valid string"); - let record = DataDescriptionRecord { - timestamp: Timestamp(Duration::MAX), - data_size: usize::MAX, - type_name: &s, - topic: &s, - }; - let mut buf = [0u8; DataDescriptionRecord::POSTCARD_MAX_SIZE]; - postcard::to_slice(&record, &mut buf).expect("should fit"); - } -} diff --git a/src/feo/src/recording/registry.rs b/src/feo/src/recording/registry.rs deleted file mode 100644 index 2ace242..0000000 --- a/src/feo/src/recording/registry.rs +++ /dev/null @@ -1,186 +0,0 @@ -// ******************************************************************************* -// Copyright (c) 2025 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache License Version 2.0 which is available at -// -// -// SPDX-License-Identifier: Apache-2.0 -// ******************************************************************************* - -//! Type registry -use crate::recording::transcoder::{ComRecTranscoderBuilder, RecordingTranscoder}; -use alloc::borrow::ToOwned as _; -use alloc::boxed::Box; -use feo_com::interface::ActivityInput; -use score_log::fmt::ScoreDebug; -use serde::Serialize; -use std::collections::HashMap; - -/// Registry of types used in the com layer -#[derive(Debug)] -pub struct TypeRegistry { - /// Map user-defined, human-readable type names to type information - map: RegistryMap, -} - -impl TypeRegistry { - /// Create empty type registry - pub fn new() -> Self { - let map = HashMap::default(); - Self { map } - } - - /// Helper method for adding a new registry entry - fn add_helper(&mut self, type_info: TypeInfo) -> &mut Self { - let type_name = type_info.type_name; - let previous_info = self.map.insert(type_name, type_info); - assert!(previous_info.is_none(), "type '{type_name}' already registered"); - self - } - - /// Add the given type to the registry - /// - /// The user may define a unique type name, otherwise the system type name will be used. - /// Note that system type names may not be unique in which case the method will panic. - /// - /// # Panics - /// - /// This method will panic if - /// - a type with identical type id (i.e. the same type) has already been registered - /// - the explicitly or implicitly provided type name is not unique - pub fn add( - &mut self, - type_name: Option<&'static str>, - input_builder: impl Fn(&str) -> Box> + Clone + Send + 'static, - ) -> &mut Self { - let type_name = type_name.unwrap_or(core::any::type_name::()); - let decser_builder = { - let type_name = type_name.to_owned(); - let input_builder = input_builder.clone(); - Box::new(move |topic: &str| { - let topic = topic.to_owned(); - let type_name = type_name.clone(); - RecordingTranscoder::::build(input_builder.clone(), topic, type_name) - }) as Box - }; - let type_info = TypeInfo { - type_name, - comrec_builder: decser_builder, - }; - self.add_helper(type_info) - } - - /// Import the given type registry into this registry - pub fn import(&mut self, other: TypeRegistry) -> &mut Self { - for (_, type_info) in other.map { - self.add_helper(type_info); - } - self - } - - /// Retrieve a [`TypeInfo`] for the given type name, or None if not existent - pub fn info_name(&self, type_name: &str) -> Option<&TypeInfo> { - self.map.get(type_name) - } -} - -impl Default for TypeRegistry { - fn default() -> Self { - TypeRegistry::new() - } -} - -/// Type registry map, mapping from human-readable type names to required objects for each type -type RegistryMap = HashMap<&'static str, TypeInfo>; - -/// Type information stored in the type registry -pub struct TypeInfo { - // Human-readable type name - pub type_name: &'static str, - - // Corresponding [`ComRecTranscoderBuilder`] object - pub comrec_builder: Box, -} - -impl core::fmt::Debug for TypeInfo { - fn fmt(&self, writer: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - writer.write_fmt(format_args!( - "[ type_name: {:?}, decser_builder: Box(?) ]", - self.type_name - )) - } -} - -#[macro_export] -macro_rules! register_type { - ($registry:ident, $type:ty: $name:expr, $input:expr) => { - $registry.add::<$type>(Some($name), $input) - }; - ($registry:ident, $type:ty, $input:expr) => { - $registry.add::<$type>(None, $input) - }; -} - -#[macro_export] -macro_rules! register_types { - ($registry:ident; $($type:ty $(:$name:expr)?, $input_builder:expr);+ $(,)?) => { - $( - register_type!( - $registry, - $type $(:$name)?, - $input_builder - ) - );+ - }; -} - -///////////// -// Tests -///////////// - -#[test] -fn test_type_registry() { - #[derive(Debug)] - // Dummy input implementation for the test - struct DummyInput; - - impl ActivityInput for DummyInput { - fn read(&self) -> Result, feo_com::interface::Error> { - todo!() - } - } - - #[derive(Debug, serde::Serialize, postcard::experimental::max_size::MaxSize)] - struct TestType1 {} - - #[derive(Debug, serde::Serialize, postcard::experimental::max_size::MaxSize)] - struct TestType2 {} - - #[derive(Debug, serde::Serialize, postcard::experimental::max_size::MaxSize)] - struct TestType3 {} - - let mut registry = TypeRegistry::default(); - register_types!(registry; TestType1, |_: &str| Box::new(DummyInput); TestType2, |_: &str| Box::new(DummyInput); TestType3: "my_test_type3_name", |_: &str| Box::new(DummyInput)); - - // test presence and data of entry for TestType1 - let type_name = core::any::type_name::(); - assert!(registry.map.contains_key(&type_name)); - assert!(registry.info_name(type_name).is_some()); - assert_eq!(registry.info_name(type_name).unwrap().type_name, type_name); - - // test presence and data of entry for TestType3 - let type_name = "my_test_type3_name"; - assert!(registry.map.contains_key(&type_name)); - assert!(registry.info_name(type_name).is_some()); - assert_eq!(registry.info_name(type_name).unwrap().type_name, type_name); - - // test missing type Foo - struct Foo {} - let type_name = core::any::type_name::(); - assert!(!registry.map.contains_key(&type_name)); - assert!(registry.info_name(type_name).is_none()); -} diff --git a/src/feo/src/recording/transcoder.rs b/src/feo/src/recording/transcoder.rs deleted file mode 100644 index b28278e..0000000 --- a/src/feo/src/recording/transcoder.rs +++ /dev/null @@ -1,104 +0,0 @@ -// ******************************************************************************* -// Copyright (c) 2025 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache License Version 2.0 which is available at -// -// -// SPDX-License-Identifier: Apache-2.0 -// ******************************************************************************* - -//! Transcoders between com layer format and serialization for recording - -use alloc::boxed::Box; -use alloc::string::String; -use core::ops::Deref as _; -use feo_com::interface::ActivityInput; -use score_log::fmt::ScoreDebug; -use score_log::info; -use serde::Serialize; - -/// Transcode data of the given type from com layer representation to recording serialization -pub(crate) struct RecordingTranscoder { - input: Box>, - topic: String, - type_name: String, -} - -impl RecordingTranscoder { - /// Create a transcoder reading from the given com layer topic - pub fn build( - input_builder: impl Fn(&str) -> Box> + Send, - topic: String, - type_name: String, - ) -> Box { - let input = input_builder(&topic); - Box::new(RecordingTranscoder:: { - input, - topic, - type_name, - }) - } - - /// Read com layer data and serialize them for recording - pub fn read_and_serialize<'a>(&self, buf: &'a mut [u8]) -> Option<&'a mut [u8]> { - let input = self.input.read(); - if let Ok(value) = input { - let value = value.deref(); - info!("Serializing {:?}", value); - let written = postcard::to_slice(value, buf).expect("serialization failed"); - return Some(written); - } - None - } -} - -/// Trait implementing reading and transcoding of com data for recording -pub trait ComRecTranscoder { - /// Read com layer data and serialize them for recording - fn read_transcode<'a>(&self, buf: &'a mut [u8]) -> Option<&'a mut [u8]>; - - /// Maximum buffer size required for serialization - fn buffer_size(&self) -> usize; - - // Get the topic to which this transcoder is connected - fn topic(&self) -> &str; - - // Get the type name of data this transcoder is transcoding - fn type_name(&self) -> &str; -} - -/// Implement the recording-and-serialization trait for all [`RecordingTranscoder`] types -impl ComRecTranscoder - for RecordingTranscoder -{ - fn buffer_size(&self) -> usize { - T::POSTCARD_MAX_SIZE - } - fn read_transcode<'a>(&self, buf: &'a mut [u8]) -> Option<&'a mut [u8]> { - self.read_and_serialize(buf) - } - - fn topic(&self) -> &str { - &self.topic - } - - // Get the type name of data this transcoder is transcoding - fn type_name(&self) -> &str { - &self.type_name - } -} - -/// Builder trait for a [`ComRecTranscoder`] object -/// -/// A builder is a function taking a com layer topic and creating a [`ComRecTranscoder`] object -/// for that topic -pub trait ComRecTranscoderBuilder: Fn(&'static str) -> Box {} - -/// Implement the builder trait for any function matching the [`ComRecTranscoderBuilder`] builder trait. -/// -/// In particular, this will apply to the [`build`] method of [`RecordingTranscoder`] -impl Box> ComRecTranscoderBuilder for T {} diff --git a/src/feo/src/topicspec.rs b/src/feo/src/topicspec.rs index 5af6b8d..b0b3727 100644 --- a/src/feo/src/topicspec.rs +++ b/src/feo/src/topicspec.rs @@ -16,10 +16,10 @@ use crate::ids::ActivityId; use alloc::boxed::Box; use alloc::vec::Vec; -use core::fmt; +use core::fmt::Debug; use feo_com::interface::{ init_topic_primary, init_topic_secondary, ComBackendTopicPrimaryInitialization, - ComBackendTopicSecondaryInitialization, Topic, TopicHandle, + ComBackendTopicSecondaryInitialization, FeoComData, FeoComDefault, Topic, TopicHandle, }; use score_log::fmt::ScoreDebug; @@ -46,7 +46,7 @@ pub struct TopicSpecification<'a> { } impl<'a> TopicSpecification<'a> { - pub fn new( + pub fn new( topic: Topic<'a>, peers: Vec<(ActivityId, Direction)>, ) -> Self {