Skip to content

Commit ad214ed

Browse files
committed
Re-add backward compatible _flow_decorators
1 parent 4b97826 commit ad214ed

File tree

7 files changed

+40
-46
lines changed

7 files changed

+40
-46
lines changed

metaflow/flowspec.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,11 @@ def script_name(self) -> str:
326326
fname = fname[:-1]
327327
return os.path.basename(fname)
328328

329+
@property
330+
def _flow_decorators(self):
331+
# Backward compatible method to access flow decorators
332+
return self._flow_state[FlowStateItems.FLOW_DECORATORS]
333+
329334
@classmethod
330335
def _check_parameters(cls, config_parameters=False):
331336
seen = set()

metaflow/plugins/airflow/airflow.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from metaflow import current
1111
from metaflow.decorators import flow_decorators
1212
from metaflow.exception import MetaflowException
13-
from metaflow.flowspec import FlowStateItems
1413
from metaflow.includefile import FilePathClass
1514
from metaflow.metaflow_config import (
1615
AIRFLOW_KUBERNETES_CONN_ID,
@@ -151,7 +150,7 @@ def save_deployment_token(cls, owner, name, token, flow_datastore):
151150
def _get_schedule(self):
152151
# Using the cron presets provided here :
153152
# https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html?highlight=schedule%20interval#cron-presets
154-
schedule = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
153+
schedule = self.flow._flow_decorators.get("schedule")
155154
if not schedule:
156155
return None
157156
schedule = schedule[0]
@@ -634,11 +633,10 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
634633
return cmds
635634

636635
def _collect_flow_sensors(self):
637-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
638636
decos_lists = [
639-
flow_decos.get(s.name)
637+
self.flow._flow_decorators.get(s.name)
640638
for s in SUPPORTED_SENSORS
641-
if flow_decos.get(s.name) is not None
639+
if self.flow._flow_decorators.get(s.name) is not None
642640
]
643641
af_tasks = [deco.create_task() for decos in decos_lists for deco in decos]
644642
if len(af_tasks) > 0:
@@ -652,14 +650,15 @@ def _contains_foreach(self):
652650
return False
653651

654652
def compile(self):
655-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
656-
if flow_decos.get("trigger") or flow_decos.get("trigger_on_finish"):
653+
if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
654+
"trigger_on_finish"
655+
):
657656
raise AirflowException(
658657
"Deploying flows with @trigger or @trigger_on_finish decorator(s) "
659658
"to Airflow is not supported currently."
660659
)
661660

662-
if flow_decos.get("exit_hook"):
661+
if self.flow._flow_decorators.get("exit_hook"):
663662
raise AirflowException(
664663
"Deploying flows with the @exit_hook decorator "
665664
"to Airflow is not currently supported."

metaflow/plugins/airflow/airflow_cli.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from metaflow import current, decorators
88
from metaflow._vendor import click
99
from metaflow.exception import MetaflowException, MetaflowInternalError
10-
from metaflow.flowspec import FlowStateItems
1110
from metaflow.metaflow_config import FEAT_ALWAYS_UPLOAD_CODE_PACKAGE
1211
from metaflow.package import MetaflowPackage
1312
from metaflow.plugins.aws.step_functions.production_token import (
@@ -418,7 +417,7 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout):
418417
)
419418
)
420419

421-
schedule = flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
420+
schedule = flow._flow_decorators.get("schedule")
422421
if not schedule:
423422
return
424423

