diff --git a/api/config.py b/api/config.py
index 4fbd9a33..9fd3e21e 100644
--- a/api/config.py
+++ b/api/config.py
@@ -152,6 +152,12 @@
     logger.fatal("VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS is not set in .env")
 VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS = int(VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS)
 
+VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS = os.getenv("VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS", "900")
+VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS = int(VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS)
+
+VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS = os.getenv("VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS", "60")
+VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS = int(VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS)
+
 
 # Load validator configuration (sent to validator upon registration)
 
@@ -238,6 +244,8 @@
 
 logger.info(f"Validator Heartbeat Timeout: {VALIDATOR_HEARTBEAT_TIMEOUT_SECONDS} second(s)")
 logger.info(f"Validator Heartbeat Timeout Interval: {VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS} second(s)")
+logger.info(f"Validator Stalled Evaluation Timeout: {VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS} second(s)")
+logger.info(f"Validator Stalled Evaluation Check Interval: {VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS} second(s)")
 logger.info("-------------------------")
 logger.info(f"Validator Running Agent Timeout: {VALIDATOR_RUNNING_AGENT_TIMEOUT_SECONDS} second(s)")
 
diff --git a/api/endpoints/validator.py b/api/endpoints/validator.py
index 3ca5e29d..c9e654a7 100644
--- a/api/endpoints/validator.py
+++ b/api/endpoints/validator.py
@@ -101,6 +101,65 @@ async def delete_validators_that_have_not_sent_a_heartbeat() -> None:
 
 
 
+# Detects validators/screeners that are alive (sending heartbeats) but whose
+# evaluation runs have made no progress. This happens when a screener's
+# inference gateway is broken — the agent retries forever, no runs complete,
+# and the miner's agent is stuck until the evaluation times out.
+async def detect_and_handle_stalled_evaluations() -> None:
+    logger.info("Checking for stalled evaluations...")
+
+    now = datetime.now(timezone.utc)
+    _validators = list(SESSION_ID_TO_VALIDATOR.values())
+
+    for validator in _validators:
+        # Skip validators not currently running an evaluation
+        if validator.current_evaluation_id is None:
+            continue
+
+        # Skip if we don't have a current_evaluation with a created_at timestamp
+        if validator.current_evaluation is None:
+            continue
+
+        evaluation_age_seconds = (now - validator.current_evaluation.created_at).total_seconds()
+
+        # Only check evaluations that have been running longer than the threshold
+        if evaluation_age_seconds < config.VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS:
+            continue
+
+        # Query the evaluation runs to check for progress
+        evaluation_runs = await get_all_evaluation_runs_in_evaluation_id(validator.current_evaluation_id)
+
+        # Count how many runs have progressed beyond running_agent (i.e. actually produced output)
+        completed_or_progressed = sum(
+            1 for run in evaluation_runs
+            if run.status in [
+                EvaluationRunStatus.initializing_eval,
+                EvaluationRunStatus.running_eval,
+                EvaluationRunStatus.finished,
+                EvaluationRunStatus.error,
+            ]
+        )
+
+        if completed_or_progressed == 0:
+            logger.warning(
+                f"Stalled evaluation detected for validator '{validator.name}': "
+                f"evaluation {validator.current_evaluation_id} has been running for "
+                f"{int(evaluation_age_seconds)}s with 0/{len(evaluation_runs)} runs progressed past running_agent. "
+                f"Disconnecting validator."
+            )
+
+            async with DebugLock(validator._lock, f"detect_and_handle_stalled_evaluations() for {validator.name}'s lock"):
+                if validator.session_id in SESSION_ID_TO_VALIDATOR:
+                    await delete_validator(
+                        validator,
+                        f"The validator was disconnected because its evaluation had 0 completed runs "
+                        f"after {int(evaluation_age_seconds)} seconds (stalled evaluation detection)."
+                    )
+
+    logger.info("Finished checking for stalled evaluations")
+
+
+
 # Dependency to get the validator associated with the request
 # Requires that the request has a valid "Authorization: Bearer " header
 # See validator_request_evaluation() and other endpoints for usage examples
diff --git a/api/loops/validator_stalled_evaluation.py b/api/loops/validator_stalled_evaluation.py
new file mode 100644
index 00000000..ab6134ec
--- /dev/null
+++ b/api/loops/validator_stalled_evaluation.py
@@ -0,0 +1,20 @@
+import asyncio
+import api.config as config
+import utils.logger as logger
+
+from api.endpoints.validator import detect_and_handle_stalled_evaluations
+
+
+
+async def validator_stalled_evaluation_loop():
+    logger.info("Starting validator stalled evaluation detection loop...")
+
+    while True:
+        try:
+            await detect_and_handle_stalled_evaluations()
+        except Exception as e:
+            # Keep the loop alive: a transient failure (e.g. a DB error) must not
+            # permanently kill this background task and disable detection.
+            logger.error(f"Error while checking for stalled evaluations: {e}")
+
+        await asyncio.sleep(config.VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS)
diff --git a/api/src/main.py b/api/src/main.py
index 4e90f925..4cd4bb53 100644
--- a/api/src/main.py
+++ b/api/src/main.py
@@ -9,6 +9,7 @@
 from api.loops.fetch_metagraph import fetch_metagraph_loop
 from api.loops.validator_heartbeat_timeout import validator_heartbeat_timeout_loop
+from api.loops.validator_stalled_evaluation import validator_stalled_evaluation_loop
 from queries.evaluation import set_all_unfinished_evaluation_runs_to_errored
 
 
 
@@ -60,7 +61,8 @@ async def lifespan(app: FastAPI):
 
     # Loops
     if config.SHOULD_RUN_LOOPS:  # validator loops; TODO: rename env var
         asyncio.create_task(validator_heartbeat_timeout_loop())
-
+        asyncio.create_task(validator_stalled_evaluation_loop())
+
         asyncio.create_task(fetch_metagraph_loop())
 