Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/heyfastqlib/pipelines.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from itertools import islice
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from typing import Callable, Iterable, Iterator

from .read import count_bases, R, ReadPipe
Expand Down Expand Up @@ -82,8 +82,16 @@ def filter_reads(
if chunk_size < 1:
raise ValueError("chunk_size must be at least 1")

if threads == 1:
for chunk in _chunk_reads(rs, chunk_size):
out_chunk, chunk_counter = _filter_worker((chunk, f, kwargs))
_merge_counters(counter, chunk_counter)
for r in out_chunk:
yield r
return

task_iter = ((chunk, f, kwargs) for chunk in _chunk_reads(rs, chunk_size))
with Pool(processes=threads) as pool:
with ThreadPool(processes=threads) as pool:
for out_chunk, chunk_counter in pool.imap(
_filter_worker, task_iter, chunksize=1
):
Expand All @@ -109,8 +117,16 @@ def map_reads(
if chunk_size < 1:
raise ValueError("chunk_size must be at least 1")

if threads == 1:
for chunk in _chunk_reads(rs, chunk_size):
out_chunk, chunk_counter = _map_worker((chunk, f, kwargs))
_merge_counters(counter, chunk_counter)
for r in out_chunk:
yield r
return

task_iter = ((chunk, f, kwargs) for chunk in _chunk_reads(rs, chunk_size))
with Pool(processes=threads) as pool:
with ThreadPool(processes=threads) as pool:
for out_chunk, chunk_counter in pool.imap(_map_worker, task_iter, chunksize=1):
_merge_counters(counter, chunk_counter)
for r in out_chunk:
Expand Down
Loading