metaflow/plugins/argo/argo_workflows.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from metaflow import JSONType, current
1313
from metaflow.decorators import flow_decorators
1414
from metaflow.exception import MetaflowException
15-
from metaflow.flowspec import FlowStateItems
1615
from metaflow.graph import FlowGraph
1716
from metaflow.includefile import FilePathClass
1817
from metaflow.metaflow_config import (
@@ -434,7 +433,7 @@ def _base_kubernetes_annotations(self):
434433
return annotations
435434

436435
def _get_schedule(self):
437-
schedule = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
436+
schedule = self.flow._flow_decorators.get("schedule")
438437
if schedule:
439438
# Remove the field "Year" if it exists
440439
schedule = schedule[0]
@@ -460,15 +459,14 @@ def schedule(self):
460459

461460
def trigger_explanation(self):
462461
# Trigger explanation for cron workflows
463-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
464-
if flow_decos.get("schedule"):
462+
if self.flow._flow_decorators.get("schedule"):
465463
return (
466464
"This workflow triggers automatically via the CronWorkflow *%s*."
467465
% self.name
468466
)
469467

470468
# Trigger explanation for @trigger
471-
elif flow_decos.get("trigger"):
469+
elif self.flow._flow_decorators.get("trigger"):
472470
return (
473471
"This workflow triggers automatically when the upstream %s "
474472
"is/are published."
@@ -478,7 +476,7 @@ def trigger_explanation(self):
478476
)
479477

480478
# Trigger explanation for @trigger_on_finish
481-
elif flow_decos.get("trigger_on_finish"):
479+
elif self.flow._flow_decorators.get("trigger_on_finish"):
482480
return (
483481
"This workflow triggers automatically when the upstream %s succeed(s)"
484482
% self.list_to_prose(
@@ -541,10 +539,7 @@ def get_execution(cls, name):
541539

542540
def _process_parameters(self):
543541
parameters = {}
544-
has_schedule = (
545-
self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get("schedule")
546-
is not None
547-
)
542+
has_schedule = self.flow._flow_decorators.get("schedule") is not None
548543
seen = set()
549544
for var, param in self.flow._get_parameters():
550545
# Throw an exception if the parameter is specified twice.
@@ -630,9 +625,10 @@ def _process_triggers(self):
630625
# Impute triggers for Argo Workflow Template specified through @trigger and
631626
# @trigger_on_finish decorators
632627

633-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
634628
# Disallow usage of @trigger and @trigger_on_finish together for now.
635-
if flow_decos.get("trigger") and flow_decos.get("trigger_on_finish"):
629+
if self.flow._flow_decorators.get("trigger") and self.flow._flow_decorators.get(
630+
"trigger_on_finish"
631+
):
636632
raise ArgoWorkflowsException(
637633
"Argo Workflows doesn't support both *@trigger* and "
638634
"*@trigger_on_finish* decorators concurrently yet. Use one or the "
@@ -642,7 +638,7 @@ def _process_triggers(self):
642638
options = None
643639

644640
# @trigger decorator
645-
if flow_decos.get("trigger"):
641+
if self.flow._flow_decorators.get("trigger"):
646642
# Parameters are not duplicated, and exist in the flow. Additionally,
647643
# convert them to lower case since Metaflow parameters are case
648644
# insensitive.
@@ -655,7 +651,7 @@ def _process_triggers(self):
655651
if not param.IS_CONFIG_PARAMETER
656652
]
657653
)
658-
trigger_deco = flow_decos.get("trigger")[0]
654+
trigger_deco = self.flow._flow_decorators.get("trigger")[0]
659655
trigger_deco.format_deploytime_value()
660656
for event in trigger_deco.triggers:
661657
parameters = {}
@@ -687,17 +683,19 @@ def _process_triggers(self):
687683
parameters[key.lower()] = value
688684
event["parameters"] = parameters
689685
event["type"] = "event"
690-
triggers.extend(flow_decos.get("trigger")[0].triggers)
686+
triggers.extend(self.flow._flow_decorators.get("trigger")[0].triggers)
691687

692688
# Set automatic parameter mapping iff only a single event dependency is
693689
# specified with no explicit parameter mapping.
694690
if len(triggers) == 1 and not triggers[0].get("parameters"):
695691
triggers[0]["parameters"] = dict(zip(params, params))
696-
options = flow_decos.get("trigger")[0].options
692+
options = self.flow._flow_decorators.get("trigger")[0].options
697693

698694
# @trigger_on_finish decorator
699-
if flow_decos.get("trigger_on_finish"):
700-
trigger_on_finish_deco = flow_decos.get("trigger_on_finish")[0]
695+
if self.flow._flow_decorators.get("trigger_on_finish"):
696+
trigger_on_finish_deco = self.flow._flow_decorators.get(
697+
"trigger_on_finish"
698+
)[0]
701699
trigger_on_finish_deco.format_deploytime_value()
702700
for event in trigger_on_finish_deco.triggers:
703701
# Actual filters are deduced here since we don't have access to
@@ -736,7 +734,7 @@ def _process_triggers(self):
736734
"flow": event["flow"],
737735
}
738736
)
739-
options = flow_decos.get("trigger_on_finish")[0].options
737+
options = self.flow._flow_decorators.get("trigger_on_finish")[0].options
740738

