Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ name: ci

on: [push, pull_request]

env:
FORCE_COLOR: 1

jobs:
pre-commit:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v5

- name: Set up Python 3.13
- name: Set up Python 3.14
uses: actions/setup-python@v5
with:
python-version: '3.13'
python-version: '3.14'
allow-prereleases: true

- name: Install Python dependencies
run: pip install -e .[pre-commit]
Expand All @@ -25,7 +29,7 @@ jobs:

strategy:
matrix:
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13', '3.14']
fail-fast: false

services:
Expand All @@ -37,19 +41,18 @@ jobs:
steps:
- uses: actions/checkout@v5

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
- name: Set up uv
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uv installs python itself

uses: astral-sh/setup-uv@v7
with:
python-version: ${{ matrix.python-version }}
version: 0.9.4
activate-environment: true

- name: Install python dependencies
run: pip install .[tests]
- name: Install dependencies
run: uv sync

- name: Run pytest
run: pytest -s --cov=plumpy tests/

- name: Create xml coverage
run: coverage xml
run: pytest --cov=plumpy tests/

- name: Upload coverage to Codecov
if: github.repository == 'aiidateam/plumpy'
Expand Down
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ repos:
exclude: *exclude_ruff
args: [--fix, --exit-non-zero-on-fix, --show-fixes]

- repo: https://github.com/astral-sh/uv-pre-commit
rev: 0.10.4
hooks:
- id: uv-lock

- repo: local
hooks:
- id: mypy
Expand Down
2 changes: 1 addition & 1 deletion examples/process_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def run(self):

def main():
with kiwipy.connect('amqp://127.0.0.1') as communicator, tempfile.TemporaryDirectory() as tmpdir:
loop = asyncio.get_event_loop()
loop = plumpy.get_or_create_event_loop()
persister = plumpy.PicklePersister(tmpdir)
task_receiver = plumpy.ProcessLauncher(loop=loop, persister=persister)

Expand Down
22 changes: 10 additions & 12 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ classifiers = [
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
'Programming Language :: Python :: 3.14',
]
keywords = ['workflow', 'multithreaded', 'rabbitmq']
requires-python = '>=3.9'
dependencies = [
'greenback~=1.0',
'kiwipy[rmq]~=0.9.0',
'pyyaml~=6.0',
"greenback~=1.0",
"greenlet>=3.2.4", # We need to expicitly set our lowest bond, unfortunatly greenback does not pin the exact version
"kiwipy[rmq]~=0.9.0",
"pyyaml~=6.0",
]

[project.urls]
Home = 'https://github.com/aiidateam/plumpy'
Source = 'https://github.com/aiidateam/plumpy'
Expand All @@ -54,21 +55,17 @@ docs = [
'myst-nb~=1.2.0',
'sphinx~=7.2.0',
'sphinx-book-theme~=1.1.4',
'importlib-metadata~=4.12.0',
]
pre-commit = [
'mypy==1.18.2',
'pre-commit~=3.6',
'types-pyyaml~=6.0'
]
tests = [
'ipykernel==6.12.1',
'pytest~=8.4',
'pytest-asyncio~=0.12,<0.17',
'pytest-cov~=4.1',
'pytest-notebook>=0.8.0',
'shortuuid==1.0.8',
'importlib-resources~=5.2',
'pytest-cov~=7.0',
'shortuuid==1.0.13',
]

