diff --git a/docs/source/howto/workchains_restart.rst b/docs/source/howto/workchains_restart.rst index cb6d852c3b..d01e17e2f0 100644 --- a/docs/source/howto/workchains_restart.rst +++ b/docs/source/howto/workchains_restart.rst @@ -57,22 +57,102 @@ The work chain runs the subprocess. Once it has finished, it then inspects the status. If the subprocess finished successfully, the work chain returns the results and its job is done. If, instead, the subprocess failed, the work chain should inspect the cause of failure, and attempt to fix the problem and restart the subprocess. -This cycle is repeated until the subprocess finishes successfully. -Of course this runs the risk of entering into an infinite loop if the work chain never manages to fix the problem, so we want to build in a limit to the maximum number of calculations that can be re-run: +This cycle is repeated until either the subprocess finishes successfully or a maximum number of iterations is reached. -.. _workflow-error-handling-flow-loop: -.. figure:: include/images/workflow_error_handling_flow_loop.png - :align: center - :height: 500px - - An improved flow diagram for the base work chain that limits the maximum number of iterations that the work chain can try and get the calculation to finish successfully. - -Since this is such a common logical flow for a base work chain that is to wrap another :py:class:`~aiida.engine.processes.process.Process` and restart it until it is finished successfully, we have implemented it as an abstract base class in ``aiida-core``. +Since this is such a common logical flow for a base work chain that wraps another :py:class:`~aiida.engine.processes.process.Process` and restarts it until it is finished successfully, we have implemented it as an abstract base class in ``aiida-core``. The :py:class:`~aiida.engine.processes.workchains.restart.BaseRestartWorkChain` implements the logic of the flow diagram shown above. 
Although the ``BaseRestartWorkChain`` is a subclass of :py:class:`~aiida.engine.processes.workchains.workchain.WorkChain` itself, you cannot launch it. The reason is that it is completely general and so does not know which :py:class:`~aiida.engine.processes.process.Process` class it should run. Instead, to make use of the base restart work chain, you should subclass it for the process class that you want to wrap. +Running a ``BaseRestartWorkChain`` +================================== + +Many plugin packages have already implemented a ``BaseRestartWorkChain`` for the codes they support, such as the ``PwBaseWorkChain`` in ``aiida-quantumespresso`` or the ``Cp2kBaseWorkChain`` in ``aiida-cp2k``. +The inputs will depend on the calculation and choices of the developer, but there are several default inputs you can configure to control the behavior of any ``BaseRestartWorkChain``. + +Specifying the maximum number of iterations +-------------------------------------------- + +To prevent a work chain from entering an infinite loop if it never manages to fix the problem, the ``BaseRestartWorkChain`` limits the maximum number of times the subprocess can be restarted. +This is controlled by the ``max_iterations`` input, which defaults to ``5``: + +.. code-block:: python + + from aiida.orm import Int + + inputs = { + 'process_inputs': { + 'input_1': value_1, + 'input_2': value_2 + }, + 'max_iterations': Int(10) + } + submit(SomeBaseWorkChain, **inputs) + +If the subprocess fails and is restarted repeatedly until ``max_iterations`` is reached without succeeding, the work chain will abort with exit code ``401`` (``ERROR_MAXIMUM_ITERATIONS_EXCEEDED``). + + +Handler overrides +----------------- + +It is possible to change the priority of handlers and enable/disable them without changing the source code of the work chain. +These properties of the handlers can be controlled through the ``handler_overrides`` input of the work chain. 
+This input takes a ``Dict`` node, that has the following form: + +.. code-block:: python + + handler_overrides = Dict({ + 'handler_negative_sum': { # Insert the name of the process handler here + 'enabled': True, + 'priority': 10000 + } + }) + +As you can see, the keys are the name of the handler to affect and the value is a dictionary that can take two keys: ``enabled`` and ``priority``. +To enable or disable a handler, set ``enabled`` to ``True`` or ``False``, respectively. +The ``priority`` key takes an integer and determines the priority of the handler. +Note that the values of the ``handler_overrides`` are fully optional and will override the values configured by the process handler decorator in the source code of the work chain. +The changes also only affect the work chain instance that receives the ``handler_overrides`` input, all other instances of the work chain that will be launched will be unaffected. + + +Configuring unhandled failure behavior +-------------------------------------- + +.. versionadded:: 2.8 + + Before v2.8, a ``BaseRestartWorkChain`` would always restart once for an unhandled failure. + +There may be cases where a process experiences a failure that has no corresponding error handler, but you still want to restart the process. +A typical example here is a node failure, where you simply want to restart the process without any changes to the input. +By default, a ``BaseRestartWorkChain`` will abort when it encounters a failure it cannot handle, but this behaviour can be changed through the ``on_unhandled_failure`` input. +The options are: + +``abort`` (default) + The work chain immediately aborts with exit code ``402`` (``ERROR_UNHANDLED_FAILURE``). + This is the most conservative option and prevents wasting computational resources by rerunning calculations that will likely fail again with the same inputs. + +``pause`` + The work chain pauses for user inspection. 
+ This allows you to examine the failed subprocess and decide whether to continue or abort. + When paused, you can: + + - Use ``verdi process report `` to inspect the work chain's progress and error messages. + - Use ``verdi process play `` to resume the work chain (e.g., if the failure was due to a transient issue like a node failure). + - Use ``verdi process kill `` to abort the work chain if the problem cannot be resolved. + +``restart_once`` + The work chain will automatically restart the subprocess once. + If the subprocess fails again with another unhandled failure, the work chain aborts with ``ERROR_UNHANDLED_FAILURE``. + +``restart_and_pause`` + The work chain combines the previous two strategies: it restarts the subprocess once, and if that also results in an unhandled failure, it pauses for user inspection rather than aborting. + This provides a balance between automatic recovery and user control. + +.. seealso:: + + You may be wondering if it's also possible to change the inputs and restart after inspecting the failure of a paused ``BaseRestartWorkChain``. + This is currently not yet possible, but we're exploring this possibility, see `the following blog post `_. Writing a base restart work chain ================================= @@ -120,7 +200,7 @@ Next, as with all work chains, we should *define* its process specification: The inputs and output that we define are essentially determined by the sub process that the work chain will be running. Since the ``ArithmeticAddCalculation`` requires the inputs ``x`` and ``y``, and produces the ``sum`` as output, we `mirror` those in the specification of the work chain, otherwise we wouldn't be able to pass the necessary inputs. -Finally, we define the logical outline, which if you look closely, resembles the logical flow chart presented in :numref:`workflow-error-handling-flow-loop` a lot. +Finally, we define the logical outline. 
We start by *setting up* the work chain and then enter a loop: *while* the subprocess has not yet finished successfully *and* we haven't exceeded the maximum number of iterations, we *run* another instance of the process and then *inspect* the results. The while conditions are implemented in the ``should_run_process`` outline step. When the process finishes successfully or we have to abandon, we report the *results*. @@ -190,7 +270,7 @@ As you can see the work chain launched a single instance of the ``ArithmeticAddC Indeed, when updating an existing work chain file or adding a new one, it is **necessary** to restart the daemon **every time** after all changes have taken place. Exposing inputs and outputs -=========================== +--------------------------- Any base restart work chain *needs* to *expose* the inputs of the subprocess it wraps, and most likely *wants* to do the same for the outputs it produces, although the latter is not necessary. For the simple example presented in the previous section, simply copy-pasting the input and output port definitions of the subprocess ``ArithmeticAddCalculation`` was not too troublesome. @@ -253,7 +333,7 @@ When submitting or running the work chain using namespaced inputs (``add`` in th Customizing outputs -=================== +------------------- By default, the ``BaseRestartWorkChain`` will attach the exposed outputs of the last completed calculation job. In most cases this is the correct behavior, but there might be use-cases where one wants to modify exactly what outputs are attached to the work chain. @@ -273,7 +353,7 @@ In this case, it is important to go through a ``calcfunction``, as always, as to Attaching outputs -================= +----------------- In a normal run, the ``results`` method is the last step in the outline of the ``BaseRestartWorkChain``. In this step, the outputs of the last completed calculation job are "attached" to the work chain itself. 
@@ -284,7 +364,7 @@ In this case the work chain will be stopped immediately and the ``results`` step Error handling -============== +-------------- So far you have seen how easy it is to get a work chain up and running that will run a subprocess using the ``BaseRestartWorkChain``. However, the whole point of this exercise, as described in the introduction, was for the work chain to be able to deal with *failing* processes, yet in the previous example it finished without any problems. @@ -309,14 +389,11 @@ This time we will see that the work chain takes quite a different path: $ verdi process status 1930 ArithmeticAddBaseWorkChain<1930> Finished [402] [1:while_(should_run_process)(1:inspect_process)] - ├── ArithmeticAddCalculation<1931> Finished [410] - └── ArithmeticAddCalculation<1934> Finished [410] - -As expected, the ``ArithmeticAddCalculation`` failed this time with a ``410``. -The work chain noticed the failure when inspecting the result of the subprocess in ``inspect_process``, and in keeping with its name and design, restarted the calculation. -However, since the inputs were not changed, the calculation inevitably and wholly expectedly failed once more with the exact same error code. -Unlike after the first iteration, however, the work chain did not restart again, but gave up and returned the exit code ``402`` itself, which stands for ``ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE``. -As the name suggests, the work chain tried to run the subprocess but it failed twice in a row without the problem being *handled*. + └── ArithmeticAddCalculation<1931> Finished [410] + +As expected, the ``ArithmeticAddCalculation`` failed with a ``410``. +The work chain noticed the failure when inspecting the result of the subprocess in ``inspect_process``, but since no process handler dealt with this error (we haven't written any yet), it is considered an *unhandled failure*. 
+By default, when encountering an unhandled failure, the work chain will abort immediately and return the exit code ``402`` (``ERROR_UNHANDLED_FAILURE``). The obvious question now of course is: "How exactly can we instruct the base work chain to handle certain problems?" Since the problems are necessarily dependent on the subprocess that the work chain will run, it cannot be implemented by the ``BaseRestartWorkChain`` class itself, but rather will have to be implemented by the subclass. @@ -384,7 +461,7 @@ Instead of having a conditional at the start of each handler to compare the exit If the ``exit_codes`` keyword is defined, which can be either a single instance of :class:`~aiida.engine.processes.exit_code.ExitCode` or a list thereof, the process handler will only be called if the exit status of the node corresponds to one of those exit codes, otherwise it will simply be skipped. Multiple process handlers -========================= +------------------------- Since typically a base restart work chain implementation will have more than one process handler, one might want to control the order in which they are called. This can be done through the ``priority`` keyword: @@ -432,26 +509,3 @@ The base restart work chain will detect this exit code and abort the work chain, └── ArithmeticAddCalculation<1952> Finished [410] With these basic tools, a broad range of use-cases can be addressed while preventing a lot of boilerplate code. - - -Handler overrides -================= - -It is possible to change the priority of handlers and enable/disable them without changing the source code of the work chain. -These properties of the handlers can be controlled through the ``handler_overrides`` input of the work chain. -This input takes a ``Dict`` node, that has the following form: - -.. 
code-block:: python - - handler_overrides = Dict({ - 'handler_negative_sum': { - 'enabled': True, - 'priority': 10000 - } - }) - -As you can see, the keys are the name of the handler to affect and the value is a dictionary that can take two keys: ``enabled`` and ``priority``. -To enable or disable a handler, set ``enabled`` to ``True`` or ``False``, respectively. -The ``priority`` key takes an integer and determines the priority of the handler. -Note that the values of the ``handler_overrides`` are fully optional and will override the values configured by the process handler decorator in the source code of the work chain. -The changes also only affect the work chain instance that receives the ``handler_overrides`` input, all other instances of the work chain that will be launched will be unaffected. diff --git a/src/aiida/engine/processes/workchains/restart.py b/src/aiida/engine/processes/workchains/restart.py index 34544704f2..1177466d4e 100644 --- a/src/aiida/engine/processes/workchains/restart.py +++ b/src/aiida/engine/processes/workchains/restart.py @@ -8,6 +8,8 @@ ########################################################################### """Base implementation of `WorkChain` class that implements a simple automated restart mechanism for sub processes.""" +from __future__ import annotations + import functools from inspect import getmembers from types import FunctionType @@ -28,6 +30,21 @@ __all__ = ('BaseRestartWorkChain',) +def validate_on_unhandled_failure(value: None | orm.Str, _) -> None | str: + """Validator for the `on_unhandled_failure` input port. + + :param value: the input `Str` node + """ + if value is None: + return None + + valid_options = ('abort', 'pause', 'restart_once', 'restart_and_pause') + if value.value not in valid_options: + return f"`on_unhandled_failure`: '{value.value}'. 
Must be one of: {', '.join(valid_options)}"
+
+    return None
+
+
 def validate_handler_overrides(
     process_class: type['BaseRestartWorkChain'], handler_overrides: Optional[orm.Dict], ctx: 'PortNamespace'
 ) -> Optional[str]:
@@ -163,6 +180,15 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override]
             'can define the ``enabled`` and ``priority`` key, which can be used to toggle the values set on '
             'the original process handler declaration.',
         )
+        spec.input(
+            'on_unhandled_failure',
+            valid_type=orm.Str,
+            required=False,
+            validator=validate_on_unhandled_failure,
+            help='Action to take when an unhandled failure occurs. Options: "abort" (default, fail immediately), '
+            '"pause" (pause the workchain for inspection), "restart_once" (restart once then abort), '
+            '"restart_and_pause" (restart once then pause if still failing).',
+        )
         spec.exit_code(301, 'ERROR_SUB_PROCESS_EXCEPTED', message='The sub process excepted.')
         spec.exit_code(302, 'ERROR_SUB_PROCESS_KILLED', message='The sub process was killed.')
         spec.exit_code(
@@ -170,8 +196,8 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override]
         )
         spec.exit_code(
             402,
-            'ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE',
-            message='The process failed for an unknown reason, twice in a row.',
+            'ERROR_UNHANDLED_FAILURE',
+            message='The process failed with an unhandled failure.',
         )
 
     def setup(self) -> None:
@@ -225,9 +251,16 @@ def inspect_process(self) -> Optional['ExitCode']:
 
         If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called
         in order of their specified priority. If the process was failed and no handler returns a report indicating that
-        the error was handled, it is considered an unhandled process failure and the process is relaunched. If this
-        happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report the
-        following matrix determines the logic that is followed:
+        the error was handled, it is considered an unhandled process failure. The behavior depends on the
+        `on_unhandled_failure` input:
+
+        - `abort` (default): Abort immediately with ERROR_UNHANDLED_FAILURE
+        - `pause`: Pause the workchain for user inspection
+        - `restart_once`: Restart once, then abort if it fails again
+        - `restart_and_pause`: Restart once, then pause if it fails again
+
+        In the case that at least one handler returned a report the following matrix determines the logic that is
+        followed:
 
             Process Handler Handler Action
             result report? exit code
@@ -274,14 +307,58 @@ def inspect_process(self) -> Optional['ExitCode']:
 
         # If the process failed and no handler returned a report we consider it an unhandled failure
         if node.is_failed and not last_report:
-            if self.ctx.unhandled_failure:
-                template = '{}<{}> failed and error was not handled for the second consecutive time, aborting'
-                self.report(template.format(*report_args))
-                return self.exit_codes.ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE
-
-            self.ctx.unhandled_failure = True
-            self.report('{}<{}> failed and error was not handled, restarting once more'.format(*report_args))
-            return None
+            action = self.inputs.get('on_unhandled_failure', None)
+            action = action.value if action is not None else 'abort'
+
+            if action == 'abort':
+                self.report(f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, aborting')
+                return self.exit_codes.ERROR_UNHANDLED_FAILURE
+            elif action == 'pause':
+                self.report(
+                    f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, pausing for inspection'
+                )
+                self.report(
+                    'If you believe that the issue can be resolved by just retrying the same execution '
+                    '(e.g., in case of a node failure), you can just replay this work chain using '
+                    f'`verdi process play {self.node.pk}`. Otherwise, you can kill the work chain using '
+                    f'`verdi process kill {self.node.pk}`.'
+                )
+                self.pause(f"Paused for user inspection, see: 'verdi process report {self.node.pk}'")
+                return None
+            elif action == 'restart_once':
+                if self.ctx.unhandled_failure:
+                    self.report(
+                        f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure for the second '
+                        'consecutive time, aborting'
+                    )
+                    return self.exit_codes.ERROR_UNHANDLED_FAILURE
+                self.ctx.unhandled_failure = True
+                self.report(
+                    f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, restarting once more'
+                )
+                return None
+            elif action == 'restart_and_pause':
+                if self.ctx.unhandled_failure:
+                    self.report(
+                        f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure for the second '
+                        'consecutive time, pausing for inspection'
+                    )
+                    self.report(
+                        'If you believe that the issue can be resolved by just retrying the same execution '
+                        '(e.g., in case of a node failure), you can just replay this work chain using '
+                        f'`verdi process play {self.node.pk}`. Otherwise, you can kill the work chain using '
+                        f'`verdi process kill {self.node.pk}`.'
+                    )
+                    # reset the unhandled failure flag, so that after replaying, it will
+                    # try again twice for future errors
+                    self.ctx.unhandled_failure = False
+                    self.pause(f"Paused for user inspection, see: 'verdi process report {self.node.pk}'")
+                    return None
+                self.ctx.unhandled_failure = True
+                self.report(
+                    f'{self.ctx.process_name}<{node.pk}> failed with an unhandled failure, restarting once more'
+                )
+                return None
 
         # Here either the process finished successful or at least one handler returned a report so it can no longer be
         # considered to be an unhandled failed process and therefore we reset the flag
diff --git a/tests/engine/processes/workchains/test_restart.py b/tests/engine/processes/workchains/test_restart.py
index bf870292d7..21d23e5eec 100644
--- a/tests/engine/processes/workchains/test_restart.py
+++ b/tests/engine/processes/workchains/test_restart.py
@@ -113,28 +113,38 @@ def test_killed_process(generate_work_chain, generate_calculation_node):
 
 
 @pytest.mark.requires_rmq
-def test_unhandled_failure(generate_work_chain, generate_calculation_node):
-    """Test the unhandled failure mechanism.
-
-    The workchain should be aborted if there are two consecutive failed sub processes that went unhandled. We simulate
-    it by setting `ctx.unhandled_failure` to True and append two failed process nodes in `ctx.children`.
-    """
-    process = generate_work_chain(SomeWorkChain, {})
+@pytest.mark.parametrize('on_unhandled_failure', (None, 'abort', 'pause', 'restart_once', 'restart_and_pause'))
+def test_unhandled_failure(generate_work_chain, generate_calculation_node, on_unhandled_failure):
+    """Test the `on_unhandled_failure` input and behavior."""
+    process = generate_work_chain(SomeWorkChain, {'on_unhandled_failure': on_unhandled_failure})
     process.setup()
     process.ctx.children = [generate_calculation_node(exit_status=100)]
-    assert process.inspect_process() is None
-    assert process.ctx.unhandled_failure is True
+    result = process.inspect_process()
+    if on_unhandled_failure in (None, 'abort'):
+        assert result == engine.BaseRestartWorkChain.exit_codes.ERROR_UNHANDLED_FAILURE
+        return
+    elif on_unhandled_failure == 'pause':
+        assert result is None
+        assert process.paused
+        return
 
+    assert result is None
+    assert process.ctx.unhandled_failure is True
     process.ctx.children.append(generate_calculation_node(exit_status=100))
-    assert (
-        process.inspect_process() == engine.BaseRestartWorkChain.exit_codes.ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE
-    )
+    result = process.inspect_process()
+
+    if on_unhandled_failure == 'restart_once':
+        assert result == engine.BaseRestartWorkChain.exit_codes.ERROR_UNHANDLED_FAILURE
+        return
+    elif on_unhandled_failure == 'restart_and_pause':
+        assert result is None
+        assert process.paused
 
 
 @pytest.mark.requires_rmq
 def test_unhandled_reset_after_success(generate_work_chain, generate_calculation_node):
     """Test `ctx.unhandled_failure` is reset to `False` in `inspect_process` after a successful process."""
-    process = generate_work_chain(SomeWorkChain, {})
+    process = generate_work_chain(SomeWorkChain, {'on_unhandled_failure': orm.Str('restart_once')})
     process.setup()
     process.ctx.children = [generate_calculation_node(exit_status=100)]
     assert process.inspect_process() is None
@@ -148,7 +158,7 @@ def test_unhandled_reset_after_success(generate_work_chain, generate_calculation
 @pytest.mark.requires_rmq
 def test_unhandled_reset_after_handled(generate_work_chain, generate_calculation_node):
     """Test `ctx.unhandled_failure` is reset to `False` in `inspect_process` after a handled failed process."""
-    process = generate_work_chain(SomeWorkChain, {'on_unhandled_failure': orm.Str('restart_once')})
+    process = generate_work_chain(SomeWorkChain, {'on_unhandled_failure': orm.Str('restart_once')})
     process.setup()
     process.ctx.children = [generate_calculation_node(exit_status=300)]
     assert process.inspect_process() is None
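
Reviewer note: the branching this change adds to `inspect_process` amounts to a small decision table over the `on_unhandled_failure` value and the `ctx.unhandled_failure` flag. The sketch below restates that table as a pure function so it can be read and checked without a running AiiDA profile. It is an illustration only, not code from `aiida-core`: `Decision`, `VALID_ACTIONS` and `decide_unhandled_failure` are hypothetical names, and the real method calls `self.pause(...)` and returns an exit code instead of returning a dataclass.

```python
from __future__ import annotations

from dataclasses import dataclass

# The four option names are taken from the diff; the tuple name is hypothetical.
VALID_ACTIONS = ('abort', 'pause', 'restart_once', 'restart_and_pause')


@dataclass(frozen=True)
class Decision:
    """Hypothetical summary of the outcome of one unhandled failure."""

    abort: bool  # the work chain would return ERROR_UNHANDLED_FAILURE
    pause: bool  # the work chain would pause for user inspection
    flag: bool  # value of ``ctx.unhandled_failure`` afterwards


def decide_unhandled_failure(action: str | None, already_failed: bool) -> Decision:
    """Restate the ``on_unhandled_failure`` branches from the diff.

    :param action: value of the input, or ``None`` if it was not provided
    :param action: ``already_failed`` is the current ``ctx.unhandled_failure`` flag
    """
    action = 'abort' if action is None else action
    if action not in VALID_ACTIONS:
        raise ValueError(f'invalid action {action!r}, expected one of {VALID_ACTIONS}')

    if action == 'abort':
        # Abort immediately; the flag is left untouched.
        return Decision(abort=True, pause=False, flag=already_failed)
    if action == 'pause':
        # Pause immediately; the flag is left untouched.
        return Decision(abort=False, pause=True, flag=already_failed)
    if not already_failed:
        # First unhandled failure for both restart modes: restart and set the flag.
        return Decision(abort=False, pause=False, flag=True)
    if action == 'restart_once':
        # Second consecutive unhandled failure: give up.
        return Decision(abort=True, pause=False, flag=True)
    # 'restart_and_pause', second consecutive failure: pause and reset the flag,
    # so that a later failure after replaying gets one more restart.
    return Decision(abort=False, pause=True, flag=False)


if __name__ == '__main__':
    for option in (None, *VALID_ACTIONS):
        print(option, decide_unhandled_failure(option, False), decide_unhandled_failure(option, True))
```

The parametrized `test_unhandled_failure` above exercises the same five cases (`None` plus the four options) against the actual work chain.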