Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@
logger.fatal("VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS is not set in .env")
VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS = int(VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS)

VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS = os.getenv("VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS", "900")
VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS = int(VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS)

VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS = os.getenv("VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS", "60")
VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS = int(VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS)



# Load validator configuration (sent to validator upon registration)
Expand Down Expand Up @@ -238,6 +244,8 @@

logger.info(f"Validator Heartbeat Timeout: {VALIDATOR_HEARTBEAT_TIMEOUT_SECONDS} second(s)")
logger.info(f"Validator Heartbeat Timeout Interval: {VALIDATOR_HEARTBEAT_TIMEOUT_INTERVAL_SECONDS} second(s)")
logger.info(f"Validator Stalled Evaluation Timeout: {VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS} second(s)")
logger.info(f"Validator Stalled Evaluation Check Interval: {VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS} second(s)")
logger.info("-------------------------")

logger.info(f"Validator Running Agent Timeout: {VALIDATOR_RUNNING_AGENT_TIMEOUT_SECONDS} second(s)")
Expand Down
59 changes: 59 additions & 0 deletions api/endpoints/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,65 @@ async def delete_validators_that_have_not_sent_a_heartbeat() -> None:



# Detects validators/screeners that are alive (sending heartbeats) but whose
# evaluation runs have made no progress. This happens when a screener's
# inference gateway is broken — the agent retries forever, no runs complete,
# and the miner's agent is stuck until the evaluation times out.
async def detect_and_handle_stalled_evaluations() -> None:
logger.info("Checking for stalled evaluations...")

now = datetime.now(timezone.utc)
_validators = list(SESSION_ID_TO_VALIDATOR.values())

for validator in _validators:
# Skip validators not currently running an evaluation
if validator.current_evaluation_id is None:
continue

# Skip if we don't have a current_evaluation with a created_at timestamp
if validator.current_evaluation is None:
continue

evaluation_age_seconds = (now - validator.current_evaluation.created_at).total_seconds()

# Only check evaluations that have been running longer than the threshold
if evaluation_age_seconds < config.VALIDATOR_STALLED_EVALUATION_TIMEOUT_SECONDS:
continue

# Query the evaluation runs to check for progress
evaluation_runs = await get_all_evaluation_runs_in_evaluation_id(validator.current_evaluation_id)

# Count how many runs have progressed beyond running_agent (i.e. actually produced output)
completed_or_progressed = sum(
1 for run in evaluation_runs
if run.status in [
EvaluationRunStatus.initializing_eval,
EvaluationRunStatus.running_eval,
EvaluationRunStatus.finished,
EvaluationRunStatus.error,
]
)

if completed_or_progressed == 0:
logger.warning(
f"Stalled evaluation detected for validator '{validator.name}': "
f"evaluation {validator.current_evaluation_id} has been running for "
f"{int(evaluation_age_seconds)}s with 0/{len(evaluation_runs)} runs progressed past running_agent. "
f"Disconnecting validator."
)

async with DebugLock(validator._lock, f"detect_and_handle_stalled_evaluations() for {validator.name}'s lock"):
if validator.session_id in SESSION_ID_TO_VALIDATOR:
await delete_validator(
validator,
f"The validator was disconnected because its evaluation had 0 completed runs "
f"after {int(evaluation_age_seconds)} seconds (stalled evaluation detection)."
)

logger.info("Finished checking for stalled evaluations")



# Dependency to get the validator associated with the request
# Requires that the request has a valid "Authorization: Bearer <session_id>" header
# See validator_request_evaluation() and other endpoints for usage examples
Expand Down
15 changes: 15 additions & 0 deletions api/loops/validator_stalled_evaluation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio
import api.config as config
import utils.logger as logger

from api.endpoints.validator import detect_and_handle_stalled_evaluations



async def validator_stalled_evaluation_loop():
logger.info("Starting validator stalled evaluation detection loop...")

while True:
await detect_and_handle_stalled_evaluations()

await asyncio.sleep(config.VALIDATOR_STALLED_EVALUATION_CHECK_INTERVAL_SECONDS)
4 changes: 3 additions & 1 deletion api/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from api.loops.fetch_metagraph import fetch_metagraph_loop
from api.loops.validator_heartbeat_timeout import validator_heartbeat_timeout_loop
from api.loops.validator_stalled_evaluation import validator_stalled_evaluation_loop


from queries.evaluation import set_all_unfinished_evaluation_runs_to_errored
Expand Down Expand Up @@ -60,7 +61,8 @@ async def lifespan(app: FastAPI):
# Loops
if config.SHOULD_RUN_LOOPS: # validator loops; TODO: rename env var
asyncio.create_task(validator_heartbeat_timeout_loop())

asyncio.create_task(validator_stalled_evaluation_loop())

asyncio.create_task(fetch_metagraph_loop())


Expand Down