From feab9e27226593eeac90aa1fd89da9fb30acebe6 Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Tue, 26 Aug 2025 14:50:02 +0530 Subject: [PATCH] add task_failure_reason to metadata --- metaflow/runtime.py | 13 ++++++++++++ metaflow/task.py | 52 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/metaflow/runtime.py b/metaflow/runtime.py index 69c7ff4256d..929062fe07a 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -748,6 +748,19 @@ def _killall(self): bad=True, ) for worker in live_workers: + self._metadata.register_metadata( + worker.task.run_id, + worker.task.step, + worker.task.task_id, + [ + MetaDatum( + field="task_fail_reason", + value="killed", + type="metaflow.task_fail_reason", + tags=["attempt_id:{0}".format(worker.task.retries)], + ) + ], + ) worker.kill() self._logger("Flushing logs...", system_msg=True, bad=True) # give killed workers a chance to flush their logs to datastore diff --git a/metaflow/task.py b/metaflow/task.py index d00ddd0a3fd..5cc8466afe6 100644 --- a/metaflow/task.py +++ b/metaflow/task.py @@ -1,20 +1,15 @@ from __future__ import print_function -from io import BytesIO -import math import sys import os import time +import signal import traceback from types import MethodType, FunctionType -from metaflow.sidecar import Message, MessageTypes -from metaflow.datastore.exceptions import DataException - from .metaflow_config import MAX_ATTEMPTS from .metadata_provider import MetaDatum -from .mflog import TASK_LOG_SOURCE from .datastore import Inputs, TaskDataStoreSet from .exception import ( MetaflowInternalError, @@ -25,7 +20,6 @@ from .util import all_equal, get_username, resolve_identity, unicode_type from .clone_util import clone_task_helper from .metaflow_current import current -from metaflow.user_configs.config_parameters import ConfigValue from metaflow.system import _system_logger, _system_monitor from metaflow.tracing import get_trace_id from metaflow.tuple_util import ForeachFrame @@ -34,6 +28,33 @@ MAX_FOREACH_PATH_LENGTH = 256 +class SystemSignalHandler(object): + + def __init__(self, metadata, run_id, step_name, task_id, retry_count): + self.metadata = metadata + self.run_id = run_id + self.step_name = step_name + self.task_id = task_id + self.retry_count = retry_count + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + + def signal_handler(self, signum, frame): + metadata = [ + MetaDatum( + field="task_fail_reason", + value="signal_sigint" if signum == signal.SIGINT else "signal_sigterm", + type="metaflow.task_fail_reason", + tags=["attempt_id:{0}".format(self.retry_count)], + ) + ] + self.metadata.register_metadata( + self.run_id, self.step_name, self.task_id, metadata + ) + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + + class MetaflowTask(object): """ MetaflowTask prepares a Flow instance for execution of a single step. @@ -733,6 +754,9 @@ def run_step( "project_flow_name": current.get("project_flow_name"), "trace_id": trace_id or None, } + + SystemSignalHandler(self.metadata, run_id, step_name, task_id, retry_count) + start = time.time() self.metadata.start_task_heartbeat(self.flow.name, run_id, step_name, task_id) with self.monitor.measure("metaflow.task.duration"): @@ -862,6 +886,20 @@ def run_step( self.flow._success = True except Exception as ex: + self.metadata.register_metadata( + run_id, + step_name, + task_id, + [ + MetaDatum( + field="task_fail_reason", + value="exception", + type="metaflow.task_fail_reason", + tags=["attempt_id:{0}".format(retry_count)], + ) + ], + ) + with self.monitor.count("metaflow.task.exception"): _system_logger.log_event( level="error",