From 53b52ab65212fd77655453d0f585faf91715a5d6 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Sun, 5 Oct 2025 00:02:52 +0100 Subject: [PATCH 1/9] trigger CI checks for PR --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b407a3c..d14884f 100644 --- a/README.md +++ b/README.md @@ -102,4 +102,4 @@ ReadWorkWrite.jl provides a clean abstraction for this common pattern while hand ## License -MIT License \ No newline at end of file +MIT License# Test From 543a6c3084e0a0c01317ccb371ee46175c41606e Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Mon, 6 Oct 2025 00:38:40 +0100 Subject: [PATCH 2/9] infer channel type instead of Any --- src/ReadWorkWrite.jl | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/ReadWorkWrite.jl b/src/ReadWorkWrite.jl index 99dd200..111bd4a 100644 --- a/src/ReadWorkWrite.jl +++ b/src/ReadWorkWrite.jl @@ -12,9 +12,25 @@ Run a pipeline: Data flows through channels with backpressure, up to `buf` in-flight items. """# General form: explicit writer function readworkwrite(rf, wf, wrf::Function, data; - nworkers=Threads.nthreads(), buf=nworkers+2) - in_ch = Channel{Any}(buf) - out_ch = Channel{Any}(buf) + nworkers=Threads.nthreads(), buf=nworkers+2, T=nothing) + # Type inference from first element + if T === nothing + first_elem = first(data) + first_read = rf(first_elem) + first_work = wf(first_read) + InType = typeof(first_read) + OutType = typeof(first_work) + in_ch = Channel{InType}(buf) + out_ch = Channel{OutType}(buf) + + # Put the already-computed first work item + put!(out_ch, first_work) + remaining_data = Iterators.drop(data, 1) + else + in_ch = Channel{T}(buf) + out_ch = Channel{T}(buf) + remaining_data = data + end writer = @async begin for out in out_ch @@ -23,7 +39,7 @@ function readworkwrite(rf, wf, wrf::Function, data; end @async begin - for d in data + for d in remaining_data put!(in_ch, rf(d)) end close(in_ch) @@ -45,17 +61,13 @@ function readworkwrite(rf, wf, wrf::Function, data; end # Mutating variant: push into given vector -function readworkwrite(rf, wf, results::Vector, data; - nworkers=Threads.nthreads(), buf=nworkers+2) - readworkwrite(rf, wf, x -> push!(results, x), data; - nworkers=nworkers, buf=buf) +function readworkwrite(rf, wf, results::Vector, data; kwargs...) + readworkwrite(rf, wf, x -> push!(results, x), data; kwargs...) return results end -function workwrite(wf, results::Vector, data; - nworkers=Threads.nthreads(), buf=nworkers+2) - readworkwrite((x)->x, wf, x -> push!(results, x), data; - nworkers=nworkers, buf=buf) +function workwrite(wf, results::Vector, data; kwargs...) + readworkwrite((x)->x, wf, x -> push!(results, x), data; kwargs...) return results end From c42494e6aacd2c379683c6fe5d627227318c7f82 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Mon, 6 Oct 2025 01:01:12 +0100 Subject: [PATCH 3/9] task management --- src/ReadWorkWrite.jl | 48 ++++++++++++++++++++++++++++++-------------- test/runtests.jl | 34 +++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 15 deletions(-) diff --git a/src/ReadWorkWrite.jl b/src/ReadWorkWrite.jl index 111bd4a..ac54639 100644 --- a/src/ReadWorkWrite.jl +++ b/src/ReadWorkWrite.jl @@ -12,7 +12,8 @@ Run a pipeline: Data flows through channels with backpressure, up to `buf` in-flight items. """# General form: explicit writer function readworkwrite(rf, wf, wrf::Function, data; - nworkers=Threads.nthreads(), buf=nworkers+2, T=nothing) + nworkers=Threads.nthreads(), buf=nworkers+2, T=nothing, + return_on_completion=true) # Type inference from first element if T === nothing first_elem = first(data) @@ -38,26 +39,43 @@ function readworkwrite(rf, wf, wrf::Function, data; end end - @async begin - for d in remaining_data - put!(in_ch, rf(d)) + reader = @async begin + try + for d in remaining_data + put!(in_ch, rf(d)) + end + catch e + if e isa MethodError || e isa TypeError + throw(ArgumentError("Type mismatch detected. Your reader `rf` produces inconsistent types. Try using T=Any for heterogeneous data processing.")) + else + rethrow(e) + end + finally + close(in_ch) end - close(in_ch) end - @sync begin - for wid in 1:nworkers - Threads.@spawn begin - for item in in_ch - put!(out_ch, wf(item)) - end + workers = [Threads.@spawn begin + try + for item in in_ch + put!(out_ch, wf(item)) + end + catch e + if e isa MethodError || e isa TypeError + throw(ArgumentError("Type mismatch detected in work function. Your pipeline produces inconsistent types. Try using T=Any for heterogeneous data processing.")) + else + rethrow(e) end end + end for _ in 1:nworkers] + + if return_on_completion + wait.(workers) # Wait for all worker tasks + wait(writer) + return nothing + else + return (;in_ch, out_ch, writer, workers, reader) end - - close(out_ch) - wait(writer) - return nothing end # Mutating variant: push into given vector diff --git a/test/runtests.jl b/test/runtests.jl index cd2f399..3b4da80 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -33,3 +33,37 @@ dummy_write(x) = println(io, JSON.json(x)) @test work_results == expected_work end + +@testset "Type inference and async features" begin + # Test automatic type inference + type_results = Int[] + ReadWorkWrite.readworkwrite(x -> x, x -> x * 2, type_results, [1, 2, 3]) + @test type_results == [2, 4, 6] + + # Test manual type specification + manual_results = Any[] + ReadWorkWrite.readworkwrite(x -> x, x -> x * 2, manual_results, [1, 2, 3]; T=Any) + @test manual_results == [2, 4, 6] + + # Test return_on_completion=false + async_results = Int[] + handles = ReadWorkWrite.readworkwrite(x -> x, x -> x * 2, async_results, [1, 2, 3]; return_on_completion=false) + @test haskey(handles, :in_ch) + @test haskey(handles, :out_ch) + @test haskey(handles, :writer) + @test haskey(handles, :workers) + + # Wait for completion + wait(handles.workers) + wait(handles.writer) + @test async_results == [2, 4, 6] + + # Test error handling for inconsistent types + inconsistent_data = [1, "hello", 3.14] + @test_throws ArgumentError ReadWorkWrite.readworkwrite(x -> x, x -> x, Int[], inconsistent_data) + + # But should work with T=Any + any_results = Any[] + ReadWorkWrite.readworkwrite(x -> x, x -> x, any_results, inconsistent_data; T=Any) + @test any_results == [1, "hello", 3.14] +end From 64e4a8f316aafb1067da2e2acc29ab57dfaa6420 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Mon, 6 Oct 2025 01:03:00 +0100 Subject: [PATCH 4/9] fix hang --- src/ReadWorkWrite.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ReadWorkWrite.jl b/src/ReadWorkWrite.jl index ac54639..96cd059 100644 --- a/src/ReadWorkWrite.jl +++ b/src/ReadWorkWrite.jl @@ -71,7 +71,8 @@ function readworkwrite(rf, wf, wrf::Function, data; if return_on_completion wait.(workers) # Wait for all worker tasks - wait(writer) + close(out_ch) # Close output channel after workers finish + wait(writer) # Now writer can finish return nothing else return (;in_ch, out_ch, writer, workers, reader) From d39b2b8b178076aeb09ffd5266727052a44ce884 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Mon, 6 Oct 2025 01:35:35 +0100 Subject: [PATCH 5/9] better tests --- .github/workflows/CI.yml | 2 ++ src/ReadWorkWrite.jl | 20 +++++++++++++++----- test/runtests.jl | 31 +++++++++++++++++++++---------- 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index ad5b145..c1348e8 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -29,6 +29,8 @@ jobs: - uses: julia-actions/cache@v1 - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 + with: + num_threads: 4 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v3 with: diff --git a/src/ReadWorkWrite.jl b/src/ReadWorkWrite.jl index 96cd059..a895425 100644 --- a/src/ReadWorkWrite.jl +++ b/src/ReadWorkWrite.jl @@ -45,6 +45,7 @@ function readworkwrite(rf, wf, wrf::Function, data; put!(in_ch, rf(d)) end catch e + @debug e if e isa MethodError || e isa TypeError throw(ArgumentError("Type mismatch detected. Your reader `rf` produces inconsistent types. Try using T=Any for heterogeneous data processing.")) else @@ -61,6 +62,7 @@ function readworkwrite(rf, wf, wrf::Function, data; put!(out_ch, wf(item)) end catch e + @debug e if e isa MethodError || e isa TypeError throw(ArgumentError("Type mismatch detected in work function. Your pipeline produces inconsistent types. Try using T=Any for heterogeneous data processing.")) else @@ -70,9 +72,19 @@ function readworkwrite(rf, wf, wrf::Function, data; end for _ in 1:nworkers] if return_on_completion - wait.(workers) # Wait for all worker tasks - close(out_ch) # Close output channel after workers finish - wait(writer) # Now writer can finish + try + wait(reader) # Wait for reader to finish + wait.(workers) # Wait for all worker tasks + close(out_ch) # Close output channel after workers finish + wait(writer) # Now writer can finish + catch e + if e isa TaskFailedException + # Unwrap the nested exception for cleaner error messages + rethrow(e.task.exception) + else + rethrow(e) + end + end return nothing else return (;in_ch, out_ch, writer, workers, reader) @@ -82,12 +94,10 @@ end # Mutating variant: push into given vector function readworkwrite(rf, wf, results::Vector, data; kwargs...) readworkwrite(rf, wf, x -> push!(results, x), data; kwargs...) - return results end function workwrite(wf, results::Vector, data; kwargs...) readworkwrite((x)->x, wf, x -> push!(results, x), data; kwargs...) - return results end diff --git a/test/runtests.jl b/test/runtests.jl index 3b4da80..d4d7a4b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,6 +11,10 @@ io = IOBuffer() dummy_write(x) = println(io, JSON.json(x)) @testset "ReadWorkWrite basic test" begin + if Threads.nthreads() == 1 + @warn "Tests running with single thread - multithreading behavior not fully tested. Run with julia --threads=4 for complete testing." + end + ReadWorkWrite.readworkwrite(dummy_read, dummy_work, results, 1:3) # Sort by key since order is not guaranteed with multithreading sort!(results, by=x->x.key) @@ -35,15 +39,19 @@ dummy_write(x) = println(io, JSON.json(x)) end @testset "Type inference and async features" begin - # Test automatic type inference - type_results = Int[] - ReadWorkWrite.readworkwrite(x -> x, x -> x * 2, type_results, [1, 2, 3]) - @test type_results == [2, 4, 6] + # Test automatic type inference - should infer concrete types + type_results_inferred = NamedTuple{(:key, :data), Tuple{Symbol, Int64}}[] + ReadWorkWrite.readworkwrite(x -> (key=Symbol("item_$x"), data=x), x -> (key=x.key, data=x.data * 2), type_results_inferred, [1, 2, 3]) + sort!(type_results_inferred, by=x->x.key) + @test type_results_inferred == [(key=:item_1, data=2), (key=:item_2, data=4), (key=:item_3, data=6)] + @test eltype(type_results_inferred) == NamedTuple{(:key, :data), Tuple{Symbol, Int64}} - # Test manual type specification - manual_results = Any[] - ReadWorkWrite.readworkwrite(x -> x, x -> x * 2, manual_results, [1, 2, 3]; T=Any) - @test manual_results == [2, 4, 6] + # Test manual type specification with Any - should allow mixed types + manual_results_any = Any[] + ReadWorkWrite.readworkwrite(x -> (key=Symbol("item_$x"), data=x), x -> (key=x.key, data=x.data * 2), manual_results_any, [1, 2, 3]; T=Any) + sort!(manual_results_any, by=x->x.key) + @test manual_results_any == [(key=:item_1, data=2), (key=:item_2, data=4), (key=:item_3, data=6)] + @test eltype(manual_results_any) == Any # Test return_on_completion=false async_results = Int[] @@ -54,8 +62,10 @@ end @test haskey(handles, :workers) # Wait for completion - wait(handles.workers) + wait.(handles.workers) # workers is now an array + close(handles.out_ch) # need to close output channel wait(handles.writer) + sort!(async_results) @test async_results == [2, 4, 6] # Test error handling for inconsistent types @@ -65,5 +75,6 @@ end # But should work with T=Any any_results = Any[] ReadWorkWrite.readworkwrite(x -> x, x -> x, any_results, inconsistent_data; T=Any) - @test any_results == [1, "hello", 3.14] + sort!(any_results, by=string) # Sort by string representation since mixed types + @test any_results == [1, 3.14, "hello"] end From ae5a0e16ab032085431b58dd4beb64f3e600daf1 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Mon, 6 Oct 2025 01:47:04 +0100 Subject: [PATCH 6/9] more tests --- README.md | 2 ++ test/runtests.jl | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/README.md b/README.md index d14884f..8dbc158 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,8 @@ Execute work-write pipeline, skipping the read step. See the `examples/` directory for complete working examples including MCMC analysis with Turing.jl. +For additional usage patterns and advanced features (like early stopping, type inference, and structured data handling), check out `test/runtests.jl`. + ## Key Features - **Thread Safety**: IO operations remain single-threaded to avoid concurrency issues diff --git a/test/runtests.jl b/test/runtests.jl index d4d7a4b..278b851 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -78,3 +78,53 @@ end sort!(any_results, by=string) # Sort by string representation since mixed types @test any_results == [1, 3.14, "hello"] end + +@testset "Early Stopping" begin + stop_results = NamedTuple{(:key, :data), Tuple{Symbol, Int64}}[] + + # Start processing with one slow item + task = ReadWorkWrite.readworkwrite( + x -> (key=Symbol("item_$x"), data=x), + x -> begin + if x.key == :item_2 + sleep(2) # This should be interrupted + end + (key=x.key, data=x.data * 10) + end, + stop_results, + [1, 2, 3]; + return_on_completion=false + ) + + # Give it a moment to start processing + sleep(0.1) + + # Stop early - this should interrupt the sleep(2) for :item_2 + close(task.out_ch) + + # Clean up all tasks properly + try + # Give a moment for tasks to notice the closed channel and exit + sleep(0.1) + # Force cleanup if needed + for worker in task.workers + if !istaskdone(worker) + Base.schedule(worker, InterruptException(), error=true) + end + end + catch + # Ignore cleanup errors + end + + # Should have processed :item_1 but not :item_2 (which was sleeping) + # :item_3 might or might not be processed depending on timing + sort!(stop_results, by=x->x.key) + + # At minimum, we should have :item_1, and we should NOT have the full set + @test length(stop_results) >= 1 + @test length(stop_results) < 3 # Should be stopped before completing all + @test (key=:item_1, data=10) in stop_results + + # The sleeping :item_2 should not be completed + @test (key=:item_2, data=20) ∉ stop_results +end From 6df1aaf9c541806fce854efd3f31b50491bc2354 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Mon, 6 Oct 2025 01:49:18 +0100 Subject: [PATCH 7/9] small fix --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8dbc158..db22622 100644 --- a/README.md +++ b/README.md @@ -104,4 +104,4 @@ ReadWorkWrite.jl provides a clean abstraction for this common pattern while hand ## License -MIT License# Test +MIT License From f0c3af56e2bd7b6fe069b9e6e52cb3c0036907e2 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Mon, 6 Oct 2025 01:53:49 +0100 Subject: [PATCH 8/9] add CI badge --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index db22622..92d0eb4 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # ReadWorkWrite.jl +[![CI](https://github.com/jerlich/ReadWorkWrite.jl/actions/workflows/CI.yml/badge.svg)](https://github.com/jerlich/ReadWorkWrite.jl/actions/workflows/CI.yml) + A Julia package for efficient parallel processing pipelines that separates IO-bound operations from CPU-intensive work. ## Overview From ec77c12ec87257d01583e70af1b939845a5cca23 Mon Sep 17 00:00:00 2001 From: Jeffrey Erlich Date: Wed, 8 Oct 2025 09:08:16 +0100 Subject: [PATCH 9/9] update README --- README.md | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 92d0eb4..19958e3 100644 --- a/README.md +++ b/README.md @@ -7,11 +7,19 @@ A Julia package for efficient parallel processing pipelines that separates IO-bo ## Overview ReadWorkWrite.jl implements a pattern where: -- **Read**: Single-threaded IO operations (loading files from disk) +- **Read**: Single-threaded IO (loading files from disk) - **Work**: Multi-threaded CPU-intensive processing (e.g., MCMC sampling, data analysis) -- **Write**: Single-threaded IO operations (writing to databases, files) +- **Write**: Single-threaded IO (writing to databases, files) -This design prevents threading issues with IO operations while maximizing parallelization for computational work. +This design prevents threading issues with IO operations, and minimizing memory requirements, while maximizing parallelization for computational work. + +## Motivation + +I am a [neuroscientist](https://www.sainsburywellcome.org/web/groups/erlich-lab). In my work, we often need to process data in batches and these processes are CPU bound, not IO bound. For example, we might need to do model comparison on many thousands of neurons. + +Other packages, like [Folds.jl](https://github.com/JuliaFolds/Folds.jl) or [ThreadsX.jl](https://github.com/tkf/ThreadsX.jl) are convenient for multi-core or multi-threaded `map` like functions. But they process an entire iterator together so if you have thousands of elements the process may be more memory intensive than is necessary. + +This package, ReadWorkWrite.jl, takes advantage of Base Channels and Threads to read in data _only as fast as the workers can handle them_. ## Installation @@ -58,6 +66,8 @@ readworkwrite(load_data, process_data, save_results, filenames) For complete working examples, see the `examples/` directory. +For additional usage patterns and advanced features (like early stopping, type inference, and structured data handling), check out `test/runtests.jl`. + ## API Reference ### `readworkwrite(readfn, workfn, writefn, data; nworkers=Threads.nthreads(), buf=nworkers+2)` @@ -85,7 +95,6 @@ Execute work-write pipeline, skipping the read step. See the `examples/` directory for complete working examples including MCMC analysis with Turing.jl. -For additional usage patterns and advanced features (like early stopping, type inference, and structured data handling), check out `test/runtests.jl`. ## Key Features @@ -95,15 +104,3 @@ For additional usage patterns and advanced features (like early stopping, type i - **Scalable**: Automatically uses available CPU threads for work processing - **Order Independence**: Handles unordered results from parallel processing -## Design Rationale - -Many applications need to: -1. Load data from files/databases (IO-bound, often not thread-safe) -2. Perform expensive computations (CPU-bound, benefits from parallelization) -3. Save results (IO-bound, often requires serialization) - -ReadWorkWrite.jl provides a clean abstraction for this common pattern while handling the complexities of thread coordination and backpressure management. - -## License - -MIT License