Use workers for file download #271
base: main
Conversation
Reviewer's Guide

This pull request extends the file transfer API to support parallel downloads by adding a num_workers parameter throughout the Backend and Interface layers. The Minio backend's get_file implementation is rewritten to pre-allocate the target file, split the download into chunks, and dispatch them via audeer.run_tasks. A cancellation event and signal handler are introduced to ensure immediate cleanup on Ctrl+C. The change is covered by new tests for parallel downloads and interrupt handling, a benchmark script, and a dependency bump.

Sequence diagram for parallel file download with Minio backend

```mermaid
sequenceDiagram
    participant User
    participant Interface
    participant MinioBackend
    participant audeer
    User->>Interface: get_file(src_path, dst_path, version, num_workers>1)
    Interface->>MinioBackend: get_file(src_path, dst_path, num_workers)
    MinioBackend->>MinioBackend: pre-allocate dst_path
    MinioBackend->>audeer: run_tasks(_download_file, tasks, num_workers)
    audeer->>MinioBackend: _download_file(src_path, dst_path, pbar, cancel_event, offset, length)
    MinioBackend->>MinioBackend: write chunk to dst_path
    MinioBackend->>audeer: update progress bar
    Note over User,MinioBackend: If Ctrl+C pressed, cancel_event is set
    MinioBackend->>MinioBackend: cleanup partial file
```
Class diagram for updated Backend and Interface file transfer methods

```mermaid
classDiagram
    class Backend {
        +copy_file(src_path, dst_path, num_workers=1, validate=False, verbose=False)
        +get_file(src_path, dst_path, num_workers=1, validate=False, verbose=False)
        +move_file(src_path, dst_path, num_workers=1, validate=False, verbose=False)
        +get_archive(src_path, dst_root, tmp_root=None, num_workers=1, validate=False, verbose=False)
    }
    class MinioBackend {
        +_copy_file(src_path, dst_path, num_workers, verbose)
        +_get_file(src_path, dst_path, num_workers, verbose)
        +_move_file(src_path, dst_path, num_workers, verbose)
        +_download_file(src_path, dst_path, pbar, cancel_event, offset=0, length=None)
    }
    class ArtifactoryBackend {
        +_copy_file(src_path, dst_path, num_workers, verbose)
        +_get_file(src_path, dst_path, num_workers, verbose)
        +_move_file(src_path, dst_path, num_workers, verbose)
    }
    class FilesystemBackend {
        +_copy_file(src_path, dst_path, num_workers, verbose)
        +_get_file(src_path, dst_path, num_workers, verbose)
        +_move_file(src_path, dst_path, num_workers, verbose)
    }
    Backend <|-- MinioBackend
    Backend <|-- ArtifactoryBackend
    Backend <|-- FilesystemBackend
    class VersionedInterface {
        +copy_file(src_path, dst_path, version=None, num_workers=1, validate=False, verbose=False)
        +get_file(src_path, dst_path, version, num_workers=1, validate=False, verbose=False)
        +move_file(src_path, dst_path, version=None, num_workers=1, validate=False, verbose=False)
    }
    class UnversionedInterface {
        +copy_file(src_path, dst_path, num_workers=1, validate=False, verbose=False)
        +get_file(src_path, dst_path, num_workers=1, validate=False, verbose=False)
        +move_file(src_path, dst_path, num_workers=1, validate=False, verbose=False)
    }
```
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Hey there - I've reviewed your changes - here's some feedback:
- Consider refactoring num_workers into a backend-level setting (for example a constructor parameter) rather than threading it through every copy/get/move method to reduce duplication and keep signatures simpler.
- Mutating the global SIGINT handler inside _get_file can interfere with other listeners or multithreaded contexts—consider a context manager or higher-level cancellation API instead of calling signal.signal directly.
- Rather than writing directly to the final dst_path, download into a temp file and atomically rename on success to avoid leaving partial files if the process is killed unexpectedly.
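As an illustration of the second point, here is a minimal sketch of such a context manager (hypothetical code, not taken from this PR):

```python
import contextlib
import signal
import threading


@contextlib.contextmanager
def sigint_cancel_event():
    """Yield an event that is set on Ctrl+C; restore the old handler on exit.

    Must be entered from the main thread,
    as only the main thread may install signal handlers.
    """
    cancel_event = threading.Event()
    old_handler = signal.signal(
        signal.SIGINT,
        lambda signum, frame: cancel_event.set(),
    )
    try:
        yield cancel_event
    finally:
        signal.signal(signal.SIGINT, old_handler)
```

Workers would then poll the yielded event, and the backend never mutates global signal state without restoring it.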
## Individual Comments
### Comment 1
<location> `audbackend/core/backend/minio.py:305-308` </location>
<code_context>
+ # Create and run download tasks
+ tasks = []
+ chunk_size = src_size // num_workers
+ for i in range(num_workers):
+ offset = i * chunk_size
+ length = chunk_size if i < num_workers - 1 else src_size - offset
+ tasks.append(
+ ([src_path, dst_path, pbar, cancel_event, offset, length], {})
+ )
</code_context>
<issue_to_address>
**suggestion:** Chunk calculation may result in zero-length chunks if num_workers exceeds file size.
Ensure that num_workers does not exceed src_size, so that chunk_size is at least one byte and no worker is assigned a zero-byte task. Alternatively, validate that each worker receives a non-zero chunk before task creation.
Suggested implementation:
```python
# Create and run download tasks
tasks = []
# Ensure num_workers does not exceed src_size
num_workers = min(num_workers, src_size) if src_size > 0 else 1
chunk_size = src_size // num_workers
```
```python
for i in range(num_workers):
offset = i * chunk_size
# Ensure each worker gets at least one byte if possible
length = chunk_size if i < num_workers - 1 else src_size - offset
if length > 0:
tasks.append(
([src_path, dst_path, pbar, cancel_event, offset, length], {})
)
```
</issue_to_address>
### Comment 2
<location> `audbackend/core/backend/minio.py:334-336` </location>
<code_context>
+ chunk_size = 4 * 1024 # 4 KB
+
+ # Get the data stream
+ kwargs = {"offset": offset, "length": length} if length else {}
+ response = self._client.get_object(self.repository, src_path, **kwargs)
+
+ try:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Passing length=None may result in unexpected behavior for get_object.
If offset is set but length is None, the current logic omits both, which may not fetch the correct data range. Recommend always including offset, and only exclude length when it is None.
```suggestion
# Get the data stream
kwargs = {"offset": offset}
if length is not None:
kwargs["length"] = length
response = self._client.get_object(self.repository, src_path, **kwargs)
```
</issue_to_address>
### Comment 3
<location> `audbackend/core/backend/minio.py:343-349` </location>
<code_context>
+ if offset:
+ f.seek(offset)
+
+ with pbar:
+ while data := response.read(chunk_size):
+ # Check if cancellation was requested
+ if cancel_event and cancel_event.is_set():
+ raise KeyboardInterrupt("Download cancelled by user")
+ f.write(data)
+ pbar.update(len(data))
+ finally:
+ response.close()
</code_context>
<issue_to_address>
**issue (bug_risk):** Using the progress bar as a context manager inside each thread may cause synchronization issues.
Simultaneous entry into the progress bar context by multiple threads can cause race conditions and inaccurate progress updates. Use thread-safe mechanisms for progress updates or manage the progress bar outside thread contexts.
</issue_to_address>
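One way to address this (a sketch assuming a tqdm-style progress bar; not code from the PR):

```python
import threading

import tqdm

src_size = 100 * 1024 * 1024  # example: total download size in bytes

# Create the bar once in the parent thread ...
pbar = tqdm.tqdm(total=src_size, unit="B", unit_scale=True)
pbar_lock = threading.Lock()


def report_progress(num_bytes: int):
    # ... and let worker threads funnel updates through a shared lock
    with pbar_lock:
        pbar.update(num_bytes)
```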
### Comment 4
<location> `audbackend/core/backend/minio.py:315-316` </location>
<code_context>
+ audeer.run_tasks(self._download_file, tasks, num_workers=num_workers)
+ except KeyboardInterrupt:
+ # Clean up partial file
+ if os.path.exists(dst_path):
+ os.remove(dst_path)
+ raise
+ finally:
</code_context>
<issue_to_address>
**issue (bug_risk):** Removing the partial file on KeyboardInterrupt may race with other threads.
If threads are still writing when the file is removed, this could cause data corruption or errors. Use a lock or ensure all threads have finished before deleting the file.
</issue_to_address>
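A sketch of one way to guarantee this ordering (download_chunk and tasks are hypothetical stand-ins for the PR's worker function and argument tuples):

```python
import concurrent.futures
import os


def download_all(tasks, num_workers, dst_path):
    try:
        with concurrent.futures.ThreadPoolExecutor(num_workers) as pool:
            # download_chunk: worker writing one byte range (defined elsewhere)
            futures = [pool.submit(download_chunk, *args) for args in tasks]
            for future in futures:
                future.result()  # re-raises any worker exception
    except KeyboardInterrupt:
        # Leaving the with-block joins all workers first,
        # so no thread can still be writing when the file is removed
        if os.path.exists(dst_path):
            os.remove(dst_path)
        raise
```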
### Comment 5
<location> `benchmarks/README.rst:10` </location>
<code_context>
+Parallel file loading
+---------------------
+
+The ``Minio`` backend support parallel loading of files.
+It can be benchmarked with:
+
</code_context>
<issue_to_address>
**issue (typo):** Correct 'support' to 'supports' for subject-verb agreement.
Change to 'supports' for correct grammar.
```suggestion
The ``Minio`` backend supports parallel loading of files.
```
</issue_to_address>
### Comment 6
<location> `audbackend/core/backend/minio.py:212` </location>
<code_context>
def _copy_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Copy file on backend."""
src_path = self.path(src_path)
dst_path = self.path(dst_path)
checksum = self._checksum(src_path)
# `copy_object()` has a maximum size limit of 5GB.
# We use 4.9GB to have some headroom
if self._size(src_path) / 1024 / 1024 / 1024 >= 4.9:
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = audeer.path(tmp_dir, os.path.basename(src_path))
self._get_file(src_path, tmp_path, num_workers, verbose)
self._put_file(tmp_path, dst_path, checksum, verbose)
else:
self._client.copy_object(
self.repository,
dst_path,
minio.commonconfig.CopySource(self.repository, src_path),
metadata=_metadata(checksum),
)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify numeric comparison [×3] ([`simplify-numeric-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-numeric-comparison/))
```suggestion
if self._size(src_path) >= 5261334937.6:
```
</issue_to_address>
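If the bare literal feels opaque, a named constant keeps the "4.9 GB with headroom" intent readable (a sketch; MAX_COPY_BYTES and needs_multipart_copy are hypothetical names):

```python
# copy_object() has a 5 GB limit; keep ~0.1 GB headroom
MAX_COPY_BYTES = 4.9 * 1024**3  # == 5261334937.6


def needs_multipart_copy(size_in_bytes: int) -> bool:
    return size_in_bytes >= MAX_COPY_BYTES
```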
Closes #264

Adds num_workers to

- Backend.copy_file()
- Backend.get_archive()
- Backend.get_file()
- Backend.move_file()
- Interface.copy_file()
- Interface.get_file()
- Interface.move_file()

I decided to add the argument to the methods and not to the Backend instantiation, as this makes it easier to use; see the sketch after this list.
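A hypothetical usage sketch (host, repository, and paths are placeholders, and the setup assumes audbackend's usual backend/interface pattern):

```python
import audbackend

# Hypothetical MinIO host and repository
host = "s3.example.com"
repository = "my-repository"

with audbackend.backend.Minio(host, repository) as backend:
    interface = audbackend.interface.Versioned(backend)
    interface.get_file(
        "/models/model.onnx",  # src_path on the backend
        "model.onnx",          # dst_path on the local machine
        "1.0.0",               # version
        num_workers=4,         # fetch four byte ranges in parallel
    )
```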
I further decided not to expose a chunk_size argument to the user, but to let the backend decide internally what the correct chunk size should be.

For the Artifactory and Filesystem backends we ignore num_workers and use a single worker. For Minio we support several workers to speed up the download, using a chunk size that splits the file into num_workers parts (e.g., with num_workers=4 a 100 MB file is fetched as four 25 MB byte ranges). I created audeering/audmodel#35 to test the changes in audmodel.
audmodel.Archive extraction
Even though I add
num_workerstoget_archive()here, it is only used during file download, not during file extraction. I would propose we first investigate multi-threading file extraction further at audeering/audeer#184 and add it here in a separate pull request if we think it makes sense.Keyboard interruption
A user can interrupt a download with Ctrl+C. This needed some additional code for
num_workers> 1 as the code handling it insideaudeer.run_tasks()is not sufficient when writing to files. In that case the file handler would wait untilresponse.read()finished. With the help of Claude I found a solution that ensures we now always immediately return when the user presses Ctrl+C independent of the number of workers.Code to test user interruption
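A minimal sketch of that cancellation pattern (hypothetical names; not the exact code from this PR):

```python
import signal
import threading

cancel_event = threading.Event()

# Ctrl+C no longer raises inside a blocked worker; it just flips the event
signal.signal(signal.SIGINT, lambda signum, frame: cancel_event.set())


def download_chunk(response, f, offset, chunk_size=4 * 1024):
    """Write one byte range, aborting at the next chunk boundary on Ctrl+C."""
    f.seek(offset)
    while data := response.read(chunk_size):
        if cancel_event.is_set():
            raise KeyboardInterrupt("Download cancelled by user")
        f.write(data)
```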
Benchmarks

I tested the current implementation on the model 7289b57d-1.0.0 (4.2 GB) on a server with 10 CPUs by running:

```
$ cd benchmarks
$ uv run --python 3.12 minio-parallel.py
```
Summary by Sourcery
Introduce a num_workers parameter to enable parallel downloads in Minio, improve Ctrl+C interrupt handling with immediate cleanup, extend tests for multi-worker and interrupt scenarios, and add benchmarks.