Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from .datastore import FlowDataStore, TaskDataStoreSet
from .debug import debug
from .exception import CommandException, MetaflowException
from .flowspec import _FlowState
from .flowspec import FlowStateItems
from .graph import FlowGraph
from .metaflow_config import (
DEFAULT_DATASTORE,
Expand Down Expand Up @@ -444,8 +444,8 @@ def start(
# We can now set the the CONFIGS value in the flow properly. This will overwrite
# anything that may have been passed in by default and we will use exactly what
# the original flow had. Note that these are accessed through the parameter name
ctx.obj.flow._flow_state[_FlowState.CONFIGS].clear()
d = ctx.obj.flow._flow_state[_FlowState.CONFIGS]
ctx.obj.flow._flow_state[FlowStateItems.CONFIGS].clear()
d = ctx.obj.flow._flow_state[FlowStateItems.CONFIGS]
for param_name, var_name in zip(config_param_names, config_var_names):
val = param_ds[var_name]
debug.userconf_exec("Loaded config %s as: %s" % (param_name, val))
Expand All @@ -457,7 +457,7 @@ def start(
raise ctx.obj.delayed_config_exception

# Init all values in the flow mutators and then process them
for decorator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
for decorator in ctx.obj.flow._flow_state[FlowStateItems.FLOW_MUTATORS]:
decorator.external_init()

new_cls = ctx.obj.flow._process_config_decorators(config_options)
Expand Down Expand Up @@ -533,9 +533,9 @@ def start(
ctx.obj.flow_datastore,
{
k: ConfigValue(v) if v is not None else None
for k, v in ctx.obj.flow.__class__._flow_state.get(
_FlowState.CONFIGS, {}
).items()
for k, v in ctx.obj.flow.__class__._flow_state[
FlowStateItems.CONFIGS
].items()
},
)

Expand Down
3 changes: 2 additions & 1 deletion metaflow/cmd/code/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Callable, List, Mapping, Optional, cast

from metaflow import Run
from metaflow.util import walk_without_cycles
from metaflow._vendor import click
from metaflow.cli import echo_always

Expand Down Expand Up @@ -51,7 +52,7 @@ def perform_diff(
target_dir = os.getcwd()

diffs = []
for dirpath, dirnames, filenames in os.walk(source_dir, followlinks=True):
for dirpath, _, filenames in walk_without_cycles(source_dir):
for fname in filenames:
# NOTE: the paths below need to be set up carefully
# for the `patch` command to work. Better not to touch
Expand Down
34 changes: 24 additions & 10 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from functools import partial
from typing import Any, Callable, Dict, List, NewType, Tuple, TypeVar, Union, overload

from .flowspec import FlowSpec, _FlowState
from .flowspec import FlowSpec, FlowStateItems
from .exception import (
MetaflowInternalError,
MetaflowException,
Expand Down Expand Up @@ -294,7 +294,11 @@ def add_decorator_options(cmd):


def flow_decorators(flow_cls):
return [d for deco_list in flow_cls._flow_decorators.values() for d in deco_list]
return [
d
for deco_list in flow_cls._flow_state[FlowStateItems.FLOW_DECORATORS].values()
for d in deco_list
]


class StepDecorator(Decorator):
Expand Down Expand Up @@ -492,12 +496,20 @@ def _base_flow_decorator(decofunc, *args, **kwargs):
cls = args[0]
if isinstance(cls, type) and issubclass(cls, FlowSpec):
# flow decorators add attributes in the class dictionary,
# _flow_decorators. _flow_decorators is of type `{key:[decos]}`
if decofunc.name in cls._flow_decorators and not decofunc.allow_multiple:
# cls._flow_state[FlowStateItems.FLOW_DECORATORS]. This is of type `{key:[decos]}`
self_flow_decos = cls._flow_state.self_data[FlowStateItems.FLOW_DECORATORS]
inherited_flow_decos = cls._flow_state.inherited_data.get(
FlowStateItems.FLOW_DECORATORS, {}
)

if (
decofunc.name in self_flow_decos
or decofunc.name in inherited_flow_decos
) and not decofunc.allow_multiple:
raise DuplicateFlowDecoratorException(decofunc.name)
else:
deco_instance = decofunc(attributes=kwargs, statically_defined=True)
cls._flow_decorators.setdefault(decofunc.name, []).append(deco_instance)
self_flow_decos.setdefault(decofunc.name, []).append(deco_instance)
else:
raise BadFlowDecoratorException(decofunc.name)
return cls
Expand Down Expand Up @@ -659,7 +671,8 @@ def _attach_decorators_to_step(step, decospecs):


def _init(flow, only_non_static=False):
for decorators in flow._flow_decorators.values():
flow_decos = flow._flow_state[FlowStateItems.FLOW_DECORATORS]
for decorators in flow_decos.values():
for deco in decorators:
deco.external_init()

Expand All @@ -676,7 +689,8 @@ def _init_flow_decorators(
flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options
):
# Since all flow decorators are stored as `{key:[deco]}` we iterate through each of them.
for decorators in flow._flow_decorators.values():
flow_decos = flow._flow_state[FlowStateItems.FLOW_DECORATORS]
for decorators in flow_decos.values():
# First resolve the `options` for the flow decorator.
# Options are passed from cli.
# For example `@project` can take a `--name` / `--branch` from the cli as options.
Expand Down Expand Up @@ -724,7 +738,7 @@ def _init_step_decorators(flow, graph, environment, flow_datastore, logger):
# and then the step level ones to maintain a consistent order with how
# other decorators are run.

for deco in cls._flow_state.get(_FlowState.FLOW_MUTATORS, []):
for deco in cls._flow_state[FlowStateItems.FLOW_MUTATORS]:
if isinstance(deco, FlowMutator):
inserted_by_value = [deco.decorator_name] + (deco.inserted_by or [])
mutable_flow = MutableFlow(
Expand All @@ -746,8 +760,8 @@ def _init_step_decorators(flow, graph, environment, flow_datastore, logger):
deco.mutate(mutable_flow)
# We reset cached_parameters on the very off chance that the user added
# more configurations based on the configuration
if _FlowState.CACHED_PARAMETERS in cls._flow_state:
del cls._flow_state[_FlowState.CACHED_PARAMETERS]
if cls._flow_state[FlowStateItems.CACHED_PARAMETERS] is not None:
cls._flow_state[FlowStateItems.CACHED_PARAMETERS] = None
else:
raise MetaflowInternalError(
"A non FlowMutator found in flow custom decorators"
Expand Down
Loading