Skip to content

Commit 5ff9ff7

Browse files
committed
feat: Retry zombie jobs and emit events
Pass EventBus into zombie_handler_task and use fail_job_or_retry to determine the outcome. On permanent failure publish SchedulerEvent::JobCompleted (with final state and resources); on retry publish SchedulerEvent::JobUpdated. Add a NoopExecutor test stub and zombie_handler_retries_jobs_with_remaining_retry_budget test. Update the spawn call to pass a cloned EventBus instance.
1 parent 19b76a7 commit 5ff9ff7

2 files changed

Lines changed: 109 additions & 19 deletions

File tree

src/multicall/gflowd/scheduler_runtime/event_loop.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ pub async fn run_event_driven(
3737
),
3838
// Zombie handler - reacts to zombie events
3939
tokio::spawn(
40-
super::monitors::zombie_handler_task(event_bus.subscribe(), Arc::clone(&shared_state))
41-
.instrument(tracing::info_span!("zombie_handler_task")),
40+
super::monitors::zombie_handler_task(
41+
event_bus.subscribe(),
42+
Arc::clone(&shared_state),
43+
Arc::clone(&event_bus),
44+
)
45+
.instrument(tracing::info_span!("zombie_handler_task")),
4246
),
4347
// Timeout monitor - checks time limits every 10s
4448
tokio::spawn(

src/multicall/gflowd/scheduler_runtime/monitors.rs

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ pub(super) async fn zombie_monitor_task(state: SharedState, event_bus: Arc<Event
112112
pub(super) async fn zombie_handler_task(
113113
mut events: tokio::sync::broadcast::Receiver<EventEnvelope>,
114114
state: SharedState,
115+
event_bus: Arc<EventBus>,
115116
) {
116117
loop {
117118
match events.recv().await {
@@ -121,27 +122,38 @@ pub(super) async fn zombie_handler_task(
121122
let SchedulerEvent::ZombieJobDetected { job_id } = event.event else {
122123
continue;
123124
};
124-
// Get run_name before acquiring write lock
125-
let run_name = {
125+
let (_success, gpu_ids, memory_mb) = {
126126
let state_guard = state.read().await;
127-
state_guard
128-
.scheduler
129-
.get_job_spec(job_id)
130-
.and_then(|spec| spec.run_name.clone())
127+
if let Some(job) = state_guard.get_job(job_id) {
128+
(true, job.gpu_ids.clone(), job.memory_limit_mb)
129+
} else {
130+
(false, None, None)
131+
}
131132
};
132133

133-
// Update job state (write lock)
134134
let mut state_guard = state.write().await;
135-
// Use fail_job to properly update group_running_count index
136-
state_guard.scheduler.fail_job(job_id);
137-
state_guard.mark_dirty();
138-
tracing::info!(job_id, "Marked zombie job as failed");
139-
drop(state_guard); // Release lock before disabling PipePane
140-
141-
// Disable PipePane if session still exists (no lock held)
142-
// This handles the case where the session was manually killed but PipePane might still be active
143-
if let Some(rn) = run_name {
144-
disable_pipe_pane_for_job(job_id, &rn, true);
135+
let outcome = state_guard.fail_job_or_retry(job_id).await;
136+
drop(state_guard);
137+
138+
match outcome {
139+
FailJobOutcome::Failed => {
140+
tracing::info!(job_id, "Marked zombie job as failed");
141+
event_bus.publish(SchedulerEvent::JobCompleted {
142+
job_id,
143+
final_state: JobState::Failed,
144+
gpu_ids,
145+
memory_mb,
146+
});
147+
}
148+
FailJobOutcome::Retried { retry_attempt } => {
149+
tracing::info!(
150+
job_id,
151+
retry_attempt,
152+
"Marked zombie job for automatic retry"
153+
);
154+
event_bus.publish(SchedulerEvent::JobUpdated { job_id });
155+
}
156+
FailJobOutcome::NotFound => {}
145157
}
146158
}
147159
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
@@ -381,9 +393,20 @@ fn calculate_next_reservation_transition(
381393
#[cfg(test)]
382394
mod tests {
383395
use super::*;
396+
use crate::multicall::gflowd::scheduler_runtime::SchedulerRuntime;
397+
use gflow::core::executor::Executor;
398+
use gflow::core::job::Job;
384399
use gflow::core::reservation::{GpuReservation, GpuSpec, ReservationStatus};
385400
use std::time::{Duration, SystemTime};
386401

402+
struct NoopExecutor;
403+
404+
impl Executor for NoopExecutor {
405+
fn execute(&self, _job: &Job) -> anyhow::Result<()> {
406+
Ok(())
407+
}
408+
}
409+
387410
#[test]
388411
fn zombie_check_allows_legacy_jobs_without_start_time() {
389412
let now = SystemTime::now();
@@ -411,6 +434,69 @@ mod tests {
411434
assert!(should_check_missing_session_as_zombie(started_at, now));
412435
}
413436

437+
#[tokio::test]
438+
async fn zombie_handler_retries_jobs_with_remaining_retry_budget() {
439+
let dir = tempfile::tempdir().unwrap();
440+
let state = Arc::new(tokio::sync::RwLock::new(
441+
SchedulerRuntime::with_state_path(
442+
Box::new(NoopExecutor),
443+
dir.path().to_path_buf(),
444+
None,
445+
gflow::core::gpu_allocation::GpuAllocationStrategy::Sequential,
446+
gflow::config::ProjectsConfig::default(),
447+
)
448+
.unwrap(),
449+
));
450+
let event_bus = Arc::new(EventBus::new(16));
451+
let mut observed_events = event_bus.subscribe();
452+
453+
let job_id = {
454+
let mut state_guard = state.write().await;
455+
let job = Job::builder()
456+
.command("false")
457+
.submitted_by("alice")
458+
.max_retry(Some(1))
459+
.build();
460+
let (job_id, _run_name, _job) = state_guard.submit_job(job).await.unwrap();
461+
let prepared = state_guard.scheduler.prepare_jobs_for_execution();
462+
assert_eq!(prepared.len(), 1);
463+
assert_eq!(prepared[0].id, job_id);
464+
job_id
465+
};
466+
467+
let handler = tokio::spawn(zombie_handler_task(
468+
event_bus.subscribe(),
469+
Arc::clone(&state),
470+
Arc::clone(&event_bus),
471+
));
472+
473+
event_bus.publish(SchedulerEvent::ZombieJobDetected { job_id });
474+
475+
let event = tokio::time::timeout(Duration::from_secs(1), async {
476+
loop {
477+
let envelope = observed_events.recv().await.unwrap();
478+
if let SchedulerEvent::JobUpdated {
479+
job_id: updated_job_id,
480+
} = envelope.event
481+
{
482+
break updated_job_id;
483+
}
484+
}
485+
})
486+
.await
487+
.unwrap();
488+
489+
assert_eq!(event, job_id);
490+
491+
let state_guard = state.read().await;
492+
let job = state_guard.get_job(job_id).unwrap();
493+
assert_eq!(job.state, JobState::Queued);
494+
assert_eq!(job.retry_attempt, 1);
495+
drop(state_guard);
496+
497+
handler.abort();
498+
}
499+
414500
#[test]
415501
fn test_calculate_next_transition_no_reservations() {
416502
let reservations = vec![];

0 commit comments

Comments (0)