741739
for event in triggers:
742740
# Assign a sanitized name since we need this at many places to please
@@ -2805,9 +2803,7 @@ def _lifecycle_hooks(self):
28052803
hooks.append(self._pager_duty_change_template())
28062804
hooks.append(self._incident_io_change_template())
28072805

2808-
exit_hook_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS].get(
2809-
"exit_hook", []
2810-
)
2806+
exit_hook_decos = self.flow._flow_decorators.get("exit_hook", [])
28112807

28122808
for deco in exit_hook_decos:
28132809
hooks.extend(self._lifecycle_hook_from_deco(deco))

metaflow/plugins/aws/step_functions/step_functions.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from metaflow import R
1010
from metaflow.decorators import flow_decorators
1111
from metaflow.exception import MetaflowException
12-
from metaflow.flowspec import FlowStateItems
1312
from metaflow.metaflow_config import (
1413
EVENTS_SFN_ACCESS_IAM_ROLE,
1514
S3_ENDPOINT_URL,
@@ -303,14 +302,15 @@ def get_execution(cls, state_machine_name, name):
303302
raise StepFunctionsException(repr(e))
304303

305304
def _compile(self):
306-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
307-
if flow_decos.get("trigger") or flow_decos.get("trigger_on_finish"):
305+
if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
306+
"trigger_on_finish"
307+
):
308308
raise StepFunctionsException(
309309
"Deploying flows with @trigger or @trigger_on_finish decorator(s) "
310310
"to AWS Step Functions is not supported currently."
311311
)
312312

313-
if flow_decos.get("exit_hook"):
313+
if self.flow._flow_decorators.get("exit_hook"):
314314
raise StepFunctionsException(
315315
"Deploying flows with the @exit_hook decorator "
316316
"to AWS Step Functions is not currently supported."
@@ -484,8 +484,7 @@ def _visit(node, workflow, exit_node=None):
484484
return _visit(self.graph["start"], workflow)
485485

486486
def _cron(self):
487-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
488-
schedule = flow_decos.get("schedule")
487+
schedule = self.flow._flow_decorators.get("schedule")
489488
if schedule:
490489
schedule = schedule[0]
491490
if schedule.timezone is not None:

metaflow/plugins/pypi/conda_decorator.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import tempfile
66

77
from metaflow.decorators import FlowDecorator, StepDecorator
8-
from metaflow.flowspec import FlowStateItems
98
from metaflow.metadata_provider import MetaDatum
109
from metaflow.metaflow_environment import InvalidEnvironmentException
1110
from metaflow.packaging_sys import ContentType
@@ -83,9 +82,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
8382
self.datastore = flow_datastore
8483

8584
# Support flow-level decorator.
86-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
87-
if "conda_base" in flow_decos:
88-
conda_base = flow_decos["conda_base"][0]
85+
if "conda_base" in self.flow._flow_decorators:
86+
conda_base = self.flow._flow_decorators["conda_base"][0]
8987
super_attributes = conda_base.attributes
9088
self.attributes["packages"] = {
9189
**super_attributes["packages"],

metaflow/plugins/pypi/pypi_decorator.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from metaflow.decorators import FlowDecorator, StepDecorator
2-
from metaflow.flowspec import FlowStateItems
32
from metaflow.metaflow_environment import InvalidEnvironmentException
43

54

@@ -41,9 +40,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
4140
self.step = step
4241

4342
# Support flow-level decorator
44-
flow_decos = self.flow._flow_state[FlowStateItems.FLOW_DECORATORS]
45-
if "pypi_base" in flow_decos:
46-
pypi_base = flow_decos["pypi_base"][0]
43+
if "pypi_base" in self.flow._flow_decorators:
44+
pypi_base = self.flow._flow_decorators["pypi_base"][0]
4745
super_attributes = pypi_base.attributes
4846
self._attributes_with_user_values.update(
4947
pypi_base._attributes_with_user_values

0 commit comments

Comments
 (0)