Skip to content

Commit ef82012

Browse files
committed
Add an option to resume only until a certain step
The goal is to be complementary to spin and allow incremental development
1 parent ade0b99 commit ef82012

File tree

2 files changed

+96
-14
lines changed

2 files changed

+96
-14
lines changed

metaflow/cli_components/run_cmds.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,24 @@ def wrapper(*args, **kwargs):
191191
hidden=True,
192192
help="If specified, it identifies the task that started this resume call. It is in the form of {step_name}-{task_id}",
193193
)
194-
@click.argument("step-to-rerun", required=False)
194+
@click.option(
195+
"--step-only/--no-step-only",
196+
default=False,
197+
show_default=True,
198+
help="If specified, runs up to the specified step(s) (inclusive) and stops. "
199+
"If the steps are not reachable in the cloned run (ie: parent steps were "
200+
"not executed), an error will be raised.",
201+
)
202+
@click.argument("steps-to-rerun", required=False, nargs=-1)
195203
@click.command(help="Resume execution of a previous run of this flow.")
196204
@tracing.cli("cli/resume")
197205
@common_run_options
198206
@click.pass_obj
199207
def resume(
200208
obj,
201209
tags=None,
202-
step_to_rerun=None,
210+
steps_to_rerun=None,
211+
step_only=False,
203212
origin_run_id=None,
204213
run_id=None,
205214
clone_only=False,
@@ -221,17 +230,22 @@ def resume(
221230
"A previous run id was not found. Specify --origin-run-id."
222231
)
223232

224-
if step_to_rerun is None:
233+
if steps_to_rerun is None:
225234
steps_to_rerun = set()
235+
if step_only:
236+
raise CommandException(
237+
"Cannot step-only resume without specifying at least one step to execute"
238+
)
226239
else:
227240
# validate step name
228-
if step_to_rerun not in obj.graph.nodes:
229-
raise CommandException(
230-
"invalid step name {0} specified, must be step present in "
231-
"current form of execution graph. Valid step names include: {1}".format(
232-
step_to_rerun, ",".join(list(obj.graph.nodes.keys()))
241+
for step_to_rerun in steps_to_rerun:
242+
if step_to_rerun not in obj.graph.nodes:
243+
raise CommandException(
244+
"invalid step name {0} specified, must be step present in "
245+
"current form of execution graph. Valid step names include: {1}".format(
246+
step_to_rerun, ",".join(list(obj.graph.nodes.keys()))
247+
)
233248
)
234-
)
235249

236250
## TODO: instead of checking execution path here, can add a warning later
237251
## instead of throwing an error. This is for resuming a step which was not
@@ -245,8 +259,10 @@ def resume(
245259
# f"part of the original execution path for run '{origin_run_id}'."
246260
# )
247261

248-
steps_to_rerun = {step_to_rerun}
262+
steps_to_rerun = set(steps_to_rerun)
249263

264+
if step_only:
265+
clone_only = False
250266
if run_id:
251267
# Run-ids that are provided by the metadata service are always integers.
252268
# External providers or run-ids (like external schedulers) always need to
@@ -274,12 +290,16 @@ def resume(
274290
clone_only=clone_only,
275291
reentrant=reentrant,
276292
steps_to_rerun=steps_to_rerun,
293+
step_only=step_only,
277294
max_workers=max_workers,
278295
max_num_splits=max_num_splits,
279296
max_log_size=max_log_size * 1024 * 1024,
280297
resume_identifier=resume_identifier,
281298
)
282299
write_file(run_id_file, runtime.run_id)
300+
if step_only:
301+
write_latest_run_id(obj, runtime.run_id)
302+
283303
runtime.print_workflow_info()
284304

285305
runtime.persist_constants()

metaflow/runtime.py

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def __init__(
103103
clone_only=False,
104104
reentrant=False,
105105
steps_to_rerun=None,
106+
step_only=False,
106107
max_workers=MAX_WORKERS,
107108
max_num_splits=MAX_NUM_SPLITS,
108109
max_log_size=MAX_LOG_SIZE,
@@ -145,14 +146,51 @@ def __init__(
145146
self._skip_decorator_hooks = skip_decorator_hooks
146147

147148
# If steps_to_rerun is specified, we will not clone them in resume mode.
148-
self._steps_to_rerun = steps_to_rerun or {}
149+
self._steps_to_rerun = steps_to_rerun or set()
150+
self._steps_can_clone = set()
151+
self._steps_ran = set()
152+
self._step_only = step_only
153+
all_steps = set()
154+
cannot_clone_steps = set(self._steps_to_rerun)
149155
# sorted_nodes are in topological order already, so we only need to
150156
# iterate through the nodes once to get a stable set of rerun steps.
157+
# A few modes:
158+
# - no steps_to_rerun:
159+
# - not clone_only and not step_only: clone all previously executed steps and
160+
# continue execution.
161+
# - clone_only and not step_only: clone all steps that have previously executed
162+
# and stop
163+
# - not clone_only and step_only: NOT possible (requires a steps_to_rerun)
164+
# - clone_only and step_only: NOT possible (requires a steps_to_rerun)
165+
# => in all these cases, _steps_to_rerun is empty and so _steps_can_clone is
166+
# all_steps
167+
# - steps_to_rerun:
168+
# - not clone_only and not step_only: clone all previously executed steps *except*
169+
# any of the steps in steps_to_rerun and the subsequent steps. Continue execution.
170+
# => _steps_to_rerun contains the steps to rerun and all descendants. _steps_can_clone
171+
# contains all other steps
172+
# - clone_only and not step_only: clone all steps that have previously executed
173+
# up to (but not including) any of the steps in steps_to_rerun and
174+
# subsequent steps.
175+
# => same as above but steps_to_rerun is not used to run anything
176+
# - not clone_only and step_only: clone all steps that have previously executed
177+
# up to (but not including) any of the steps in steps_to_rerun and
178+
# subsequent steps. Execute *only* the steps in steps_to_rerun if possible
179+
# and stop.
180+
# - clone_only and step_only: NOT possible (if step_only is specified, we turn
181+
# off clone_only -- clone_only implies no further execution since task
182+
# objects will not be generated).
183+
# => _steps_to_rerun contains *only* the initially passed steps to run and
184+
# _steps_can_clone contains the same as in the other cases.
151185
for step_name in self._graph.sorted_nodes:
152-
if step_name in self._steps_to_rerun:
186+
all_steps.add(step_name)
187+
if step_name in cannot_clone_steps:
153188
out_funcs = self._graph[step_name].out_funcs or []
154189
for next_step in out_funcs:
155-
self._steps_to_rerun.add(next_step)
190+
cannot_clone_steps.add(next_step)
191+
self._steps_can_clone = all_steps - cannot_clone_steps
192+
if not self._step_only:
193+
self._steps_to_rerun = cannot_clone_steps
156194

157195
self._origin_ds_set = None
158196
if clone_run_id:
@@ -399,7 +437,7 @@ def clone_original_run(self, generate_task_obj=False, verbose=True):
399437
if (
400438
task_ds["_task_ok"]
401439
and step_name != "_parameters"
402-
and (step_name not in self._steps_to_rerun)
440+
and (step_name in self._steps_can_clone)
403441
):
404442
# "_unbounded_foreach" is a special flag to indicate that the transition
405443
# is an unbounded foreach.
@@ -677,6 +715,19 @@ def execute(self):
677715
system_msg=True,
678716
)
679717
self._params_task.mark_resume_done()
718+
elif self._step_only:
719+
# Check that we ran all the steps in self._steps_to_rerun
720+
steps_missing = self._steps_to_rerun - self._steps_ran
721+
if steps_missing:
722+
raise MetaflowInternalError(
723+
"The following steps were not executed: {0}".format(
724+
", ".join(steps_missing)
725+
)
726+
)
727+
self._logger(
728+
"Step-only resume complete -- all specified steps were executed!",
729+
system_msg=True,
730+
)
680731
else:
681732
raise MetaflowInternalError(
682733
"The *end* step was not successful by the end of flow."
@@ -1073,6 +1124,8 @@ def _queue_task_foreach(self, task, next_steps):
10731124
def _queue_tasks(self, finished_tasks):
10741125
# finished tasks include only successful tasks
10751126
for task in finished_tasks:
1127+
step_name, _, _ = task.finished_id
1128+
self._steps_ran.add(step_name)
10761129
self._finished[task.finished_id] = task.path
10771130
self._is_cloned[task.path] = task.is_cloned
10781131

@@ -1137,6 +1190,15 @@ def _queue_tasks(self, finished_tasks):
11371190
)
11381191
)
11391192

1193+
if self._step_only:
1194+
# We need to filter next_steps to only include steps that are in
1195+
# self._steps_to_rerun
1196+
next_steps = [
1197+
step for step in next_steps if step in self._steps_to_rerun
1198+
]
1199+
if not next_steps:
1200+
# No steps to execute, so we can stop
1201+
return
11401202
# Different transition types require different treatment
11411203
if any(self._graph[f].type == "join" for f in next_steps):
11421204
# Next step is a join

0 commit comments

Comments
 (0)