diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index ad5b145..c1348e8 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -29,6 +29,8 @@ jobs: - uses: julia-actions/cache@v1 - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 + with: + num_threads: 4 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v3 with: diff --git a/README.md b/README.md index b407a3c..19958e3 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,25 @@ # ReadWorkWrite.jl +[![CI](https://github.com/jerlich/ReadWorkWrite.jl/actions/workflows/CI.yml/badge.svg)](https://github.com/jerlich/ReadWorkWrite.jl/actions/workflows/CI.yml) + A Julia package for efficient parallel processing pipelines that separates IO-bound operations from CPU-intensive work. ## Overview ReadWorkWrite.jl implements a pattern where: -- **Read**: Single-threaded IO operations (loading files from disk) +- **Read**: Single-threaded IO (loading files from disk) - **Work**: Multi-threaded CPU-intensive processing (e.g., MCMC sampling, data analysis) -- **Write**: Single-threaded IO operations (writing to databases, files) +- **Write**: Single-threaded IO (writing to databases, files) + +This design prevents threading issues with IO operations and minimizes memory requirements, while maximizing parallelization for computational work. + +## Motivation -This design prevents threading issues with IO operations while maximizing parallelization for computational work. +I am a [neuroscientist](https://www.sainsburywellcome.org/web/groups/erlich-lab). In my work, we often need to process data in batches and these processes are CPU bound, not IO bound. For example, we might need to do model comparison on many thousands of neurons. + +Other packages, like [Folds.jl](https://github.com/JuliaFolds/Folds.jl) or [ThreadsX.jl](https://github.com/tkf/ThreadsX.jl) are convenient for multi-core or multi-threaded `map` like functions. 
But they process an entire iterator together so if you have thousands of elements the process may be more memory intensive than is necessary. + +This package, ReadWorkWrite.jl, takes advantage of Base Channels and Threads to read in data _only as fast as the workers can handle them_. ## Installation @@ -56,6 +66,8 @@ readworkwrite(load_data, process_data, save_results, filenames) For complete working examples, see the `examples/` directory. +For additional usage patterns and advanced features (like early stopping, type inference, and structured data handling), check out `test/runtests.jl`. + ## API Reference ### `readworkwrite(readfn, workfn, writefn, data; nworkers=Threads.nthreads(), buf=nworkers+2)` @@ -83,6 +95,7 @@ Execute work-write pipeline, skipping the read step. See the `examples/` directory for complete working examples including MCMC analysis with Turing.jl. + ## Key Features - **Thread Safety**: IO operations remain single-threaded to avoid concurrency issues @@ -91,15 +104,3 @@ See the `examples/` directory for complete working examples including MCMC analy - **Scalable**: Automatically uses available CPU threads for work processing - **Order Independence**: Handles unordered results from parallel processing -## Design Rationale - -Many applications need to: -1. Load data from files/databases (IO-bound, often not thread-safe) -2. Perform expensive computations (CPU-bound, benefits from parallelization) -3. Save results (IO-bound, often requires serialization) - -ReadWorkWrite.jl provides a clean abstraction for this common pattern while handling the complexities of thread coordination and backpressure management. - -## License - -MIT License \ No newline at end of file diff --git a/src/ReadWorkWrite.jl b/src/ReadWorkWrite.jl index 99dd200..a895425 100644 --- a/src/ReadWorkWrite.jl +++ b/src/ReadWorkWrite.jl @@ -12,9 +12,26 @@ Run a pipeline: Data flows through channels with backpressure, up to `buf` in-flight items. 
"""# General form: explicit writer function readworkwrite(rf, wf, wrf::Function, data; - nworkers=Threads.nthreads(), buf=nworkers+2) - in_ch = Channel{Any}(buf) - out_ch = Channel{Any}(buf) + nworkers=Threads.nthreads(), buf=nworkers+2, T=nothing, + return_on_completion=true) + # Type inference from first element + if T === nothing + first_elem = first(data) + first_read = rf(first_elem) + first_work = wf(first_read) + InType = typeof(first_read) + OutType = typeof(first_work) + in_ch = Channel{InType}(buf) + out_ch = Channel{OutType}(buf) + + # Put the already-computed first work item + put!(out_ch, first_work) + remaining_data = Iterators.drop(data, 1) + else + in_ch = Channel{T}(buf) + out_ch = Channel{T}(buf) + remaining_data = data + end writer = @async begin for out in out_ch @@ -22,41 +39,65 @@ function readworkwrite(rf, wf, wrf::Function, data; end end - @async begin - for d in data - put!(in_ch, rf(d)) + reader = @async begin + try + for d in remaining_data + put!(in_ch, rf(d)) + end + catch e + @debug e + if e isa MethodError || e isa TypeError + throw(ArgumentError("Type mismatch detected. Your reader `rf` produces inconsistent types. Try using T=Any for heterogeneous data processing.")) + else + rethrow(e) + end + finally + close(in_ch) end - close(in_ch) end - @sync begin - for wid in 1:nworkers - Threads.@spawn begin - for item in in_ch - put!(out_ch, wf(item)) - end + workers = [Threads.@spawn begin + try + for item in in_ch + put!(out_ch, wf(item)) + end + catch e + @debug e + if e isa MethodError || e isa TypeError + throw(ArgumentError("Type mismatch detected in work function. Your pipeline produces inconsistent types. 
Try using T=Any for heterogeneous data processing.")) + else + rethrow(e) end end + end for _ in 1:nworkers] + + if return_on_completion + try + wait(reader) # Wait for reader to finish + wait.(workers) # Wait for all worker tasks + close(out_ch) # Close output channel after workers finish + wait(writer) # Now writer can finish + catch e + if e isa TaskFailedException + # Unwrap the nested exception for cleaner error messages + rethrow(e.task.exception) + else + rethrow(e) + end + end + return nothing + else + return (;in_ch, out_ch, writer, workers, reader) end - - close(out_ch) - wait(writer) - return nothing end # Mutating variant: push into given vector -function readworkwrite(rf, wf, results::Vector, data; - nworkers=Threads.nthreads(), buf=nworkers+2) - readworkwrite(rf, wf, x -> push!(results, x), data; - nworkers=nworkers, buf=buf) - return results +function readworkwrite(rf, wf, results::Vector, data; kwargs...) + readworkwrite(rf, wf, x -> push!(results, x), data; kwargs...) end -function workwrite(wf, results::Vector, data; - nworkers=Threads.nthreads(), buf=nworkers+2) - readworkwrite((x)->x, wf, x -> push!(results, x), data; - nworkers=nworkers, buf=buf) - return results +function workwrite(wf, results::Vector, data; kwargs...) + readworkwrite((x)->x, wf, x -> push!(results, x), data; kwargs...) end diff --git a/test/runtests.jl b/test/runtests.jl index cd2f399..278b851 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,6 +11,10 @@ io = IOBuffer() dummy_write(x) = println(io, JSON.json(x)) @testset "ReadWorkWrite basic test" begin + if Threads.nthreads() == 1 + @warn "Tests running with single thread - multithreading behavior not fully tested. Run with julia --threads=4 for complete testing." 
+ end + ReadWorkWrite.readworkwrite(dummy_read, dummy_work, results, 1:3) # Sort by key since order is not guaranteed with multithreading sort!(results, by=x->x.key) @@ -33,3 +37,94 @@ dummy_write(x) = println(io, JSON.json(x)) @test work_results == expected_work end + +@testset "Type inference and async features" begin + # Test automatic type inference - should infer concrete types + type_results_inferred = NamedTuple{(:key, :data), Tuple{Symbol, Int64}}[] + ReadWorkWrite.readworkwrite(x -> (key=Symbol("item_$x"), data=x), x -> (key=x.key, data=x.data * 2), type_results_inferred, [1, 2, 3]) + sort!(type_results_inferred, by=x->x.key) + @test type_results_inferred == [(key=:item_1, data=2), (key=:item_2, data=4), (key=:item_3, data=6)] + @test eltype(type_results_inferred) == NamedTuple{(:key, :data), Tuple{Symbol, Int64}} + + # Test manual type specification with Any - should allow mixed types + manual_results_any = Any[] + ReadWorkWrite.readworkwrite(x -> (key=Symbol("item_$x"), data=x), x -> (key=x.key, data=x.data * 2), manual_results_any, [1, 2, 3]; T=Any) + sort!(manual_results_any, by=x->x.key) + @test manual_results_any == [(key=:item_1, data=2), (key=:item_2, data=4), (key=:item_3, data=6)] + @test eltype(manual_results_any) == Any + + # Test return_on_completion=false + async_results = Int[] + handles = ReadWorkWrite.readworkwrite(x -> x, x -> x * 2, async_results, [1, 2, 3]; return_on_completion=false) + @test haskey(handles, :in_ch) + @test haskey(handles, :out_ch) + @test haskey(handles, :writer) + @test haskey(handles, :workers) + + # Wait for completion + wait.(handles.workers) # workers is now an array + close(handles.out_ch) # need to close output channel + wait(handles.writer) + sort!(async_results) + @test async_results == [2, 4, 6] + + # Test error handling for inconsistent types + inconsistent_data = [1, "hello", 3.14] + @test_throws ArgumentError ReadWorkWrite.readworkwrite(x -> x, x -> x, Int[], inconsistent_data) + + # But should work with 
T=Any + any_results = Any[] + ReadWorkWrite.readworkwrite(x -> x, x -> x, any_results, inconsistent_data; T=Any) + sort!(any_results, by=string) # Sort by string representation since mixed types + @test any_results == [1, 3.14, "hello"] +end + +@testset "Early Stopping" begin + stop_results = NamedTuple{(:key, :data), Tuple{Symbol, Int64}}[] + + # Start processing with one slow item + task = ReadWorkWrite.readworkwrite( + x -> (key=Symbol("item_$x"), data=x), + x -> begin + if x.key == :item_2 + sleep(2) # This should be interrupted + end + (key=x.key, data=x.data * 10) + end, + stop_results, + [1, 2, 3]; + return_on_completion=false + ) + + # Give it a moment to start processing + sleep(0.1) + + # Stop early - this should interrupt the sleep(2) for :item_2 + close(task.out_ch) + + # Clean up all tasks properly + try + # Give a moment for tasks to notice the closed channel and exit + sleep(0.1) + # Force cleanup if needed + for worker in task.workers + if !istaskdone(worker) + Base.schedule(worker, InterruptException(), error=true) + end + end + catch + # Ignore cleanup errors + end + + # Should have processed :item_1 but not :item_2 (which was sleeping) + # :item_3 might or might not be processed depending on timing + sort!(stop_results, by=x->x.key) + + # At minimum, we should have :item_1, and we should NOT have the full set + @test length(stop_results) >= 1 + @test length(stop_results) < 3 # Should be stopped before completing all + @test (key=:item_1, data=10) in stop_results + + # The sleeping :item_2 should not be completed + @test (key=:item_2, data=20) ∉ stop_results +end