Skip to content

Commit ee68f27

Browse files
Feature: Support spinning a single metaflow step (Rebased) (#2506)
To test `spin` on a new flow you can do the following: ### Simple case: ``` python <flow_name.py> --environment=conda spin <step_name> ``` ### Pass in specific pathspec: ``` python runtime_dag_flow.py --environment=conda spin RuntimeDAGFlow/13/step_c/275232971 ``` ### Pass in custom artifacts via module: ``` python runtime_dag_flow.py spin RuntimeDAGFlow/13/step_d/275233082 --artifacts-module ./my_artifacts.py ``` ### Skip decorators (including the whitelisted ones): ``` python complex_dag_flow.py --environment=conda spin step_d --skip-decorators ``` ### Use with Runner API: ``` with Runner('complex_dag_flow.py', environment="conda").spin( "<Some Val>", artifacts_module='./artifacts/complex_dag_step_d.py', ) as spin: print("-" * 50) print(f"Running test for step: step_a") spin_task = spin.task print(f"my_output: {spin_task['my_output']}") assert spin_task['my_output'].data == [-1] ``` See the tests for more examples on how to use this command. --------- Co-authored-by: Romain Cledat <rcledat@netflix.com>
1 parent f09bab3 commit ee68f27

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2672
-361
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ __pycache__/
22
*.py[cod]
33
*$py.class
44
*.metaflow
5+
*.metaflow_spin
6+
metaflow_card_cache/
57

68
build/
79
dist/

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,3 @@ We'd love to hear from you. Join our community [Slack workspace](http://slack.ou
6060

6161
## Contributing
6262
We welcome contributions to Metaflow. Please see our [contribution guide](https://docs.metaflow.org/introduction/contributing-to-metaflow) for more details.
63-
64-

metaflow/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class and related decorators.
146146
metadata,
147147
get_metadata,
148148
default_metadata,
149+
inspect_spin,
149150
Metaflow,
150151
Flow,
151152
Run,

metaflow/cli.py

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import functools
23
import inspect
34
import os
@@ -7,7 +8,6 @@
78

89
import metaflow.tracing as tracing
910
from metaflow._vendor import click
10-
from metaflow.system import _system_logger, _system_monitor
1111

1212
from . import decorators, lint, metaflow_version, parameters, plugins
1313
from .cli_args import cli_args
@@ -27,6 +27,8 @@
2727
DEFAULT_PACKAGE_SUFFIXES,
2828
)
2929
from .metaflow_current import current
30+
from .metaflow_profile import from_start
31+
from metaflow.system import _system_monitor, _system_logger
3032
from .metaflow_environment import MetaflowEnvironment
3133
from .packaging_sys import MetaflowCodeContent
3234
from .plugins import (
@@ -38,9 +40,9 @@
3840
)
3941
from .pylint_wrapper import PyLint
4042
from .R import metaflow_r_version, use_r
43+
from .util import get_latest_run_id, resolve_identity, decompress_list
4144
from .user_configs.config_options import LocalFileInput, config_options
4245
from .user_configs.config_parameters import ConfigValue
43-
from .util import get_latest_run_id, resolve_identity
4446

4547
ERASE_TO_EOL = "\033[K"
4648
HIGHLIGHT = "red"
@@ -125,6 +127,8 @@ def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=Tru
125127
"step": "metaflow.cli_components.step_cmd.step",
126128
"run": "metaflow.cli_components.run_cmds.run",
127129
"resume": "metaflow.cli_components.run_cmds.resume",
130+
"spin": "metaflow.cli_components.run_cmds.spin",
131+
"spin-step": "metaflow.cli_components.step_cmd.spin_step",
128132
},
129133
)
130134
def cli(ctx):
@@ -318,6 +322,13 @@ def version(obj):
318322
hidden=True,
319323
is_eager=True,
320324
)
325+
@click.option(
326+
"--mode",
327+
type=click.Choice(["spin"]),
328+
default=None,
329+
help="Execution mode for metaflow CLI commands. Use 'spin' to enable "
330+
"spin metadata and spin datastore for executions",
331+
)
321332
@click.pass_context
322333
def start(
323334
ctx,
@@ -335,6 +346,7 @@ def start(
335346
local_config_file=None,
336347
config=None,
337348
config_value=None,
349+
mode=None,
338350
**deco_options
339351
):
340352
if quiet:
@@ -347,6 +359,7 @@ def start(
347359
if use_r():
348360
version = metaflow_r_version()
349361

362+
from_start("MetaflowCLI: Starting")
350363
echo("Metaflow %s" % version, fg="magenta", bold=True, nl=False)
351364
echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False)
352365
echo(" for *%s*" % resolve_identity(), fg="magenta")
@@ -366,6 +379,7 @@ def start(
366379
ctx.obj.check = functools.partial(_check, echo)
367380
ctx.obj.top_cli = cli
368381
ctx.obj.package_suffixes = package_suffixes.split(",")
382+
ctx.obj.spin_mode = mode == "spin"
369383

370384
ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == datastore][0]
371385

@@ -472,19 +486,12 @@ def start(
472486
# set force rebuild flag for environments that support it.
473487
ctx.obj.environment._force_rebuild = force_rebuild_environments
474488
ctx.obj.environment.validate_environment(ctx.obj.logger, datastore)
475-
476489
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
477490
flow=ctx.obj.flow, env=ctx.obj.environment
478491
)
479-
ctx.obj.event_logger.start()
480-
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
481-
482492
ctx.obj.monitor = MONITOR_SIDECARS[monitor](
483493
flow=ctx.obj.flow, env=ctx.obj.environment
484494
)
485-
ctx.obj.monitor.start()
486-
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
487-
488495
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
489496
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
490497
)
@@ -498,6 +505,57 @@ def start(
498505
)
499506

