From 584892464b32d4501704be18d252fd86b678fbd9 Mon Sep 17 00:00:00 2001 From: Marnik Bercx Date: Wed, 12 Nov 2025 12:08:34 +1000 Subject: [PATCH 1/8] =?UTF-8?q?=F0=9F=92=A5=20`BaseRestartWorkChain`:=20ad?= =?UTF-8?q?d=20`on=5Funhandled=5Ffailure`=20input?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently, the `BaseRestartWorkChain` has hardcoded behavior for unhandled failures: it restarts once, then aborts on the second consecutive failure with the `ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE` exit code. This approach lacks flexibility for different use cases where users might want immediate abort or allow for human evaluation through pausing. This commit introduces a new optional input `on_unhandled_failure` that allows users to configure how the work chain handles unhandled failures. The available options are: - `abort` (default): Abort immediately with ERROR_UNHANDLED_FAILURE - `pause`: Pause the work chain for user inspection - `restart_once`: Restart once, then abort if it fails again (similar to old behavior) - `restart_and_pause`: Restart once, then pause if it still fails BREAKING: The default behavior is set to `abort`, which is the most conservative option. In many cases this is the desired behavior, since doing a restart without changing the inputs will typically fail again, wasting resources. Users who want the old "restart once" behavior can explicitly set `on_unhandled_failure='restart_once'`. BREAKING: The exit code `ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE` has been renamed to `ERROR_UNHANDLED_FAILURE` to better reflect the new flexible behavior where failure doesn't necessarily mean "second consecutive" anymore. --- .../engine/processes/workchains/restart.py | 88 ++++++++++++++++--- .../processes/workchains/test_restart.py | 38 +++++--- 2 files changed, 99 insertions(+), 27 deletions(-) diff --git a/src/aiida/engine/processes/workchains/restart.py b/src/aiida/engine/processes/workchains/restart.py index 34544704f2..f5b5e99784 100644 --- a/src/aiida/engine/processes/workchains/restart.py +++ b/src/aiida/engine/processes/workchains/restart.py @@ -8,6 +8,8 @@ ########################################################################### """Base implementation of `WorkChain` class that implements a simple automated restart mechanism for sub processes.""" +from __future__ import annotations + import functools from inspect import getmembers from types import FunctionType @@ -28,6 +30,21 @@ __all__ = ('BaseRestartWorkChain',) +def validate_on_unhandled_failure(value: None | orm.Str | orm.EnumData, _) -> None | str: + """Validator for the `on_unhandled_failure` input port. + + :param value: the input `Str` or `EnumData` node + """ + if value is None: + return None + + valid_options = ('abort', 'pause', 'restart_once', 'restart_and_pause') + if value.value not in valid_options: + return f"Invalid value '{value.value}'. 
Must be one of: {', '.join(valid_options)}" + + return None + + def validate_handler_overrides( process_class: type['BaseRestartWorkChain'], handler_overrides: Optional[orm.Dict], ctx: 'PortNamespace' ) -> Optional[str]: @@ -163,6 +180,15 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override] 'can define the ``enabled`` and ``priority`` key, which can be used to toggle the values set on ' 'the original process handler declaration.', ) + spec.input( + 'on_unhandled_failure', + valid_type=(orm.Str, orm.EnumData), + required=False, + validator=validate_on_unhandled_failure, + help='Action to take when an unhandled failure occurs. Options: "abort" (default, fail immediately), ' + '"pause" (pause the workchain for inspection), "restart_once" (restart once then abort), ' + '"restart_and_pause" (restart once then pause if still failing).', + ) spec.exit_code(301, 'ERROR_SUB_PROCESS_EXCEPTED', message='The sub process excepted.') spec.exit_code(302, 'ERROR_SUB_PROCESS_KILLED', message='The sub process was killed.') spec.exit_code( @@ -170,8 +196,8 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override] ) spec.exit_code( 402, - 'ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE', - message='The process failed for an unknown reason, twice in a row.', + 'ERROR_UNHANDLED_FAILURE', + message='The process failed with an unhandled failure.', ) def setup(self) -> None: @@ -225,9 +251,16 @@ def inspect_process(self) -> Optional['ExitCode']: If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called in order of their specified priority. If the process was failed and no handler returns a report indicating that - the error was handled, it is considered an unhandled process failure and the process is relaunched. If this - happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report the - following matrix determines the logic that is followed: + the error was handled, it is considered an unhandled process failure. The behavior depends on the + `on_unhandled_failure` input: + + - `abort` (default): Abort immediately with ERROR_UNHANDLED_FAILURE + - `pause`: Pause the workchain for user inspection + - `restart_once`: Restart once, then abort if it fails again + - `restart_and_pause`: Restart once, then pause if it fails again + + In the case that at least one handler returned a report the following matrix determines the logic that is + followed: Process Handler Handler Action result report? 
exit code @@ -274,14 +307,43 @@ def inspect_process(self) -> Optional['ExitCode']: # If the process failed and no handler returned a report we consider it an unhandled failure if node.is_failed and not last_report: - if self.ctx.unhandled_failure: - template = '{}<{}> failed and error was not handled for the second consecutive time, aborting' - self.report(template.format(*report_args)) - return self.exit_codes.ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE - - self.ctx.unhandled_failure = True - self.report('{}<{}> failed and error was not handled, restarting once more'.format(*report_args)) - return None + action = self.inputs.get('on_unhandled_failure', None) + action = action.value if action is not None else 'abort' + + if action == 'abort': + self.report(f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, aborting') + return self.exit_codes.ERROR_UNHANDLED_FAILURE + elif action == 'pause': + self.report( + f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, pausing for inspection' + ) + self.pause() + return None + elif action == 'restart_once': + if self.ctx.unhandled_failure: + self.report( + f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure for the second ' + 'consecutive time, aborting' + ) + return self.exit_codes.ERROR_UNHANDLED_FAILURE + self.ctx.unhandled_failure = True + self.report( + f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, restarting once more' + ) + return None + elif action == 'restart_and_pause': + if self.ctx.unhandled_failure: + self.report( + f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure for the second ' + 'consecutive time, pausing for inspection' + ) + self.pause() + return None + self.ctx.unhandled_failure = True + self.report( + f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, restarting once more' + ) + return None # Here either the process finished successful or at least one handler returned a report so it can no longer be # considered to be an unhandled failed process and therefore we reset the flag diff --git a/tests/engine/processes/workchains/test_restart.py b/tests/engine/processes/workchains/test_restart.py index bf870292d7..2eee9ec4b6 100644 --- a/tests/engine/processes/workchains/test_restart.py +++ b/tests/engine/processes/workchains/test_restart.py @@ -113,28 +113,38 @@ def test_killed_process(generate_work_chain, generate_calculation_node): @pytest.mark.requires_rmq -def test_unhandled_failure(generate_work_chain, generate_calculation_node): - """Test the unhandled failure mechanism. - - The workchain should be aborted if there are two consecutive failed sub processes that went unhandled. We simulate - it by setting `ctx.unhandled_failure` to True and append two failed process nodes in `ctx.children`. 
- """ - process = generate_work_chain(SomeWorkChain, {}) +@pytest.mark.parametrize('on_unhandled_failure', (None, 'abort', 'pause', 'restart_once', 'restart_and_pause')) +def test_unhandled_failure(generate_work_chain, generate_calculation_node, on_unhandled_failure): + """Test the `on_unhandled_failure` input and behavior.""" + process = generate_work_chain(SomeWorkChain, {'on_unhandled_failure': on_unhandled_failure}) process.setup() process.ctx.children = [generate_calculation_node(exit_status=100)] - assert process.inspect_process() is None - assert process.ctx.unhandled_failure is True + result = process.inspect_process() + if on_unhandled_failure in (None, 'abort'): + assert result == engine.BaseRestartWorkChain.exit_codes.ERROR_UNHANDLED_FAILURE + return + elif on_unhandled_failure == 'pause': + assert result is None + assert process.paused + return + assert result is None + assert process.ctx.unhandled_failure is True process.ctx.children.append(generate_calculation_node(exit_status=100)) - assert ( - process.inspect_process() == engine.BaseRestartWorkChain.exit_codes.ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE - ) + result = process.inspect_process() + + if on_unhandled_failure == 'restart_once': + assert result == engine.BaseRestartWorkChain.exit_codes.ERROR_UNHANDLED_FAILURE + return + elif on_unhandled_failure == 'pause': + assert result is None + assert process.paused @pytest.mark.requires_rmq def test_unhandled_reset_after_success(generate_work_chain, generate_calculation_node): """Test `ctx.unhandled_failure` is reset to `False` in `inspect_process` after a successful process.""" - process = generate_work_chain(SomeWorkChain, {}) + process = generate_work_chain(SomeWorkChain, {'on_unhandled_failure': orm.Str('restart_once')}) process.setup() process.ctx.children = [generate_calculation_node(exit_status=100)] assert process.inspect_process() is None @@ -148,7 +158,7 @@ def test_unhandled_reset_after_success(generate_work_chain, generate_calculation @pytest.mark.requires_rmq def test_unhandled_reset_after_handled(generate_work_chain, generate_calculation_node): """Test `ctx.unhandled_failure` is reset to `False` in `inspect_process` after a handled failed process.""" - process = generate_work_chain(SomeWorkChain, {}) + process = generate_work_chain(SomeWorkChain, {'on_unhandled_failure': orm.Str('restart_once')}) process.setup() process.ctx.children = [generate_calculation_node(exit_status=300)] assert process.inspect_process() is None From fe0e95f91fd0275d5a2333870de913f016ebeecd Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 3 Dec 2025 17:46:45 +0100 Subject: [PATCH 2/8] Reinstating adaption of verdi process list message In #7069, we had logic to adapt the output of `verdi process list` when the process was paused by the handler (and only in this case, i.e., not when a user was pausing it). This, in our (C. Pignedoli's and mine) opinion is very important, otherwise the user most probably is not aware of why the calculation was paused (or might not even realize it's paused). To achieve this, a variable is used to mark if the pause was triggered by the handlers. Moreover, `on_paused` is overridden to set the process status accordingly. Importantly, we also changed the exact logic of `restart_and_pause` to the one we had implemented in the other PR: namely, after pausing, the `self.ctx.unhandled_failure` is reset to `False`, so that if another error occurs after replaying, two attempts are tried before pausing again. 
--- .../engine/processes/workchains/restart.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/aiida/engine/processes/workchains/restart.py b/src/aiida/engine/processes/workchains/restart.py index f5b5e99784..76e1a5a965 100644 --- a/src/aiida/engine/processes/workchains/restart.py +++ b/src/aiida/engine/processes/workchains/restart.py @@ -200,6 +200,17 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override] message='The process failed with an unhandled failure.', ) + def on_paused(self, msg: Optional[str] = None) -> None: + """Adapt the message shown by `verdi process list` to make the request for user action explicit. + + In particular, if the process was paused by a handler, change the process status to hint that + user inspection is needed.""" + super().on_paused(msg) + if self.ctx.paused_by_handler: + # Only show the message if the process was paused by a handler, not if the process was paused + # by other means (e.g. user request). + self.node.set_process_status(f"Need user inspection, see: 'verdi process report {self.node.pk}'") + def setup(self) -> None: """Initialize context variables that are used during the logical flow of the `BaseRestartWorkChain`.""" overrides = ( @@ -210,6 +221,7 @@ def setup(self) -> None: self.ctx.unhandled_failure = False self.ctx.is_finished = False self.ctx.iteration = 0 + self.ctx.paused_by_handler = False def should_run_process(self) -> bool: """Return whether a new process should be run. @@ -217,6 +229,10 @@ def should_run_process(self) -> bool: This is the case as long as the last process has not finished successfully and the maximum number of restarts has not yet been exceeded. """ + # This should be the first step to be run after a process was paused by the handler, and is + # then replayed. I reset the flag here. + self.ctx.paused_by_handler = False + max_iterations = self.inputs.max_iterations.value return not self.ctx.is_finished and self.ctx.iteration < max_iterations @@ -317,6 +333,9 @@ def inspect_process(self) -> Optional['ExitCode']: self.report( f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, pausing for inspection' ) + # Set the flag to indicate that the process was paused by a handler (and not by user request), + # so that the message shown in `verdi process list` is adapted accordingly + self.ctx.paused_by_handler = True self.pause() return None elif action == 'restart_once': @@ -337,6 +356,12 @@ def inspect_process(self) -> Optional['ExitCode']: f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure for the second ' 'consecutive time, pausing for inspection' ) + # Set the flag to indicate that the process was paused by a handler (and not by user request), + # so that the message shown in `verdi process list` is adapted accordingly + self.ctx.paused_by_handler = True + # reset the unhandled failure flag, so that after replaying, it will + # try again twice for future errors + self.ctx.unhandled_failure = False self.pause() return None self.ctx.unhandled_failure = True From ece6c299ed0a1c7222df5a8b8edb1d83d3614eaf Mon Sep 17 00:00:00 2001 From: Giovanni Pizzi Date: Wed, 3 Dec 2025 18:25:44 +0100 Subject: [PATCH 3/8] Adding more explicit instructions in `verdi process report` We want to be as explicit as possible on what the user is supposed to do. 
--- src/aiida/engine/processes/workchains/restart.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/aiida/engine/processes/workchains/restart.py b/src/aiida/engine/processes/workchains/restart.py index 76e1a5a965..99499d75e5 100644 --- a/src/aiida/engine/processes/workchains/restart.py +++ b/src/aiida/engine/processes/workchains/restart.py @@ -333,6 +333,12 @@ def inspect_process(self) -> Optional['ExitCode']: self.report( f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, pausing for inspection' ) + self.report( + 'If you believe that the issue can be resolved by just retrying the same execution ' + '(e.g., in case of a node failure), you can just replay this work chain using ' + f'`verdi process play {self.node.pk}`. Otherwise, you can kill the work chain using ' + f'`verdi process kill {self.node.pk}`.' + ) # Set the flag to indicate that the process was paused by a handler (and not by user request), # so that the message shown in `verdi process list` is adapted accordingly self.ctx.paused_by_handler = True @@ -356,6 +362,12 @@ def inspect_process(self) -> Optional['ExitCode']: f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure for the second ' 'consecutive time, pausing for inspection' ) + self.report( + 'If you believe that the issue can be resolved by just retrying the same execution ' + '(e.g., in case of a node failure), you can just replay this work chain using ' + f'`verdi process play {self.node.pk}`. Otherwise, you can kill the work chain using ' + f'`verdi process kill {self.node.pk}`.' + ) # Set the flag to indicate that the process was paused by a handler (and not by user request), # so that the message shown in `verdi process list` is adapted accordingly self.ctx.paused_by_handler = True From 5582ab403512d6365b2ba7841e4dc6362c2ded44 Mon Sep 17 00:00:00 2001 From: Marnik Bercx Date: Thu, 4 Dec 2025 18:52:10 +1000 Subject: [PATCH 4/8] Remove ctx variable in favor of passing status to `self.pause` --- .../engine/processes/workchains/restart.py | 26 ++----------------- 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/src/aiida/engine/processes/workchains/restart.py b/src/aiida/engine/processes/workchains/restart.py index 99499d75e5..eb80da4b35 100644 --- a/src/aiida/engine/processes/workchains/restart.py +++ b/src/aiida/engine/processes/workchains/restart.py @@ -200,17 +200,6 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override] message='The process failed with an unhandled failure.', ) - def on_paused(self, msg: Optional[str] = None) -> None: - """Adapt the message shown by `verdi process list` to make the request for user action explicit. - - In particular, if the process was paused by a handler, change the process status to hint that - user inspection is needed.""" - super().on_paused(msg) - if self.ctx.paused_by_handler: - # Only show the message if the process was paused by a handler, not if the process was paused - # by other means (e.g. user request). - self.node.set_process_status(f"Need user inspection, see: 'verdi process report {self.node.pk}'") - def setup(self) -> None: """Initialize context variables that are used during the logical flow of the `BaseRestartWorkChain`.""" overrides = ( @@ -221,7 +210,6 @@ def setup(self) -> None: self.ctx.unhandled_failure = False self.ctx.is_finished = False self.ctx.iteration = 0 - self.ctx.paused_by_handler = False def should_run_process(self) -> bool: """Return whether a new process should be run. 
@@ -229,10 +217,6 @@ def should_run_process(self) -> bool: This is the case as long as the last process has not finished successfully and the maximum number of restarts has not yet been exceeded. """ - # This should be the first step to be run after a process was paused by the handler, and is - # then replayed. I reset the flag here. - self.ctx.paused_by_handler = False - max_iterations = self.inputs.max_iterations.value return not self.ctx.is_finished and self.ctx.iteration < max_iterations @@ -339,10 +323,7 @@ def inspect_process(self) -> Optional['ExitCode']: f'`verdi process play {self.node.pk}`. Otherwise, you can kill the work chain using ' f'`verdi process kill {self.node.pk}`.' ) - # Set the flag to indicate that the process was paused by a handler (and not by user request), - # so that the message shown in `verdi process list` is adapted accordingly - self.ctx.paused_by_handler = True - self.pause() + self.pause(f"Paused for user inspection, see: 'verdi process report {self.node.pk}'") return None elif action == 'restart_once': if self.ctx.unhandled_failure: @@ -368,13 +349,10 @@ def inspect_process(self) -> Optional['ExitCode']: f'`verdi process play {self.node.pk}`. Otherwise, you can kill the work chain using ' f'`verdi process kill {self.node.pk}`.' ) - # Set the flag to indicate that the process was paused by a handler (and not by user request), - # so that the message shown in `verdi process list` is adapted accordingly - self.ctx.paused_by_handler = True # reset the unhandled failure flag, so that after replaying, it will # try again twice for future errors self.ctx.unhandled_failure = False - self.pause() + self.pause(f"Paused for user inspection, see: 'verdi process report {self.node.pk}'") return None self.ctx.unhandled_failure = True self.report( From 4ee98c9cb09aef01ccd6adbf1e4426711a374859 Mon Sep 17 00:00:00 2001 From: Marnik Bercx Date: Fri, 5 Dec 2025 14:20:27 +1000 Subject: [PATCH 5/8] =?UTF-8?q?=F0=9F=93=9A=20Update=20documentation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/source/howto/workchains_restart.rst | 148 ++++++++++++++++------- 1 file changed, 101 insertions(+), 47 deletions(-) diff --git a/docs/source/howto/workchains_restart.rst b/docs/source/howto/workchains_restart.rst index cb6d852c3b..be04fe332c 100644 --- a/docs/source/howto/workchains_restart.rst +++ b/docs/source/howto/workchains_restart.rst @@ -57,22 +57,102 @@ The work chain runs the subprocess. Once it has finished, it then inspects the status. If the subprocess finished successfully, the work chain returns the results and its job is done. If, instead, the subprocess failed, the work chain should inspect the cause of failure, and attempt to fix the problem and restart the subprocess. -This cycle is repeated until the subprocess finishes successfully. -Of course this runs the risk of entering into an infinite loop if the work chain never manages to fix the problem, so we want to build in a limit to the maximum number of calculations that can be re-run: +This cycle is repeated until either the subprocess finishes successfully or a maximum number of iterations is reached. -.. _workflow-error-handling-flow-loop: -.. figure:: include/images/workflow_error_handling_flow_loop.png - :align: center - :height: 500px - - An improved flow diagram for the base work chain that limits the maximum number of iterations that the work chain can try and get the calculation to finish successfully. 
- -Since this is such a common logical flow for a base work chain that is to wrap another :py:class:`~aiida.engine.processes.process.Process` and restart it until it is finished successfully, we have implemented it as an abstract base class in ``aiida-core``. +Since this is such a common logical flow for a base work chain that wraps another :py:class:`~aiida.engine.processes.process.Process` and restarts it until it is finished successfully, we have implemented it as an abstract base class in ``aiida-core``. The :py:class:`~aiida.engine.processes.workchains.restart.BaseRestartWorkChain` implements the logic of the flow diagram shown above. Although the ``BaseRestartWorkChain`` is a subclass of :py:class:`~aiida.engine.processes.workchains.workchain.WorkChain` itself, you cannot launch it. The reason is that it is completely general and so does not know which :py:class:`~aiida.engine.processes.process.Process` class it should run. Instead, to make use of the base restart work chain, you should subclass it for the process class that you want to wrap. +Running a ``BaseRestartWorkChain`` +================================== + +Many plugin packages have already implemented a ``BaseRestartWorkChain`` for the codes they support, such as the ``PwBaseWorkChain`` in ``aiida-quantumespresso`` or the ``Cp2kBaseWorkChain`` in ``aiida-cp2k``. +The inputs will depend on the calculation and choices of the developer, but there are several default inputs you can configure to control the behavior of any ``BaseRestartWorkChain``. + +Specifying the maximum number of iterations +-------------------------------------------- + +To prevent a work chain from entering an infinite loop if it never manages to fix the problem, the ``BaseRestartWorkChain`` limits the maximum number of times the subprocess can be restarted. +This is controlled by the ``max_iterations`` input, which defaults to ``5``: + +.. code-block:: python + + from aiida.orm import Int + + inputs = { + 'process_inputs': { + 'input_1': value_1, + 'input_2': value_2 + }, + 'max_iterations': Int(10) + } + submit(SomeBaseWorkChain, **inputs) + +If the subprocess fails and is restarted repeatedly until ``max_iterations`` is reached without succeeding, the work chain will abort with exit code ``401`` (``ERROR_MAXIMUM_ITERATIONS_EXCEEDED``). + + +Handler overrides +----------------- + +It is possible to change the priority of handlers and enable/disable them without changing the source code of the work chain. +These properties of the handlers can be controlled through the ``handler_overrides`` input of the work chain. +This input takes a ``Dict`` node, that has the following form: + +.. code-block:: python + + handler_overrides = Dict({ + 'handler_negative_sum': { # Insert the name of the process handler here + 'enabled': True, + 'priority': 10000 + } + }) + +As you can see, the keys are the name of the handler to affect and the value is a dictionary that can take two keys: ``enabled`` and ``priority``. +To enable or disable a handler, set ``enabled`` to ``True`` or ``False``, respectively. +The ``priority`` key takes an integer and determines the priority of the handler. +Note that the values of the ``handler_overrides`` are fully optional and will override the values configured by the process handler decorator in the source code of the work chain. +The changes also only affect the work chain instance that receives the ``handler_overrides`` input, all other instances of the work chain that will be launched will be unaffected. 
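+
+For example, to disable the ``handler_negative_sum`` handler for a single run, you could pass the override together with the other inputs (a sketch reusing the ``SomeBaseWorkChain`` and placeholder inputs from the example above):
+
+.. code-block:: python
+
+    from aiida.engine import submit
+    from aiida.orm import Dict
+
+    inputs = {
+        'process_inputs': {
+            'input_1': value_1,
+            'input_2': value_2
+        },
+        # Disable this handler for this particular work chain instance only
+        'handler_overrides': Dict({'handler_negative_sum': {'enabled': False}})
+    }
+    submit(SomeBaseWorkChain, **inputs)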
+ + +Configuring unhandled failure behavior +-------------------------------------- + +.. versionadded:: 2.8 + + Before v2.8, a ``BaseRestartWorkChain`` would always restart once for an unhandled failure. + +There may be cases where a process experience a failure that has no corresponding error handler, but you still want to restart the process. +A typical example here is a node failure, where you simply want to restart the process without any changes to the input. +By default, a ``BaseRestartWorkChain`` will abort when it encounters a failure it cannot handle, but this behaviour can be changed through the ``on_unhandled_failure`` input. +The options are: + +``abort`` (default) + The work chain immediately aborts with exit code ``402`` (``ERROR_UNHANDLED_FAILURE``). + This is the most conservative option and prevents wasting computational resources by rerunning calculations that will likely fail again with the same inputs. + +``pause`` + The work chain pauses for user inspection. + This allows you to examine the failed subprocess and decide whether to continue or abort. + When paused, you can: + + - Use ``verdi process report `` to inspect the work chain's progress and error messages. + - Use ``verdi process play `` to resume the work chain (e.g., if the failure was due to a transient issue like a node failure). + - Use ``verdi process kill `` to abort the work chain if the problem cannot be resolved. + +``restart_once`` + The work chain will automatically restart the subprocess once. + If the subprocess fails again with another unhandled failure, the work chain aborts with ``ERROR_UNHANDLED_FAILURE``. + +``restart_and_pause`` + The work chain combines the previous two strategies: it restarts the subprocess once, and if that also results in an unhandled failure, it pauses for user inspection rather than aborting. + This provides a balance between automatic recovery and user control. + +.. seealso:: + + You may be wondering if it's also possible to change the inputs and restart after inspecting the failure of a paused ``BaseRestartWorkChain``. + This is currently not yet possible, but we're exploring this possibility, see `the following blog post `_. Writing a base restart work chain ================================= @@ -120,7 +200,7 @@ Next, as with all work chains, we should *define* its process specification: The inputs and output that we define are essentially determined by the sub process that the work chain will be running. Since the ``ArithmeticAddCalculation`` requires the inputs ``x`` and ``y``, and produces the ``sum`` as output, we `mirror` those in the specification of the work chain, otherwise we wouldn't be able to pass the necessary inputs. -Finally, we define the logical outline, which if you look closely, resembles the logical flow chart presented in :numref:`workflow-error-handling-flow-loop` a lot. +Finally, we define the logical outline. We start by *setting up* the work chain and then enter a loop: *while* the subprocess has not yet finished successfully *and* we haven't exceeded the maximum number of iterations, we *run* another instance of the process and then *inspect* the results. The while conditions are implemented in the ``should_run_process`` outline step. When the process finishes successfully or we have to abandon, we report the *results*. 
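+
+Schematically, this outline reads:
+
+.. code-block:: python
+
+    # `while_` can be imported from `aiida.engine`
+    spec.outline(
+        cls.setup,
+        while_(cls.should_run_process)(
+            cls.run_process,
+            cls.inspect_process,
+        ),
+        cls.results,
+    )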
@@ -190,7 +270,7 @@ As you can see the work chain launched a single instance of the ``ArithmeticAddC Indeed, when updating an existing work chain file or adding a new one, it is **necessary** to restart the daemon **every time** after all changes have taken place. Exposing inputs and outputs -=========================== +--------------------------- Any base restart work chain *needs* to *expose* the inputs of the subprocess it wraps, and most likely *wants* to do the same for the outputs it produces, although the latter is not necessary. For the simple example presented in the previous section, simply copy-pasting the input and output port definitions of the subprocess ``ArithmeticAddCalculation`` was not too troublesome. @@ -253,7 +333,7 @@ When submitting or running the work chain using namespaced inputs (``add`` in th Customizing outputs -=================== +------------------- By default, the ``BaseRestartWorkChain`` will attach the exposed outputs of the last completed calculation job. In most cases this is the correct behavior, but there might be use-cases where one wants to modify exactly what outputs are attached to the work chain. @@ -273,7 +353,7 @@ In this case, it is important to go through a ``calcfunction``, as always, as to Attaching outputs -================= +----------------- In a normal run, the ``results`` method is the last step in the outline of the ``BaseRestartWorkChain``. In this step, the outputs of the last completed calculation job are "attached" to the work chain itself. @@ -284,7 +364,7 @@ In this case the work chain will be stopped immediately and the ``results`` step Error handling -============== +-------------- So far you have seen how easy it is to get a work chain up and running that will run a subprocess using the ``BaseRestartWorkChain``. However, the whole point of this exercise, as described in the introduction, was for the work chain to be able to deal with *failing* processes, yet in the previous example it finished without any problems. @@ -309,14 +389,11 @@ This time we will see that the work chain takes quite a different path: $ verdi process status 1930 ArithmeticAddBaseWorkChain<1930> Finished [402] [1:while_(should_run_process)(1:inspect_process)] - ├── ArithmeticAddCalculation<1931> Finished [410] - └── ArithmeticAddCalculation<1934> Finished [410] - -As expected, the ``ArithmeticAddCalculation`` failed this time with a ``410``. -The work chain noticed the failure when inspecting the result of the subprocess in ``inspect_process``, and in keeping with its name and design, restarted the calculation. -However, since the inputs were not changed, the calculation inevitably and wholly expectedly failed once more with the exact same error code. -Unlike after the first iteration, however, the work chain did not restart again, but gave up and returned the exit code ``402`` itself, which stands for ``ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE``. -As the name suggests, the work chain tried to run the subprocess but it failed twice in a row without the problem being *handled*. + └── ArithmeticAddCalculation<1931> Finished [410] + +As expected, the ``ArithmeticAddCalculation`` failed with a ``410``. +The work chain noticed the failure when inspecting the result of the subprocess in ``inspect_process``, but since no process handler dealt with this error (we haven't written any yet), it is considered an *unhandled failure*. 
+By default, when encountering an unhandled failure, the work chain will abort immediately and return the exit code ``402`` (``ERROR_UNHANDLED_FAILURE``). The obvious question now of course is: "How exactly can we instruct the base work chain to handle certain problems?" Since the problems are necessarily dependent on the subprocess that the work chain will run, it cannot be implemented by the ``BaseRestartWorkChain`` class itself, but rather will have to be implemented by the subclass. @@ -384,7 +461,7 @@ Instead of having a conditional at the start of each handler to compare the exit If the ``exit_codes`` keyword is defined, which can be either a single instance of :class:`~aiida.engine.processes.exit_code.ExitCode` or a list thereof, the process handler will only be called if the exit status of the node corresponds to one of those exit codes, otherwise it will simply be skipped. Multiple process handlers -========================= +------------------------- Since typically a base restart work chain implementation will have more than one process handler, one might want to control the order in which they are called. This can be done through the ``priority`` keyword: @@ -432,26 +509,3 @@ The base restart work chain will detect this exit code and abort the work chain, └── ArithmeticAddCalculation<1952> Finished [410] With these basic tools, a broad range of use-cases can be addressed while preventing a lot of boilerplate code. - - -Handler overrides -================= - -It is possible to change the priority of handlers and enable/disable them without changing the source code of the work chain. -These properties of the handlers can be controlled through the ``handler_overrides`` input of the work chain. -This input takes a ``Dict`` node, that has the following form: - -.. code-block:: python - - handler_overrides = Dict({ - 'handler_negative_sum': { - 'enabled': True, - 'priority': 10000 - } - }) - -As you can see, the keys are the name of the handler to affect and the value is a dictionary that can take two keys: ``enabled`` and ``priority``. -To enable or disable a handler, set ``enabled`` to ``True`` or ``False``, respectively. -The ``priority`` key takes an integer and determines the priority of the handler. -Note that the values of the ``handler_overrides`` are fully optional and will override the values configured by the process handler decorator in the source code of the work chain. -The changes also only affect the work chain instance that receives the ``handler_overrides`` input, all other instances of the work chain that will be launched will be unaffected. From 78f70217b1cf5ce9c206276f4f0d8460ef6759d6 Mon Sep 17 00:00:00 2001 From: Marnik Bercx Date: Fri, 5 Dec 2025 14:34:06 +1000 Subject: [PATCH 6/8] =?UTF-8?q?=F0=9F=91=8C=20Improve=20validation=20error?= =?UTF-8?q?=20message?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/aiida/engine/processes/workchains/restart.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aiida/engine/processes/workchains/restart.py b/src/aiida/engine/processes/workchains/restart.py index eb80da4b35..7754c9b37d 100644 --- a/src/aiida/engine/processes/workchains/restart.py +++ b/src/aiida/engine/processes/workchains/restart.py @@ -40,7 +40,7 @@ def validate_on_unhandled_failure(value: None | orm.Str | orm.EnumData, _) -> No valid_options = ('abort', 'pause', 'restart_once', 'restart_and_pause') if value.value not in valid_options: - return f"Invalid value '{value.value}'. 
Must be one of: {', '.join(valid_options)}" + return f"`on_unhandled_failure`: '{value.value}'. Must be one of: {', '.join(valid_options)}" return None From 18c3fdb8471af4bdf9e622a8c2e6afffe7594800 Mon Sep 17 00:00:00 2001 From: Marnik Bercx Date: Fri, 5 Dec 2025 14:36:50 +1000 Subject: [PATCH 7/8] =?UTF-8?q?=E2=9D=8C=20Remove=20`EnumData`=20as=20a=20?= =?UTF-8?q?valid=20type?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/aiida/engine/processes/workchains/restart.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/aiida/engine/processes/workchains/restart.py b/src/aiida/engine/processes/workchains/restart.py index 7754c9b37d..1177466d4e 100644 --- a/src/aiida/engine/processes/workchains/restart.py +++ b/src/aiida/engine/processes/workchains/restart.py @@ -30,10 +30,10 @@ __all__ = ('BaseRestartWorkChain',) -def validate_on_unhandled_failure(value: None | orm.Str | orm.EnumData, _) -> None | str: +def validate_on_unhandled_failure(value: None | orm.Str, _) -> None | str: """Validator for the `on_unhandled_failure` input port. - :param value: the input `Str` or `EnumData` node + :param value: the input `Str` node """ if value is None: return None @@ -182,7 +182,7 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override] ) spec.input( 'on_unhandled_failure', - valid_type=(orm.Str, orm.EnumData), + valid_type=orm.Str, required=False, validator=validate_on_unhandled_failure, help='Action to take when an unhandled failure occurs. Options: "abort" (default, fail immediately), ' From 810732845600836ce6ef954070b695fbd915e468 Mon Sep 17 00:00:00 2001 From: Marnik Bercx Date: Tue, 9 Dec 2025 08:25:34 +1000 Subject: [PATCH 8/8] Review 1 --- docs/source/howto/workchains_restart.rst | 2 +- tests/engine/processes/workchains/test_restart.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/howto/workchains_restart.rst b/docs/source/howto/workchains_restart.rst index be04fe332c..d01e17e2f0 100644 --- a/docs/source/howto/workchains_restart.rst +++ b/docs/source/howto/workchains_restart.rst @@ -123,7 +123,7 @@ Configuring unhandled failure behavior Before v2.8, a ``BaseRestartWorkChain`` would always restart once for an unhandled failure. -There may be cases where a process experience a failure that has no corresponding error handler, but you still want to restart the process. +There may be cases where a process experiences a failure that has no corresponding error handler, but you still want to restart the process. A typical example here is a node failure, where you simply want to restart the process without any changes to the input. By default, a ``BaseRestartWorkChain`` will abort when it encounters a failure it cannot handle, but this behaviour can be changed through the ``on_unhandled_failure`` input. The options are: diff --git a/tests/engine/processes/workchains/test_restart.py b/tests/engine/processes/workchains/test_restart.py index 2eee9ec4b6..21d23e5eec 100644 --- a/tests/engine/processes/workchains/test_restart.py +++ b/tests/engine/processes/workchains/test_restart.py @@ -136,7 +136,7 @@ def test_unhandled_failure(generate_work_chain, generate_calculation_node, on_un if on_unhandled_failure == 'restart_once': assert result == engine.BaseRestartWorkChain.exit_codes.ERROR_UNHANDLED_FAILURE return - elif on_unhandled_failure == 'pause': + elif on_unhandled_failure == 'restart_and_pause': assert result is None assert process.paused