From 00511227db3cd29ef0162709445f34f162c1df85 Mon Sep 17 00:00:00 2001 From: Thomas van Gurp Date: Mon, 9 Mar 2026 15:19:09 -0400 Subject: [PATCH] Detect and disconnect stalled validators/screeners with zero evaluation progress When a screener's inference gateway is broken (e.g. returning 502 for all requests), the screener stays alive (heartbeats continue) but no evaluation runs ever complete. This blocks the miner's agent from being reassigned to a working screener. This adds a background loop that checks every 60s (configurable) for validators that have been running an evaluation for over 15 minutes (configurable) with zero runs progressed past the running_agent phase. When detected, the validator is disconnected and its evaluation runs are marked as errored, allowing the agent to be picked up by another screener. New env vars (with defaults, no .env changes required): - VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS (default: 900) - VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS (default: 60) Co-Authored-By: Claude Opus 4.6 --- api/config.py | 8 +++ api/endpoints/validator.py | 59 +++++++++++++++++++++++ api/loops/validator_stalled_evaluation.py | 15 ++++++ api/src/main.py | 4 +- 4 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 api/loops/validator_stalled_evaluation.py diff --git a/api/config.py b/api/config.py index 4fbd9a33..9fd3e21e 100644 --- a/api/config.py +++ b/api/config.py @@ -152,6 +152,12 @@ logger.fatal("VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS is not set in .env") VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS = int(VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS) +VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS = os.getenv("VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS", "900") +VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS = int(VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS) + +VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS = os.getenv("VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS", "60") +VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS = int(VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS) + # Load validator configuration (sent to validator upon registration) @@ -238,6 +244,8 @@ logger.info(f"Validator Heartbeat Timeout: {VALIDATOR_HEARTBEAT_TIMEOUT_SECONDS} second(s)") logger.info(f"Validator Heartbeat Timeout Interval: {VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS} second(s)") +logger.info(f"Validator Stalled Evaluation Timeout: {VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS} second(s)") +logger.info(f"Validator Stalled Evaluation Check Interval: {VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS} second(s)") logger.info("-------------------------") logger.info(f"Validator Running Agent Timeout: {VALIDATOR_RUNNING_AGENT_TIMEOUT_SECONDS} second(s)") diff --git a/api/endpoints/validator.py b/api/endpoints/validator.py index 3ca5e29d..c9e654a7 100644 --- a/api/endpoints/validator.py +++ b/api/endpoints/validator.py @@ -101,6 +101,65 @@ async def delete_validators_that_have_not_sent_a_heartbeat() -> None: +# Detects validators/screeners that are alive (sending heartbeats) but whose +# evaluation runs have made no progress. This happens when a screener's +# inference gateway is broken — the agent retries forever, no runs complete, +# and the miner's agent is stuck until the evaluation times out. +async def detect_and_handle_stalled_evaluations() -> None: + logger.info("Checking for stalled evaluations...") + + now = datetime.now(timezone.utc) + _validators = list(SESSION_ID_TO_VALIDATOR.values()) + + for validator in _validators: + # Skip validators not currently running an evaluation + if validator.current_evaluation_id is None: + continue + + # Skip if we don't have a current_evaluation with a created_at timestamp + if validator.current_evaluation is None: + continue + + evaluation_age_seconds = (now - validator.current_evaluation.created_at).total_seconds() + + # Only check evaluations that have been running longer than the threshold + if evaluation_age_seconds < config.VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS: + continue + + # Query the evaluation runs to check for progress + evaluation_runs = await get_all_evaluation_runs_in_evaluation_id(validator.current_evaluation_id) + + # Count how many runs have progressed beyond running_agent (i.e. actually produced output) + completed_or_progressed = sum( + 1 for run in evaluation_runs + if run.status in [ + EvaluationRunStatus.initializing_eval, + EvaluationRunStatus.running_eval, + EvaluationRunStatus.finished, + EvaluationRunStatus.error, + ] + ) + + if completed_or_progressed == 0: + logger.warning( + f"Stalled evaluation detected for validator '{validator.name}': " + f"evaluation {validator.current_evaluation_id} has been running for " + f"{int(evaluation_age_seconds)}s with 0/{len(evaluation_runs)} runs progressed past running_agent. " + f"Disconnecting validator." + ) + + async with DebugLock(validator._lock, f"detect_and_handle_stalled_evaluations() for {validator.name}'s lock"): + if validator.session_id in SESSION_ID_TO_VALIDATOR: + await delete_validator( + validator, + f"The validator was disconnected because its evaluation had 0 completed runs " + f"after {int(evaluation_age_seconds)} seconds (stalled evaluation detection)." + ) + + logger.info("Finished checking for stalled evaluations") + + + # Dependency to get the validator associated with the request # Requires that the request has a valid "Authorization: Bearer " header # See validator_request_evaluation() and other endpoints for usage examples diff --git a/api/loops/validator_stalled_evaluation.py b/api/loops/validator_stalled_evaluation.py new file mode 100644 index 00000000..ab6134ec --- /dev/null +++ b/api/loops/validator_stalled_evaluation.py @@ -0,0 +1,15 @@ +import asyncio +import api.config as config +import utils.logger as logger + +from api.endpoints.validator import detect_and_handle_stalled_evaluations + + + +async def validator_stalled_evaluation_loop(): + logger.info("Starting validator stalled evaluation detection loop...") + + while True: + await detect_and_handle_stalled_evaluations() + + await asyncio.sleep(config.VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS) diff --git a/api/src/main.py b/api/src/main.py index 4e90f925..4cd4bb53 100644 --- a/api/src/main.py +++ b/api/src/main.py @@ -9,6 +9,7 @@ from api.loops.fetch_metagraph import fetch_metagraph_loop from api.loops.validator_heartbeat_timeout import validator_heartbeat_timeout_loop +from api.loops.validator_stalled_evaluation import validator_stalled_evaluation_loop from queries.evaluation import set_all_unfinished_evaluation_runs_to_errored @@ -60,7 +61,8 @@ async def lifespan(app: FastAPI): # Loops if config.SHOULD_RUN_LOOPS: # validator loops; TODO: rename env var asyncio.create_task(validator_heartbeat_timeout_loop()) - + asyncio.create_task(validator_stalled_evaluation_loop()) + asyncio.create_task(fetch_metagraph_loop())