[tool.flit.module]
Expand Down Expand Up @@ -138,9 +135,10 @@ module = [
ignore_missing_imports = true

[tool.pytest.ini_options]
minversion = '6.0'
addopts = '--strict-config --strict-markers -ra --cov-report xml --cov-append'
minversion = '7.0'
testpaths = [
'test',
'tests',
]
filterwarnings = []

Expand Down
6 changes: 3 additions & 3 deletions src/plumpy/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import kiwipy

from . import futures
from . import events, futures
from .utils import ensure_coroutine

__all__ = [
Expand Down Expand Up @@ -124,7 +124,7 @@ def wrap_communicator(
return LoopCommunicator(communicator, loop)


class LoopCommunicator(kiwipy.Communicator): # type: ignore
class LoopCommunicator(kiwipy.Communicator): # type: ignore[misc]
"""Wrapper around a `kiwipy.Communicator` that schedules any subscriber messages on a given event loop."""

def __init__(self, communicator: kiwipy.Communicator, loop: Optional[asyncio.AbstractEventLoop] = None):
Expand All @@ -136,7 +136,7 @@ def __init__(self, communicator: kiwipy.Communicator, loop: Optional[asyncio.Abs
assert communicator is not None

self._communicator = communicator
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
self._loop = loop or events.get_or_create_event_loop()

def loop(self) -> asyncio.AbstractEventLoop:
return self._loop
Expand Down
66 changes: 19 additions & 47 deletions src/plumpy/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,32 @@

import asyncio
import sys
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Sequence
from typing import TYPE_CHECKING, Any, Callable, Dict, Sequence

__all__ = [
'PlumpyEventLoopPolicy',
'get_event_loop',
'new_event_loop',
'reset_event_loop_policy',
'set_event_loop',
'set_event_loop_policy',
]
__all__: list[str] = ['get_or_create_event_loop']

if TYPE_CHECKING:
from .processes import Process

get_event_loop = asyncio.get_event_loop


def set_event_loop(*args: Any, **kwargs: Any) -> None:
raise NotImplementedError('this method is not implemented because `plumpy` uses a single cached event loop')


def new_event_loop(*args: Any, **kwargs: Any) -> asyncio.AbstractEventLoop:
raise NotImplementedError('this method is not implemented because `plumpy` uses a single cached event loop')


class PlumpyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
"""Custom event policy that always returns the same cached event loop.

Reentrancy for nested process execution is handled via greenback bridging
in Process.execute() rather than by patching the event loop.
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file has the most important changes

"""Get the running event loop, or the current set loop, or create and set a new one.
Note: aiida should never call on asyncio.get_event_loop() directly.
"""

_loop: Optional[asyncio.AbstractEventLoop] = None

def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Return the cached event loop."""
if self._loop is None:
self._loop = self.new_event_loop()
self.set_event_loop(self._loop)

return self._loop


def set_event_loop_policy() -> None:
"""Enable plumpy's event loop policy that caches a single event loop."""
asyncio.set_event_loop_policy(PlumpyEventLoopPolicy())
# Need to call the following explicitly for `asyncio.get_event_loop` to start calling the method of the new policy
# in case an loop is already active.
asyncio.get_event_loop_policy().get_event_loop()


def reset_event_loop_policy() -> None:
"""Reset the event loop policy to the default."""
asyncio.set_event_loop_policy(None)
try:
return asyncio.get_running_loop()
except RuntimeError:
pass
try:
# See issue https://github.com/aiidateam/plumpy/issues/336
loop = asyncio.get_event_loop()
if not loop.is_closed():
return loop
except RuntimeError:
pass
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop


class ProcessCallback:
Expand Down
4 changes: 3 additions & 1 deletion src/plumpy/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import kiwipy

from . import events

__all__ = ['CancelledError', 'Future', 'chain', 'copy_future', 'create_task', 'gather']

CancelledError = kiwipy.CancelledError
Expand Down Expand Up @@ -65,7 +67,7 @@ def create_task(coro: Callable[[], Awaitable[Any]], loop: Optional[asyncio.Abstr
:return: the future representing the outcome of the coroutine

"""
loop = loop or asyncio.get_event_loop()
loop = loop or events.get_or_create_event_loop()

future = loop.create_future()

Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import yaml

from . import futures, loaders, utils
from . import events, futures, loaders, utils
from .base.utils import call_with_super_check, super_check
from .utils import PID_TYPE, SAVED_STATE_TYPE

Expand Down Expand Up @@ -636,7 +636,7 @@ def recreate_from(cls, saved_state: SAVED_STATE_TYPE, load_context: Optional[Loa
try:
loop = load_context.loop
except AttributeError:
loop = asyncio.get_event_loop()
loop = events.get_or_create_event_loop()

state = saved_state['_state']

Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def __init__(
# Don't allow the spec to be changed anymore
self.spec().seal()

self._loop = loop if loop is not None else asyncio.get_event_loop()
self._loop = loop if loop is not None else events.get_or_create_event_loop()

self._setup_event_hooks()

Expand Down Expand Up @@ -655,7 +655,7 @@ def load_instance_state(self, saved_state: SAVED_STATE_TYPE, load_context: persi
if 'loop' in load_context:
self._loop = load_context.loop
else:
self._loop = asyncio.get_event_loop()
self._loop = events.get_or_create_event_loop()

self._state: process_states.State = self.recreate_state(saved_state['_state'])

Expand Down
5 changes: 2 additions & 3 deletions src/plumpy/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import asyncio
import functools
import importlib
import inspect
Expand Down Expand Up @@ -195,10 +194,10 @@ def ensure_coroutine(coro_or_fn: Any) -> Callable[..., Awaitable[Any]]:
:param fct: the function
:returns: the coroutine
"""
if asyncio.iscoroutinefunction(coro_or_fn):
if inspect.iscoroutinefunction(coro_or_fn):
return coro_or_fn

if asyncio.iscoroutinefunction(coro_or_fn.__call__):
if inspect.iscoroutinefunction(coro_or_fn.__call__):
return coro_or_fn

if callable(coro_or_fn):
Expand Down
9 changes: 0 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +0,0 @@
# -*- coding: utf-8 -*-
import pytest


@pytest.fixture(scope='session')
def set_event_loop_policy():
from plumpy import set_event_loop_policy

set_event_loop_policy()
40 changes: 0 additions & 40 deletions tests/notebooks/get_event_loop.ipynb

This file was deleted.

Loading
Loading