Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Commit 890d886

Browse files
committed
DW-5997: Adding the check for pending dart actions to workflow_Service.run_triggered_workflow, leaving in /do-manual-trigger entry point because it checks for max concurrency.
1 parent 6aedf9c commit 890d886

File tree

3 files changed

+91
-72
lines changed

3 files changed

+91
-72
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from dart.context.locator import injectable
2+
from dart.model.workflow import WorkflowState, WorkflowInstanceState
3+
from dart.model.action import ActionState
4+
5+
import logging
6+
import boto3
7+
8+
_logger = logging.getLogger(__name__)
9+
10+
@injectable
11+
class PendingActionsCheck(object):
12+
def __init__(self, action_service):
13+
self._action_service = action_service
14+
self._batch_client = boto3.client('batch')
15+
16+
def get_not_completed_workflow_instances(self, workflow_id, workflow_service):
17+
wf = workflow_service.get_workflow(workflow_id, raise_when_missing=False)
18+
if not wf:
19+
_logger.info('Zombie Check: workflow (id={wf_id}) not found. log-info: {log_info}'.
20+
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))
21+
return None
22+
23+
if wf.data.state != WorkflowState.ACTIVE:
24+
_logger.info('Zombie Check: expected workflow (id={wf_id}) to be in ACTIVE state. log-info: {log_info}'.
25+
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))
26+
27+
# get all workflow_instances of current workflow:
28+
NOT_COMPLETE_STATES = ['QUEUED', 'RUNNING']
29+
all_wf_instances = workflow_service.find_workflow_instances(workflow_id)
30+
current_wf_instances = [wf for wf in all_wf_instances if wf.data.state in NOT_COMPLETE_STATES]
31+
_logger.info('Zombie Check: Found workflow instance ids (workflow_id={0}) instances = {1}'.format(workflow_id, current_wf_instances))
32+
33+
return current_wf_instances
34+
35+
def get_instance_actions(self, current_wf_instances):
36+
# get all actions of not completed workflow_instances
37+
incomplete_actions = []
38+
action_2_wf_instance = {}
39+
for wf_instance in current_wf_instances:
40+
wf_instance_actions = self._action_service.find_actions(workflow_instance_id=wf_instance.id)
41+
incomplete_actions.extend(wf_instance_actions)
42+
for action in wf_instance_actions:
43+
action_2_wf_instance[action.id] = wf_instance
44+
45+
jobs_2_actions = {}
46+
for action in incomplete_actions:
47+
if action.data.batch_job_id:
48+
jobs_2_actions[action.data.batch_job_id] = action
49+
50+
return incomplete_actions, jobs_2_actions, action_2_wf_instance
51+
52+
def handle_done_batch_jobs_with_not_complete_wf_instances(self, batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service):
53+
for job in batch_jobs.get('jobs'):
54+
# jobs fail + action not-failed => fail workflow instance and action
55+
action = jobs_2_actions[job.get('jobId')]
56+
if action:
57+
wf_instance = action_2_wf_instance[action.id]
58+
if job.get('status') == 'FAILED' and not (action.data.state in ['FAILED', 'COMPLETED']):
59+
_logger.info("Zombie Check: Job {0} is failed but action {0} is not failed/completed. Updating action and workflow_instance to FAILED".format(job.get('jobId'), action.id))
60+
self._action_service.update_action_state(action, ActionState.FAILED, action.data.error_message)
61+
workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.FAILED)
62+
63+
# Jobs complete + action not-failed => mark workflow instance as complete and mark actions as complete
64+
if job.get('status') == 'COMPLETED' and not (action.data.state in ['FAILED', 'COMPLETED']):
65+
_logger.info("Zombie Check: Job {0} is completed but action {0} is not failed/completed. Updating action to COMPLETED".format(job.get("jobId"), action.id))
66+
self._action_service.update_action_state(action, ActionState.COMPLETED, action.data.error_message)
67+
workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.FAILED)
68+
69+
def find_pending_dart_actions(self, workflow_id, workflow_service):
70+
''' We send workflow_service to avoid cyclical injection from workflow_service '''
71+
current_wf_instances = self.get_not_completed_workflow_instances(workflow_id, workflow_service)
72+
if current_wf_instances:
73+
incomplete_actions, jobs_2_actions, action_2_wf_instance = self.get_instance_actions(current_wf_instances)
74+
batch_job_ids = [job.data.batch_job_id for job in incomplete_actions]
75+
_logger.info("Zombie Check: extract job_ids {0} form incomplete actions {1}".format(batch_job_ids, [act.id for act in incomplete_actions]))
76+
77+
try:
78+
batch_jobs = self._batch_client.describe_jobs(jobs=batch_job_ids)
79+
except Exception as err:
80+
_logger.error("Zombie Check: failed to execute batch's describe_jobs. err = {0}".format(err))
81+
else:
82+
self.handle_done_batch_jobs_with_not_complete_wf_instances(batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service)
83+
84+

src/python/dart/service/workflow.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@
2222
@injectable
2323
class WorkflowService(object):
2424
def __init__(self, datastore_service, action_service, trigger_proxy, filter_service, subscription_service,
25-
subscription_element_service, emailer):
25+
subscription_element_service, emailer, pending_actions_check):
2626
self._datastore_service = datastore_service
2727
self._action_service = action_service
2828
self._trigger_proxy = trigger_proxy
2929
self._filter_service = filter_service
3030
self._subscription_service = subscription_service
3131
self._subscription_element_service = subscription_element_service
3232
self._emailer = emailer
33+
self._pending_actions_check = pending_actions_check
3334

3435
@staticmethod
3536
def save_workflow(workflow, commit=True, flush=False):
@@ -263,6 +264,7 @@ def run_triggered_workflow(self, workflow_msg, trigger_type, trigger_id=None, re
263264
states = [WorkflowInstanceState.QUEUED, WorkflowInstanceState.RUNNING]
264265
if self.find_workflow_instances_count(wf.id, states) >= wf.data.concurrency:
265266
_logger.info('workflow (id={wf_id}) has already reached max concurrency of {concurrency}. log-info: {log_info}'.format(wf_id=wf.id, concurrency=wf.data.concurrency, log_info=workflow_msg.get('log_info')))
267+
self._pending_actions_check.find_pending_dart_actions(wf.id, self)
266268
return
267269

268270
wf_instance = self.save_workflow_instance(

src/python/dart/trigger/zombie_check.py

Lines changed: 4 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616

1717
@injectable
1818
class ZombieCheckTriggerProcessor(TriggerProcessor):
19-
def __init__(self, trigger_proxy, action_service, workflow_service):
19+
def __init__(self, trigger_proxy, action_service, workflow_service, pending_actions_check):
2020
self._trigger_proxy = trigger_proxy
2121
self._action_service = action_service
2222
self._workflow_service = workflow_service
2323
self._trigger_type = zombie_check_trigger
24+
self._pending_actions_check = pending_actions_check
2425
self._batch_client = boto3.client('batch')
2526

2627
def trigger_type(self):
@@ -33,80 +34,12 @@ def initialize_trigger(self, trigger, trigger_service):
3334
def update_trigger(self, unmodified_trigger, modified_trigger):
3435
return modified_trigger
3536

36-
def get_not_completed_workflow_instances(self, workflow_msg):
37-
# get workflow associated with currently triggered workflow
38-
workflow_id = workflow_msg.get('workflow_id')
39-
wf = self._workflow_service.get_workflow(workflow_id, raise_when_missing=False)
40-
if not wf:
41-
_logger.info('Zombie Check: workflow (id={wf_id}) not found. log-info: {log_info}'.
42-
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))
43-
return None
44-
45-
if wf.data.state != WorkflowState.ACTIVE:
46-
_logger.info('Zombie Check: expected workflow (id={wf_id}) to be in ACTIVE state. log-info: {log_info}'.
47-
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))
48-
49-
# get all workflow_instances of current workflow:
50-
NOT_COMPLETE_STATES = ['QUEUED', 'RUNNING']
51-
all_wf_instances = self._workflow_service.find_workflow_instances(workflow_id)
52-
current_wf_instances = [wf for wf in all_wf_instances if wf.data.state in NOT_COMPLETE_STATES]
53-
_logger.info('Zombie Check: Found workflow instance ids (workflow_id={0}) instances = {1}'.format(workflow_id, current_wf_instances))
54-
55-
return current_wf_instances
56-
57-
def get_instance_actions(self, current_wf_instances):
58-
# get all actions of nt completed workflow_instances and map ach action to its workflow_instance in action_2_wf_instance
59-
incomplete_actions = []
60-
action_2_wf_instance = {}
61-
for wf_instance in current_wf_instances:
62-
wf_instance_actions = self._action_service.find_actions(workflow_instance_id=wf_instance.id)
63-
incomplete_actions.extend(wf_instance_actions)
64-
for action in wf_instance_actions:
65-
action_2_wf_instance[action.id] = wf_instance
66-
67-
jobs_2_actions = {}
68-
for action in incomplete_actions:
69-
if action.data.batch_job_id:
70-
jobs_2_actions[action.data.batch_job_id] = action
71-
72-
return incomplete_actions, jobs_2_actions, action_2_wf_instance
73-
74-
def handle_done_batch_jobs_with_not_complete_wf_instances(self, batch_jobs, jobs_2_actions, action_2_wf_instance):
75-
for job in batch_jobs.get('jobs'):
76-
# jobs fail + action not-failed => fail workflow instance and action
77-
action = jobs_2_actions[job.get('jobId')]
78-
if action:
79-
if job.get('status') == 'FAILED' and not (action.data.state in ['FAILED', 'COMPLETED']):
80-
_logger.info("Zombie Check: Job {0} is failed but action {0} is not failed/completed. Updating action and workflow_instance to FAILED".format(job.get('jobId'), action.id))
81-
self._action_service.update_action_state(action, ActionState.FAILED, action.data.error_message)
82-
self._workflow_service.update_workflow_instance_state(action, WorkflowInstanceState.FAILED)
83-
84-
# Jobs complete + action not-failed => mark workflow instance as complete and mark actions as complete
85-
if job.get('status') == 'COMPLETED' and not (action.data.state in ['FAILED', 'COMPLETED']):
86-
_logger.info("Zombie Check: Job {0} is completed but action {0} is not failed/completed. Updating action to COMPLETED".format(job.get("jobId"), action.id))
87-
self._action_service.update_action_state(action, ActionState.COMPLETED, action.data.error_message)
88-
self._workflow_service.update_workflow_instance_state(action, WorkflowInstanceState.FAILED)
89-
9037
def evaluate_message(self, workflow_msg, trigger_service):
9138
""" :type message: dict
9239
:type trigger_service: dart.service.trigger.TriggerService """
9340