500507
ctx.obj.config_options = config_options
508+
ctx.obj.is_spin = False
509+
ctx.obj.skip_decorators = False
510+
511+
# Override values for spin steps, or if we are in spin mode
512+
if (
513+
hasattr(ctx, "saved_args")
514+
and ctx.saved_args
515+
and "spin" in ctx.saved_args[0]
516+
or ctx.obj.spin_mode
517+
):
518+
# To minimize side effects for spin, we will only use the following:
519+
# - local metadata provider,
520+
# - local datastore,
521+
# - local environment,
522+
# - null event logger,
523+
# - null monitor
524+
ctx.obj.is_spin = True
525+
if "--skip-decorators" in ctx.saved_args:
526+
ctx.obj.skip_decorators = True
527+
528+
ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
529+
flow=ctx.obj.flow, env=ctx.obj.environment
530+
)
531+
ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
532+
flow=ctx.obj.flow, env=ctx.obj.environment
533+
)
534+
# Use spin metadata, spin datastore, and spin datastore root
535+
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "spin"][0](
536+
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
537+
)
538+
ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == "spin"][0]
539+
datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config(
540+
ctx.obj.echo, create_on_absent=True
541+
)
542+
ctx.obj.datastore_impl.datastore_root = datastore_root
543+
544+
ctx.obj.flow_datastore = FlowDataStore(
545+
ctx.obj.flow.name,
546+
ctx.obj.environment, # Same environment as run/resume
547+
ctx.obj.metadata, # local metadata
548+
ctx.obj.event_logger, # null event logger
549+
ctx.obj.monitor, # null monitor
550+
storage_impl=ctx.obj.datastore_impl,
551+
)
552+
553+
# Start event logger and monitor
554+
ctx.obj.event_logger.start()
555+
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
556+
557+
ctx.obj.monitor.start()
558+
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
501559

502560
decorators._init(ctx.obj.flow)
503561

@@ -512,9 +570,11 @@ def start(
512570
ctx.obj.logger,
513571
echo,
514572
deco_options,
573+
ctx.obj.is_spin,
574+
ctx.obj.skip_decorators,
515575
)
516576

517-
# In the case of run/resume, we will want to apply the TL decospecs
577+
# In the case of run/resume/spin, we will want to apply the TL decospecs
518578
# *after* the run decospecs so that they don't take precedence. In other
519579
# words, for the same decorator, we want `myflow.py run --with foo` to
520580
# take precedence over any other `foo` decospec
@@ -542,11 +602,10 @@ def start(
542602
if (
543603
hasattr(ctx, "saved_args")
544604
and ctx.saved_args
545-
and ctx.saved_args[0] not in ("run", "resume")
605+
and ctx.saved_args[0] not in ("run", "resume", "spin")
546606
):
547-
# run/resume are special cases because they can add more decorators with --with,
607+
# run/resume/spin are special cases because they can add more decorators with --with,
548608
# so they have to take care of themselves.
549-
550609
all_decospecs = ctx.obj.tl_decospecs + list(
551610
ctx.obj.environment.decospecs() or []
552611
)
@@ -556,6 +615,9 @@ def start(
556615
# or a scheduler setting them up in their own way.
557616
if ctx.saved_args[0] not in ("step", "init"):
558617
all_decospecs += DEFAULT_DECOSPECS.split()
618+
elif ctx.saved_args[0] == "spin-step":
619+
# If we are in spin-step, we will not attach any decorators
620+
all_decospecs = []
559621
if all_decospecs:
560622
decorators._attach_decorators(ctx.obj.flow, all_decospecs)
561623
decorators._init(ctx.obj.flow)
@@ -569,6 +631,9 @@ def start(
569631
ctx.obj.environment,
570632
ctx.obj.flow_datastore,
571633
ctx.obj.logger,
634+
# The last two arguments are only used for spin steps
635+
ctx.obj.is_spin,
636+
ctx.obj.skip_decorators,
572637
)
573638

574639
# Check the graph again (mutators may have changed it)

0 commit comments

Comments
 (0)