Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/testmaster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
with:
activate-environment: pyleo
environment-file: environment.yml
python-version: "3.11.0"
python-version: "3.12.11"
auto-activate-base: false

- name: Conda list
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<!---[![PyPI](https://img.shields.io/pypi/dm/pyleoclim.svg)](https://pypi.python.org/pypi/Pyleoclim)-->
[![PyPI version](https://badge.fury.io/py/pyleoclim.svg)](https://badge.fury.io/py/pyleoclim)
[![PyPI](https://img.shields.io/badge/python-3.10-yellow.svg)]()
[![PyPI](https://img.shields.io/badge/python-3.12-yellow.svg)]()
[![license](https://img.shields.io/github/license/linkedearth/Pyleoclim_util.svg)]()
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.6999279.svg)](https://doi.org/10.5281/zenodo.6999279)
[![NSF-1541029](https://img.shields.io/badge/NSF-1541029-blue.svg)](https://nsf.gov/awardsearch/showAward?AWD_ID=1541029)
Expand Down
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ channels:
- default
- conda-forge
dependencies:
- python=3.11.0
- python=3.12.11
- cartopy
- numba
- scipy
Expand Down
112 changes: 67 additions & 45 deletions pyleoclim/core/multipleseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@
to such a collection at once (e.g. process a bunch of series in a consistent fashion).
"""

import sys
import multiprocessing as mp

if sys.platform == 'darwin': # macOS
try:
mp.set_start_method('fork', force=True)
except RuntimeError:
pass # Already set



from ..utils import tsutils, plotting, jsonutils
from ..utils import correlation as corrutils

Expand All @@ -29,22 +40,6 @@
from scipy import stats
from statsmodels.multivariate.pca import PCA

import dill
import multiprocessing

# Set `dill` as the pickler for multiprocessing
multiprocessing.set_start_method("spawn", force=True)
multiprocessing.get_context("spawn").reduce = dill.dumps
multiprocessing.get_context("spawn").rebuild = dill.loads

from contextlib import contextmanager

@contextmanager
def _get_process_pool():
ctx = multiprocessing.get_context("spawn")
with ProcessPoolExecutor(mp_context=ctx) as executor:
yield executor


def _run_parallel_spectral(args):
"""Helper function to call Series.spectral in parallel."""
Expand Down Expand Up @@ -1380,33 +1375,64 @@ def spectral(self, method='lomb_scargle', freq=None, settings=None, mute_pbar=Fa

# main function
settings = {} if settings is None else settings.copy()
psd_list =[]

if method in ['wwz','cwt'] and scalogram_list:

# Prepare arguments
if method in ['wwz', 'cwt'] and scalogram_list:
scalogram_list_len = len(scalogram_list.scalogram_list)
series_len = len(self.series_list)

# Prepare arguments for parallel execution

args = [
(s, idx, scalogram_list if scalogram_list_len >= series_len else None, method, settings, freq, freq_kwargs, label, verbose)
(s, idx, scalogram_list if scalogram_list_len >= series_len else None,
method, settings, freq, freq_kwargs, label, verbose)
for idx, s in enumerate(self.series_list)
]
]
else:
args = [
(s, idx, None, method, settings, freq, freq_kwargs, label, verbose)
for idx, s in enumerate(self.series_list)
]


# Parallel processing with ProcessPoolExecutor
with _get_process_pool() as executor:
psd_list = list(tqdm(executor.map(_run_parallel_spectral, args),
total=len(args),
desc='Performing spectral analysis on individual series',
position=0, leave=True, disable=mute_pbar))
]

if method == 'lomb_scargle' and sys.platform == 'darwin':
print('Using serial execution for this method on a Mac platform')
psd_list = []

for idx, s in enumerate(tqdm(self.series_list,
desc='Performing spectral analysis on individual series',
disable=mute_pbar)):
if method in ['wwz', 'cwt'] and scalogram_list:
scalogram_list_len = len(scalogram_list.scalogram_list)
series_len = len(self.series_list)
if scalogram_list_len >= series_len and idx < scalogram_list_len:
psd = s.spectral(
method=method, settings=settings, freq=freq,
freq_kwargs=freq_kwargs, label=label, verbose=verbose,
scalogram=scalogram_list.scalogram_list[idx],
)
else:
psd = s.spectral(
method=method, settings=settings, freq=freq,
freq_kwargs=freq_kwargs, label=label, verbose=verbose,
)
else:
psd = s.spectral(
method=method, settings=settings, freq=freq,
freq_kwargs=freq_kwargs, label=label, verbose=verbose,
)
psd_list.append(psd)

else:
# Parallel processing
with mp.Pool(processes=mp.cpu_count()) as pool:
psd_list = list(tqdm(
pool.imap(_run_parallel_spectral, args),
total=len(args),
desc='Performing spectral analysis on individual series',
disable=mute_pbar
))

return MultiplePSD(psd_list=psd_list)


def wavelet(self, method='cwt', settings={}, freq=None, freq_kwargs=None, verbose=False, mute_pbar=False):
'''Wavelet analysis

Expand Down Expand Up @@ -1490,24 +1516,20 @@ def wavelet(self, method='cwt', settings={}, freq=None, freq_kwargs=None, verbos

settings = {} if settings is None else settings.copy()

# Prepare arguments for parallel execution
# Prepare arguments
args = [
(s, method, settings, freq, freq_kwargs, verbose)
for s in self.series_list
]

# Parallel processing of the wavelet functionality
with _get_process_pool() as executor:
scal_list = list(
tqdm(
executor.map(_run_parallel_wavelet, args),
total=len(args),
desc='Performing wavelet analysis on individual series',
position=0,
leave=True,
disable=mute_pbar,
)
)
# Parallel processing
with mp.Pool(processes=mp.cpu_count()) as pool:
scal_list = list(tqdm(
pool.imap(_run_parallel_wavelet, args),
total=len(args),
desc='Performing wavelet analysis on individual series',
disable=mute_pbar
))

return MultipleScalogram(scalogram_list=scal_list)

Expand Down
11 changes: 10 additions & 1 deletion pyleoclim/tests/test_core_PSD.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import pytest

import pyleoclim as pyleo

import pyleoclim as pyleo

# Tests below
class TestUiPsdPlot:
Expand All @@ -41,3 +41,12 @@ def test_signif_test_t0(self, method, gen_ts):
psd_signif = psd.signif_test(number=10, method=method)
fig, ax = psd_signif.plot()
pyleo.closefig(fig)

# Test specifically for Lomb-Scargle method

def test_signif_t1(self, gen_ts):
"""Test with Lomb-Scargle to make sure parallel processing is working or bypassed correctly"""
ts = gen_ts()
psd = ts.spectral(method="lomb_scargle")
psd_signif = psd.signif_test(number=10)

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ dependencies = [
"pyyaml",
"beautifulsoup4",
"scipy>=1.15.0",
"requests",
"dill",
"requests"
]

[project.optional-dependencies]
Expand Down
Loading