Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
- uses: julia-actions/cache@v1
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
with:
num_threads: 4
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v3
with:
Expand Down
31 changes: 16 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
# ReadWorkWrite.jl

[![CI](https://github.com/jerlich/ReadWorkWrite.jl/actions/workflows/CI.yml/badge.svg)](https://github.com/jerlich/ReadWorkWrite.jl/actions/workflows/CI.yml)

A Julia package for efficient parallel processing pipelines that separate IO-bound operations from CPU-intensive work.

## Overview

ReadWorkWrite.jl implements a pattern where:
- **Read**: Single-threaded IO operations (loading files from disk)
- **Read**: Single-threaded IO (loading files from disk)
- **Work**: Multi-threaded CPU-intensive processing (e.g., MCMC sampling, data analysis)
- **Write**: Single-threaded IO operations (writing to databases, files)
- **Write**: Single-threaded IO (writing to databases, files)

This design prevents threading issues with IO operations, minimizes memory requirements, and maximizes parallelization for computational work.

## Motivation

This design prevents threading issues with IO operations while maximizing parallelization for computational work.
I am a [neuroscientist](https://www.sainsburywellcome.org/web/groups/erlich-lab). In my work, we often need to process data in batches, and these processes are CPU-bound, not IO-bound. For example, we might need to do model comparison on many thousands of neurons.

Other packages, like [Folds.jl](https://github.com/JuliaFolds/Folds.jl) or [ThreadsX.jl](https://github.com/tkf/ThreadsX.jl), are convenient for multi-core or multi-threaded `map`-like functions. But they process an entire iterator together, so if you have thousands of elements the process may be more memory-intensive than necessary.

This package, ReadWorkWrite.jl, takes advantage of Base Channels and Threads to read in data _only as fast as the workers can handle it_.

## Installation

Expand Down Expand Up @@ -56,6 +66,8 @@ readworkwrite(load_data, process_data, save_results, filenames)

For complete working examples, see the `examples/` directory.

For additional usage patterns and advanced features (like early stopping, type inference, and structured data handling), check out `test/runtests.jl`.

## API Reference

### `readworkwrite(readfn, workfn, writefn, data; nworkers=Threads.nthreads(), buf=nworkers+2)`
Expand Down Expand Up @@ -83,6 +95,7 @@ Execute work-write pipeline, skipping the read step.

See the `examples/` directory for complete working examples including MCMC analysis with Turing.jl.


## Key Features

- **Thread Safety**: IO operations remain single-threaded to avoid concurrency issues
Expand All @@ -91,15 +104,3 @@ See the `examples/` directory for complete working examples including MCMC analy
- **Scalable**: Automatically uses available CPU threads for work processing
- **Order Independence**: Handles unordered results from parallel processing

## Design Rationale

Many applications need to:
1. Load data from files/databases (IO-bound, often not thread-safe)
2. Perform expensive computations (CPU-bound, benefits from parallelization)
3. Save results (IO-bound, often requires serialization)

ReadWorkWrite.jl provides a clean abstraction for this common pattern while handling the complexities of thread coordination and backpressure management.

## License

MIT License
95 changes: 68 additions & 27 deletions src/ReadWorkWrite.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,92 @@ Run a pipeline:
Data flows through channels with backpressure, up to `buf` in-flight items.
"""# General form: explicit writer
function readworkwrite(rf, wf, wrf::Function, data;
nworkers=Threads.nthreads(), buf=nworkers+2)
in_ch = Channel{Any}(buf)
out_ch = Channel{Any}(buf)
nworkers=Threads.nthreads(), buf=nworkers+2, T=nothing,
return_on_completion=true)
# Type inference from first element
if T === nothing
first_elem = first(data)
first_read = rf(first_elem)
first_work = wf(first_read)
InType = typeof(first_read)
OutType = typeof(first_work)
in_ch = Channel{InType}(buf)
out_ch = Channel{OutType}(buf)

# Put the already-computed first work item
put!(out_ch, first_work)
remaining_data = Iterators.drop(data, 1)
else
in_ch = Channel{T}(buf)
out_ch = Channel{T}(buf)
remaining_data = data
end

writer = @async begin
for out in out_ch
wrf(out)
end
end

@async begin
for d in data
put!(in_ch, rf(d))
reader = @async begin
try
for d in remaining_data
put!(in_ch, rf(d))
end
catch e
@debug e
if e isa MethodError || e isa TypeError
throw(ArgumentError("Type mismatch detected. Your reader `rf` produces inconsistent types. Try using T=Any for heterogeneous data processing."))
else
rethrow(e)
end
finally
close(in_ch)
end
close(in_ch)
end

@sync begin
for wid in 1:nworkers
Threads.@spawn begin
for item in in_ch
put!(out_ch, wf(item))
end
workers = [Threads.@spawn begin
try
for item in in_ch
put!(out_ch, wf(item))
end
catch e
@debug e
if e isa MethodError || e isa TypeError
throw(ArgumentError("Type mismatch detected in work function. Your pipeline produces inconsistent types. Try using T=Any for heterogeneous data processing."))
else
rethrow(e)
end
end
end for _ in 1:nworkers]

if return_on_completion
try
wait(reader) # Wait for reader to finish
wait.(workers) # Wait for all worker tasks
close(out_ch) # Close output channel after workers finish
wait(writer) # Now writer can finish
catch e
if e isa TaskFailedException
# Unwrap the nested exception for cleaner error messages
rethrow(e.task.exception)
else
rethrow(e)
end
end
return nothing
else
return (;in_ch, out_ch, writer, workers, reader)
end

close(out_ch)
wait(writer)
return nothing
end

# Mutating variant: push into given vector
function readworkwrite(rf, wf, results::Vector, data;
nworkers=Threads.nthreads(), buf=nworkers+2)
readworkwrite(rf, wf, x -> push!(results, x), data;
nworkers=nworkers, buf=buf)
return results
function readworkwrite(rf, wf, results::Vector, data; kwargs...)
readworkwrite(rf, wf, x -> push!(results, x), data; kwargs...)
end

function workwrite(wf, results::Vector, data;
nworkers=Threads.nthreads(), buf=nworkers+2)
readworkwrite((x)->x, wf, x -> push!(results, x), data;
nworkers=nworkers, buf=buf)
return results
function workwrite(wf, results::Vector, data; kwargs...)
readworkwrite((x)->x, wf, x -> push!(results, x), data; kwargs...)
end


Expand Down
95 changes: 95 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ io = IOBuffer()
dummy_write(x) = println(io, JSON.json(x))

@testset "ReadWorkWrite basic test" begin
if Threads.nthreads() == 1
@warn "Tests running with single thread - multithreading behavior not fully tested. Run with julia --threads=4 for complete testing."
end

ReadWorkWrite.readworkwrite(dummy_read, dummy_work, results, 1:3)
# Sort by key since order is not guaranteed with multithreading
sort!(results, by=x->x.key)
Expand All @@ -33,3 +37,94 @@ dummy_write(x) = println(io, JSON.json(x))
@test work_results == expected_work

end

@testset "Type inference and async features" begin
# Test automatic type inference - should infer concrete types
type_results_inferred = NamedTuple{(:key, :data), Tuple{Symbol, Int64}}[]
ReadWorkWrite.readworkwrite(x -> (key=Symbol("item_$x"), data=x), x -> (key=x.key, data=x.data * 2), type_results_inferred, [1, 2, 3])
sort!(type_results_inferred, by=x->x.key)
@test type_results_inferred == [(key=:item_1, data=2), (key=:item_2, data=4), (key=:item_3, data=6)]
@test eltype(type_results_inferred) == NamedTuple{(:key, :data), Tuple{Symbol, Int64}}

# Test manual type specification with Any - should allow mixed types
manual_results_any = Any[]
ReadWorkWrite.readworkwrite(x -> (key=Symbol("item_$x"), data=x), x -> (key=x.key, data=x.data * 2), manual_results_any, [1, 2, 3]; T=Any)
sort!(manual_results_any, by=x->x.key)
@test manual_results_any == [(key=:item_1, data=2), (key=:item_2, data=4), (key=:item_3, data=6)]
@test eltype(manual_results_any) == Any

# Test return_on_completion=false
async_results = Int[]
handles = ReadWorkWrite.readworkwrite(x -> x, x -> x * 2, async_results, [1, 2, 3]; return_on_completion=false)
@test haskey(handles, :in_ch)
@test haskey(handles, :out_ch)
@test haskey(handles, :writer)
@test haskey(handles, :workers)

# Wait for completion
wait.(handles.workers) # workers is now an array
close(handles.out_ch) # need to close output channel
wait(handles.writer)
sort!(async_results)
@test async_results == [2, 4, 6]

# Test error handling for inconsistent types
inconsistent_data = [1, "hello", 3.14]
@test_throws ArgumentError ReadWorkWrite.readworkwrite(x -> x, x -> x, Int[], inconsistent_data)

# But should work with T=Any
any_results = Any[]
ReadWorkWrite.readworkwrite(x -> x, x -> x, any_results, inconsistent_data; T=Any)
sort!(any_results, by=string) # Sort by string representation since mixed types
@test any_results == [1, 3.14, "hello"]
end

@testset "Early Stopping" begin
stop_results = NamedTuple{(:key, :data), Tuple{Symbol, Int64}}[]

# Start processing with one slow item
task = ReadWorkWrite.readworkwrite(
x -> (key=Symbol("item_$x"), data=x),
x -> begin
if x.key == :item_2
sleep(2) # This should be interrupted
end
(key=x.key, data=x.data * 10)
end,
stop_results,
[1, 2, 3];
return_on_completion=false
)

# Give it a moment to start processing
sleep(0.1)

# Stop early - this should interrupt the sleep(2) for :item_2
close(task.out_ch)

# Clean up all tasks properly
try
# Give a moment for tasks to notice the closed channel and exit
sleep(0.1)
# Force cleanup if needed
for worker in task.workers
if !istaskdone(worker)
Base.schedule(worker, InterruptException(), error=true)
end
end
catch
# Ignore cleanup errors
end

# Should have processed :item_1 but not :item_2 (which was sleeping)
# :item_3 might or might not be processed depending on timing
sort!(stop_results, by=x->x.key)

# At minimum, we should have :item_1, and we should NOT have the full set
@test length(stop_results) >= 1
@test length(stop_results) < 3 # Should be stopped before completing all
@test (key=:item_1, data=10) in stop_results

# The sleeping :item_2 should not be completed
@test (key=:item_2, data=20) ∉ stop_results
end
Loading