Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,19 @@ def _killall(self):
bad=True,
)
for worker in live_workers:
self._metadata.register_metadata(
worker.task.run_id,
worker.task.step,
worker.task.task_id,
[
MetaDatum(
field="task_fail_reason",
value="killed",
type="metaflow.task_fail_reason",
tags=["attempt_id:{0}".format(worker.task.retries)],
)
],
)
worker.kill()
self._logger("Flushing logs...", system_msg=True, bad=True)
# give killed workers a chance to flush their logs to datastore
Expand Down
52 changes: 45 additions & 7 deletions metaflow/task.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
from __future__ import print_function
from io import BytesIO
import math
import sys
import os
import time
import signal
import traceback


from types import MethodType, FunctionType

from metaflow.sidecar import Message, MessageTypes
from metaflow.datastore.exceptions import DataException

from .metaflow_config import MAX_ATTEMPTS
from .metadata_provider import MetaDatum
from .mflog import TASK_LOG_SOURCE
from .datastore import Inputs, TaskDataStoreSet
from .exception import (
MetaflowInternalError,
Expand All @@ -25,7 +20,6 @@
from .util import all_equal, get_username, resolve_identity, unicode_type
from .clone_util import clone_task_helper
from .metaflow_current import current
from metaflow.user_configs.config_parameters import ConfigValue
from metaflow.system import _system_logger, _system_monitor
from metaflow.tracing import get_trace_id
from metaflow.tuple_util import ForeachFrame
Expand All @@ -34,6 +28,33 @@
MAX_FOREACH_PATH_LENGTH = 256


class SystemSignalHandler(object):

def __init__(self, metadata, run_id, step_name, task_id, retry_count):
self.metadata = metadata
self.run_id = run_id
self.step_name = step_name
self.task_id = task_id
self.retry_count = retry_count
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)

def signal_handler(self, signum, frame):
metadata = [
MetaDatum(
field="task_fail_reason",
value="signal_sigint" if signum == signal.SIGINT else "signal_sigterm",
type="metaflow.task_fail_reason",
tags=["attempt_id:{0}".format(self.retry_count)],
)
]
self.metadata.register_metadata(
self.run_id, self.step_name, self.task_id, metadata
)
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum)


class MetaflowTask(object):
"""
MetaflowTask prepares a Flow instance for execution of a single step.
Expand Down Expand Up @@ -733,6 +754,9 @@ def run_step(
"project_flow_name": current.get("project_flow_name"),
"trace_id": trace_id or None,
}

SystemSignalHandler(self.metadata, run_id, step_name, task_id, retry_count)

start = time.time()
self.metadata.start_task_heartbeat(self.flow.name, run_id, step_name, task_id)
with self.monitor.measure("metaflow.task.duration"):
Expand Down Expand Up @@ -862,6 +886,20 @@ def run_step(
self.flow._success = True

except Exception as ex:
self.metadata.register_metadata(
run_id,
step_name,
task_id,
[
MetaDatum(
field="task_fail_reason",
value="exception",
type="metaflow.task_fail_reason",
tags=["attempt_id:{0}".format(retry_count)],
)
],
)

with self.monitor.count("metaflow.task.exception"):
_system_logger.log_event(
level="error",
Expand Down
Loading