94-
current_wf_instances = self.get_not_completed_workflow_instances(workflow_msg)
95-
if not current_wf_instances:
96-
return []
97-
98-
# get all actions of not completed workflow_instances and map ach action to its workflow_instance in action_2_wf_instance
99-
incomplete_actions, jobs_2_actions, action_2_wf_instance = self.get_instance_actions(current_wf_instances)
100-
batch_job_ids = [job.data.batch_job_id for job in incomplete_actions]
101-
_logger.info("Zombie Check: extract job_ids {0} form incomplete actions {1}".format(batch_job_ids, [act.id for act in incomplete_actions]))
102-
103-
try:
104-
batch_jobs = self._batch_client.describe_jobs(jobs=batch_job_ids)
105-
except Exception as err:
106-
_logger.error("Zombie Check: failed to execute batch's describe_jobs. err = {0}".format(err))
107-
return []
108-
109-
self.handle_done_batch_jobs_with_not_complete_wf_instances(batch_jobs, jobs_2_actions, action_2_wf_instance)
41+
workflow_id = workflow_msg.get('workflow_id')
42+
self._pending_actions_check.find_pending_dart_actions(workflow_id, self._workflow_service)
11043

11144
# return an empty list since this is not associated with a particular trigger instance
11245
return []

0 commit comments

Comments
 (0)