diff --git a/crates/lgp-cli/src/experiment_runner.rs b/crates/lgp-cli/src/experiment_runner.rs index 1a94d2f1..a19196d2 100644 --- a/crates/lgp-cli/src/experiment_runner.rs +++ b/crates/lgp-cli/src/experiment_runner.rs @@ -152,6 +152,7 @@ fn run_iris( n_generations: config.hyperparameters.n_generations, n_trials: config.hyperparameters.n_trials, seed: Some(seed), + n_threads: config.hyperparameters.n_threads, program_parameters: build_program_params(config), }; @@ -173,6 +174,7 @@ fn run_cart_pole_lgp( n_generations: config.hyperparameters.n_generations, n_trials: config.hyperparameters.n_trials, seed: Some(seed), + n_threads: config.hyperparameters.n_threads, program_parameters: build_program_params(config), }; @@ -208,6 +210,7 @@ fn run_cart_pole_q( n_generations: config.hyperparameters.n_generations, n_trials: config.hyperparameters.n_trials, seed: Some(seed), + n_threads: config.hyperparameters.n_threads, program_parameters: q_program_params, }; @@ -229,6 +232,7 @@ fn run_mountain_car_lgp( n_generations: config.hyperparameters.n_generations, n_trials: config.hyperparameters.n_trials, seed: Some(seed), + n_threads: config.hyperparameters.n_threads, program_parameters: build_program_params(config), }; @@ -264,6 +268,7 @@ fn run_mountain_car_q( n_generations: config.hyperparameters.n_generations, n_trials: config.hyperparameters.n_trials, seed: Some(seed), + n_threads: config.hyperparameters.n_threads, program_parameters: q_program_params, }; diff --git a/crates/lgp/Cargo.toml b/crates/lgp/Cargo.toml index 1f9d4473..9f676dac 100644 --- a/crates/lgp/Cargo.toml +++ b/crates/lgp/Cargo.toml @@ -42,3 +42,7 @@ criterion = "0.4.0" name = "performance_after_training" harness = false required-features = ["gym"] + +[[bench]] +name = "parallel_fitness" +harness = false diff --git a/crates/lgp/benches/parallel_fitness.rs b/crates/lgp/benches/parallel_fitness.rs new file mode 100644 index 00000000..268f4f99 --- /dev/null +++ b/crates/lgp/benches/parallel_fitness.rs @@ -0,0 
+1,124 @@ +use std::iter::repeat_with; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use itertools::Itertools; +use lgp::{ + core::{ + engines::{ + core_engine::Core, fitness_engine::Fitness, generate_engine::Generate, + reset_engine::Reset, status_engine::Status, + }, + instruction::InstructionGeneratorParameters, + program::ProgramGeneratorParameters, + }, + problems::iris::{IrisEngine, IrisState}, +}; +use rayon::prelude::*; + +/// Sequential fitness evaluation (pre-parallelization baseline). +fn eval_fitness_sequential( + population: &mut Vec, + trials: &[C::State], + default_fitness: f64, +) { + let n_trials = trials.len(); + for individual in population.iter_mut() { + let total: f64 = trials + .iter() + .cloned() + .map(|mut trial| { + C::Reset::reset(individual); + C::Reset::reset(&mut trial); + let score = C::Fitness::eval_fitness(individual, &mut trial); + if score.is_finite() { + score + } else { + default_fitness + } + }) + .sum(); + C::Status::set_fitness(individual, total / n_trials as f64); + } +} + +/// Parallel fitness evaluation (current implementation). 
+fn eval_fitness_parallel( + population: &mut Vec, + trials: &[C::State], + default_fitness: f64, +) { + let n_trials = trials.len(); + population.par_iter_mut().for_each(|individual| { + let total: f64 = trials + .iter() + .cloned() + .map(|mut trial| { + C::Reset::reset(individual); + C::Reset::reset(&mut trial); + let score = C::Fitness::eval_fitness(individual, &mut trial); + if score.is_finite() { + score + } else { + default_fitness + } + }) + .sum(); + C::Status::set_fitness(individual, total / n_trials as f64); + }); +} + +fn parallel_vs_sequential(c: &mut Criterion) { + let mut group = c.benchmark_group("fitness_evaluation"); + + let n_trials = 5; + let default_fitness = 0.0; + + let trials: Vec = repeat_with(|| ::Generate::generate(())) + .take(n_trials) + .collect_vec(); + + for pop_size in [50, 100, 200, 500] { + let population: Vec<_> = repeat_with(|| { + ::Generate::generate(ProgramGeneratorParameters { + max_instructions: 100, + instruction_generator_parameters: InstructionGeneratorParameters { + n_extras: 1, + external_factor: 10.0, + n_actions: 3, + n_inputs: 4, + }, + }) + }) + .take(pop_size) + .collect(); + + group.bench_with_input( + BenchmarkId::new("sequential", pop_size), + &pop_size, + |b, _| { + b.iter_batched( + || population.clone(), + |mut pop| { + eval_fitness_sequential::(&mut pop, &trials, default_fitness); + }, + criterion::BatchSize::SmallInput, + ); + }, + ); + + group.bench_with_input(BenchmarkId::new("parallel", pop_size), &pop_size, |b, _| { + b.iter_batched( + || population.clone(), + |mut pop| { + eval_fitness_parallel::(&mut pop, &trials, default_fitness); + }, + criterion::BatchSize::SmallInput, + ); + }); + } + + group.finish(); +} + +criterion_group!(benches, parallel_vs_sequential); +criterion_main!(benches); diff --git a/crates/lgp/src/core/config.rs b/crates/lgp/src/core/config.rs index e3584f1f..d05c8fc0 100644 --- a/crates/lgp/src/core/config.rs +++ b/crates/lgp/src/core/config.rs @@ -72,6 +72,10 @@ pub struct 
ExperimentParams { #[arg(long)] pub seed: Option, + /// Number of threads for parallel evaluation (defaults to all available cores) + #[arg(long)] + pub n_threads: Option, + /// Fitness assigned to invalid programs (overridden per environment if not set) #[arg(long)] pub default_fitness: Option, @@ -236,6 +240,7 @@ impl ExperimentParams { n_generations: self.n_generations, n_trials: self.n_trials, seed: self.seed, + n_threads: self.n_threads, program_parameters: self.build_program_params(), }; run_experiment!(hyperparameters); @@ -252,6 +257,7 @@ impl ExperimentParams { n_generations: self.n_generations, n_trials: self.n_trials, seed: self.seed, + n_threads: self.n_threads, program_parameters: self.build_q_program_params(), }; ResetEngine::reset(&mut hyperparameters.program_parameters.consts); @@ -269,6 +275,7 @@ impl ExperimentParams { n_generations: self.n_generations, n_trials: self.n_trials, seed: self.seed, + n_threads: self.n_threads, program_parameters: self.build_program_params(), }; run_experiment!(hyperparameters); @@ -285,6 +292,7 @@ impl ExperimentParams { n_generations: self.n_generations, n_trials: self.n_trials, seed: self.seed, + n_threads: self.n_threads, program_parameters: self.build_q_program_params(), }; ResetEngine::reset(&mut hyperparameters.program_parameters.consts); @@ -300,6 +308,7 @@ impl ExperimentParams { n_generations: self.n_generations, n_trials: self.n_trials, seed: self.seed, + n_threads: self.n_threads, program_parameters: self.build_program_params(), }; run_experiment!(hyperparameters); diff --git a/crates/lgp/src/core/engines/core_engine.rs b/crates/lgp/src/core/engines/core_engine.rs index 5e6f5f79..1b985036 100644 --- a/crates/lgp/src/core/engines/core_engine.rs +++ b/crates/lgp/src/core/engines/core_engine.rs @@ -4,6 +4,7 @@ use clap::{Args, Parser}; use derivative::Derivative; use itertools::Itertools; use rand::{seq::IteratorRandom, Rng}; +use rayon::prelude::*; use crate::{ core::{ @@ -53,6 +54,9 @@ where 
#[builder(default = "None")] #[arg(long)] pub seed: Option, + #[builder(default = "None")] + #[arg(long)] + pub n_threads: Option, #[command(flatten)] pub program_parameters: C::ProgramParameters, } @@ -78,7 +82,8 @@ where gap = hp.gap, mutation_percent = hp.mutation_percent, crossover_percent = hp.crossover_percent, - seed = ?hp.seed + seed = ?hp.seed, + n_threads = ?hp.n_threads ))] pub fn new(hp: HyperParameters) -> Self { debug!("Initializing evolution engine"); @@ -116,11 +121,7 @@ where let mut population = self.next_population.clone(); - C::eval_fitness( - &mut population, - &mut self.trials, - self.params.default_fitness, - ); + C::eval_fitness(&mut population, &self.trials, self.params.default_fitness); C::rank(&mut population); assert!(population.iter().all(C::Status::evaluated)); @@ -182,6 +183,14 @@ where { pub fn build_engine(&self) -> CoreIter { update_seed(self.seed); + + if let Some(n_threads) = self.n_threads { + rayon::ThreadPoolBuilder::new() + .num_threads(n_threads) + .build_global() + .ok(); + } + CoreIter::new(self.clone()) } } @@ -189,7 +198,7 @@ where pub trait Core { type Individual: Ord + Clone + Send + Sync + Serialize + DeserializeOwned; type ProgramParameters: Copy + Send + Sync + Clone + Serialize + DeserializeOwned + Args; - type State: State; + type State: State + Clone + Send + Sync; type FitnessMarker; type Generate: Generate + Generate<(), Self::State>; type Fitness: Fitness; @@ -210,27 +219,27 @@ pub trait Core { fn eval_fitness( population: &mut Vec, - trials: &mut Vec, + trials: &[Self::State], default_fitness: f64, ) { - for individual in population.iter_mut() { - let mut scores = trials - .iter_mut() - .map(|trial| { + let n_trials = trials.len(); + population.par_iter_mut().for_each(|individual| { + let total: f64 = trials + .iter() + .cloned() + .map(|mut trial| { Self::Reset::reset(individual); - Self::Reset::reset(trial); - Self::Fitness::eval_fitness(individual, trial) + Self::Reset::reset(&mut trial); + let score = 
Self::Fitness::eval_fitness(individual, &mut trial); + if score.is_finite() { + score + } else { + default_fitness + } }) - .collect_vec(); - - let n_trials = scores.len(); - scores = scores - .into_iter() - .map(|s| if !s.is_finite() { default_fitness } else { s }) - .collect_vec(); - let average = scores.into_iter().sum::<f64>() / n_trials as f64; - Self::Status::set_fitness(individual, average); - } + .sum(); + Self::Status::set_fitness(individual, total / n_trials as f64); + }); } fn rank(population: &mut Vec<Self::Individual>) { diff --git a/crates/lgp/src/core/experiment_config.rs b/crates/lgp/src/core/experiment_config.rs index 20ae8507..fdf69216 100644 --- a/crates/lgp/src/core/experiment_config.rs +++ b/crates/lgp/src/core/experiment_config.rs @@ -99,6 +99,9 @@ pub struct HyperParams { /// Serialized as a string to support values > i64::MAX in TOML format. #[serde(default, with = "optional_u64_as_string")] pub seed: Option<u64>, + /// Number of threads for parallel evaluation. If None, uses all available cores.
+ #[serde(default)] + pub n_threads: Option, pub program: ProgramConfig, } diff --git a/crates/lgp/src/problems/gym.rs b/crates/lgp/src/problems/gym.rs index 185957d8..74ccb4d1 100644 --- a/crates/lgp/src/problems/gym.rs +++ b/crates/lgp/src/problems/gym.rs @@ -23,9 +23,9 @@ use crate::extensions::interactive::UseRlFitness; use crate::extensions::q_learning::QProgram; use crate::extensions::q_learning::QProgramGeneratorParameters; -pub trait GymRsEnvExt: Env +pub trait GymRsEnvExt: Env + Send + Sync where - Self::Observation: Copy + Into>, + Self::Observation: Copy + Into> + Send + Sync, { fn create() -> Self; fn max_steps() -> usize; @@ -59,7 +59,7 @@ impl GymRsEnvExt for MountainCarEnv { #[derive(Clone, Debug)] pub struct GymRsInput where - E::Observation: Copy + Into>, + E::Observation: Copy + Into> + Send + Sync, { environment: E, terminated: bool, @@ -71,7 +71,7 @@ where impl State for GymRsInput where E: GymRsEnvExt, - E::Observation: Copy + Into>, + E::Observation: Copy + Into> + Send + Sync, { fn get_value(&self, idx: usize) -> f64 { self.observation[idx] @@ -98,7 +98,7 @@ where impl RlState for GymRsInput where T: GymRsEnvExt, - T::Observation: Copy + Into>, + T::Observation: Copy + Into> + Send + Sync, { fn is_terminal(&mut self) -> bool { self.terminated @@ -112,7 +112,7 @@ where impl Reset> for ResetEngine where T: GymRsEnvExt, - T::Observation: Copy + Into>, + T::Observation: Copy + Into> + Send + Sync, { fn reset(item: &mut GymRsInput) { item.environment.reset(None, false, None); @@ -126,7 +126,7 @@ where impl Generate<(), GymRsInput> for GenerateEngine where T: GymRsEnvExt, - T::Observation: Copy + Into>, + T::Observation: Copy + Into> + Send + Sync, { fn generate(_from: ()) -> GymRsInput { let mut environment: T = T::create(); @@ -151,7 +151,7 @@ pub struct GymRsEngine(PhantomData); impl Core for GymRsQEngine where T: GymRsEnvExt, - T::Observation: Copy + Into>, + T::Observation: Copy + Into> + Send + Sync, { type Individual = QProgram; type 
ProgramParameters = QProgramGeneratorParameters; @@ -169,7 +169,7 @@ where impl Core for GymRsEngine where T: GymRsEnvExt, - T::Observation: Copy + Into>, + T::Observation: Copy + Into> + Send + Sync, { type Individual = Program; type ProgramParameters = ProgramGeneratorParameters; diff --git a/crates/lgp/src/problems/iris.rs b/crates/lgp/src/problems/iris.rs index ad5fa6bb..30df7dbe 100644 --- a/crates/lgp/src/problems/iris.rs +++ b/crates/lgp/src/problems/iris.rs @@ -57,6 +57,7 @@ pub struct IrisInput { class: IrisClass, } +#[derive(Clone)] pub struct IrisState { data: Vec, idx: usize, diff --git a/crates/lgp/src/utils/benchmark_tools.rs b/crates/lgp/src/utils/benchmark_tools.rs index 982fcdd3..e52ca345 100644 --- a/crates/lgp/src/utils/benchmark_tools.rs +++ b/crates/lgp/src/utils/benchmark_tools.rs @@ -21,12 +21,12 @@ where let program = C::Individual::load(program_path); let original_fitness = C::Status::get_fitness(&program); - let mut trials: Vec = repeat_with(|| C::Generate::generate(())) + let trials: Vec = repeat_with(|| C::Generate::generate(())) .take(n_trials) .collect_vec(); let mut population = vec![program]; - C::eval_fitness(&mut population, &mut trials, default_fitness); + C::eval_fitness(&mut population, &trials, default_fitness); let new_fitness = C::Status::get_fitness(population.first().unwrap()); diff --git a/crates/lgp/src/utils/tracing.rs b/crates/lgp/src/utils/tracing.rs index a846c0f0..fbabf7ba 100644 --- a/crates/lgp/src/utils/tracing.rs +++ b/crates/lgp/src/utils/tracing.rs @@ -201,10 +201,17 @@ impl TracingConfig { } } +/// Guards returned by tracing initialization. Must be held for the program +/// lifetime to ensure all non-blocking log writes are flushed. +pub struct TracingGuard { + _guards: Vec, +} + /// Initialize the tracing subscriber with the given configuration. /// -/// Returns a `WorkerGuard` if file logging is enabled. This guard must be held -/// for the duration of the program to ensure all logs are flushed to the file. 
+/// Returns a [`TracingGuard`] that must be held for the duration of the program +/// to ensure all logs are flushed. All writers (stdout and file) use non-blocking +/// I/O so that high-volume debug/trace logging does not block computation. /// /// This function should be called once at application startup, before any /// tracing macros are used. @@ -223,7 +230,7 @@ impl TracingConfig { /// /// This function will panic if called more than once, as the global subscriber /// can only be set once. -pub fn init_tracing(config: TracingConfig) -> Option { +pub fn init_tracing(config: TracingConfig) -> TracingGuard { // Check for format override via environment variable let format = env::var("LGP_LOG_FORMAT") .ok() @@ -257,23 +264,26 @@ pub fn init_tracing(config: TracingConfig) -> Option { .open(log_path) .expect("Failed to open log file"); - let (non_blocking, guard) = tracing_appender::non_blocking(file); + let (non_blocking, file_guard) = tracing_appender::non_blocking(file); // Build subscriber with file layer (and optionally stdout) + let mut guards = vec![file_guard]; if config.log_to_stdout { - // Both file and stdout - init_with_file_and_stdout(format, filter, span_events, &config, non_blocking); + let stdout_guard = + init_with_file_and_stdout(format, filter, span_events, &config, non_blocking); + guards.push(stdout_guard); } else { - // File only init_with_file_only(format, filter, span_events, &config, non_blocking); } - return Some(guard); + return TracingGuard { _guards: guards }; } - // Standard stdout-only setup - init_stdout_only(format, filter, span_events, &config); - None + // Standard stdout-only setup (also non-blocking) + let stdout_guard = init_stdout_only(format, filter, span_events, &config); + TracingGuard { + _guards: vec![stdout_guard], + } } /// Initialize tracing with file output only. @@ -336,13 +346,18 @@ fn init_with_file_only( } /// Initialize tracing with both file and stdout output. 
+/// +/// Returns a `WorkerGuard` for the non-blocking stdout writer that must be held +/// alongside the file guard for proper cleanup. fn init_with_file_and_stdout( format: TracingFormat, filter: EnvFilter, span_events: FmtSpan, config: &TracingConfig, file_writer: tracing_appender::non_blocking::NonBlocking, -) { +) -> WorkerGuard { + let (nb_stdout, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); + match format { TracingFormat::Pretty => { let file_layer = fmt::layer() @@ -356,6 +371,7 @@ fn init_with_file_and_stdout( .with_thread_names(config.thread_names) .with_target(config.target); let stdout_layer = fmt::layer() + .with_writer(nb_stdout) .pretty() .with_span_events(span_events) .with_file(config.file_info) @@ -382,6 +398,7 @@ fn init_with_file_and_stdout( .with_thread_names(config.thread_names) .with_target(config.target); let stdout_layer = fmt::layer() + .with_writer(nb_stdout) .compact() .with_span_events(span_events) .with_file(config.file_info) @@ -407,6 +424,7 @@ fn init_with_file_and_stdout( .with_thread_names(config.thread_names) .with_target(config.target); let stdout_layer = fmt::layer() + .with_writer(nb_stdout) .json() .with_span_events(span_events) .with_file(config.file_info) @@ -422,19 +440,27 @@ fn init_with_file_and_stdout( .expect("Failed to set tracing subscriber"); } } + + stdout_guard } -/// Initialize tracing with stdout only. +/// Initialize tracing with stdout only (non-blocking). +/// +/// Returns a `WorkerGuard` for the non-blocking stdout writer that must be held +/// for the program lifetime to ensure all logs are flushed. 
fn init_stdout_only( format: TracingFormat, filter: EnvFilter, span_events: FmtSpan, config: &TracingConfig, -) { +) -> WorkerGuard { + let (nb_stdout, guard) = tracing_appender::non_blocking(std::io::stdout()); + match format { TracingFormat::Pretty => { let subscriber = tracing_subscriber::registry().with(filter).with( fmt::layer() + .with_writer(nb_stdout) .pretty() .with_span_events(span_events) .with_file(config.file_info) @@ -449,6 +475,7 @@ fn init_stdout_only( TracingFormat::Compact => { let subscriber = tracing_subscriber::registry().with(filter).with( fmt::layer() + .with_writer(nb_stdout) .compact() .with_span_events(span_events) .with_file(config.file_info) @@ -463,6 +490,7 @@ fn init_stdout_only( TracingFormat::Json => { let subscriber = tracing_subscriber::registry().with(filter).with( fmt::layer() + .with_writer(nb_stdout) .json() .with_span_events(span_events) .with_file(config.file_info) @@ -475,6 +503,8 @@ fn init_stdout_only( .expect("Failed to set tracing subscriber"); } } + + guard } /// Try to initialize tracing, returning Ok if successful or if already initialized.