From 47f742969c2f6208278e78a635e5d4fe89601b17 Mon Sep 17 00:00:00 2001 From: Charlie Date: Thu, 5 Feb 2026 15:56:39 -0500 Subject: [PATCH] Fix pipeline workers for non-picklable callbacks --- src/heyfastqlib/pipelines.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/heyfastqlib/pipelines.py b/src/heyfastqlib/pipelines.py index fdfa276..7b1beb3 100644 --- a/src/heyfastqlib/pipelines.py +++ b/src/heyfastqlib/pipelines.py @@ -1,5 +1,5 @@ from itertools import islice -from multiprocessing import Pool +from multiprocessing.pool import ThreadPool from typing import Callable, Iterable, Iterator from .read import count_bases, R, ReadPipe @@ -82,8 +82,16 @@ def filter_reads( if chunk_size < 1: raise ValueError("chunk_size must be at least 1") + if threads == 1: + for chunk in _chunk_reads(rs, chunk_size): + out_chunk, chunk_counter = _filter_worker((chunk, f, kwargs)) + _merge_counters(counter, chunk_counter) + for r in out_chunk: + yield r + return + task_iter = ((chunk, f, kwargs) for chunk in _chunk_reads(rs, chunk_size)) - with Pool(processes=threads) as pool: + with ThreadPool(processes=threads) as pool: for out_chunk, chunk_counter in pool.imap( _filter_worker, task_iter, chunksize=1 ): @@ -109,8 +117,16 @@ def map_reads( if chunk_size < 1: raise ValueError("chunk_size must be at least 1") + if threads == 1: + for chunk in _chunk_reads(rs, chunk_size): + out_chunk, chunk_counter = _map_worker((chunk, f, kwargs)) + _merge_counters(counter, chunk_counter) + for r in out_chunk: + yield r + return + task_iter = ((chunk, f, kwargs) for chunk in _chunk_reads(rs, chunk_size)) - with Pool(processes=threads) as pool: + with ThreadPool(processes=threads) as pool: for out_chunk, chunk_counter in pool.imap(_map_worker, task_iter, chunksize=1): _merge_counters(counter, chunk_counter) for r in out_chunk: