feat: implement group().on_done() barrier for multi-parent dependencies#1
feat: implement group().on_done() barrier for multi-parent dependencies#1
Conversation
Add dependency_job_ids (Json) and pending_dependency_count (Integer) fields to queue.job to support wait-for-all barriers. Group callbacks now start in waiting state and transition to pending only after every group member completes. Failure of any member cascades to the callback. The worker path uses atomic SQL (UPDATE SET count = count - 1) for concurrency safety. The ORM path (_release_dependents/_fail_dependents) serves run_now() and button actions.
Summary of ChangesHello @jeremi, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the job queuing system by introducing robust support for multi-parent job dependencies, specifically for Highlights
Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a significant and valuable feature: a wait-for-all barrier for group().on_done() callbacks. The implementation correctly adds the necessary fields to the queue.job model and leverages atomic SQL updates in the hot path of the worker for good performance. The documentation and tests are also well-updated.
My review focuses on a few key areas for improvement:
- Performance: The ORM-based paths for releasing and failing dependent jobs (
_release_dependents,_fail_dependents) are inefficient and can lead to N+1 query problems. I've suggested replacing them with raw SQL queries for better performance, mirroring the efficient approach already used in the worker. - Code Duplication: There's an opportunity to reduce code duplication in the worker's exception handling by reusing the new
_fail_dependentsmethod.
Overall, this is a solid implementation of a complex feature. Addressing these points will make it more robust and performant.
| def _release_dependents(self): | ||
| """Release multi-parent dependents (group barriers) when this job completes.""" | ||
| for job in self: | ||
| if not job.graph_uuid: | ||
| continue | ||
| waiting_deps = self.search( | ||
| [ | ||
| ("graph_uuid", "=", job.graph_uuid), | ||
| ("state", "=", "waiting"), | ||
| ("dependency_job_ids", "!=", False), | ||
| ] | ||
| ) | ||
| for dep in waiting_deps: | ||
| if job.id in (dep.dependency_job_ids or []): | ||
| dep.pending_dependency_count -= 1 | ||
| if dep.pending_dependency_count <= 0: | ||
| dep.state = "pending" |
There was a problem hiding this comment.
The current implementation of _release_dependents is inefficient. It fetches all potential dependent jobs within the same graph and then filters them in Python, leading to an N+1 update problem as each dependent job's pending_dependency_count is decremented individually. This can cause significant performance issues for groups with many members.
A much more performant approach would be to use a single, atomic SQL UPDATE query, similar to the logic in the worker's finalize_job function. This avoids fetching unnecessary data and looping in Python.
def _release_dependents(self):
"""Release multi-parent dependents (group barriers) when this job completes."""
for job in self:
if not job.graph_uuid:
continue
# Use a single atomic SQL UPDATE for performance, mirroring the worker path.
# This avoids fetching all dependents and looping in Python (N+1 problem).
self.env.cr.execute(
"""
UPDATE queue_job
SET pending_dependency_count = pending_dependency_count - 1,
state = CASE
WHEN pending_dependency_count - 1 <= 0
THEN 'pending'
ELSE state
END,
write_date = NOW()
WHERE graph_uuid = %s
AND state = 'waiting'
AND dependency_job_ids IS NOT NULL
AND dependency_job_ids @> (%s)::jsonb
""",
(job.graph_uuid, json.dumps([job.id])),
)| def _fail_dependents(self): | ||
| """Cascade failure to multi-parent dependents.""" | ||
| for job in self: | ||
| if not job.graph_uuid: | ||
| continue | ||
| waiting_deps = self.search( | ||
| [ | ||
| ("graph_uuid", "=", job.graph_uuid), | ||
| ("state", "=", "waiting"), | ||
| ("dependency_job_ids", "!=", False), | ||
| ] | ||
| ) | ||
| for dep in waiting_deps: | ||
| if job.id in (dep.dependency_job_ids or []): | ||
| dep.state = "failed" | ||
| dep.exc_info = f"Parent job {job.id} failed" |
There was a problem hiding this comment.
Similar to _release_dependents, this method is inefficient. It fetches all potential dependents and filters in Python, then updates them one by one, causing an N+1 update problem. This can be replaced by a single UPDATE query for better performance.
def _fail_dependents(self):
"""Cascade failure to multi-parent dependents."""
for job in self:
if not job.graph_uuid:
continue
# Use a single atomic SQL UPDATE for performance, mirroring the worker path.
self.env.cr.execute(
"""
UPDATE queue_job
SET state = 'failed',
exc_info = %s,
write_date = NOW()
WHERE graph_uuid = %s
AND state = 'waiting'
AND dependency_job_ids IS NOT NULL
AND dependency_job_ids @> (%s)::jsonb
""",
(
f"Parent job {job.id} failed",
job.graph_uuid,
json.dumps([job.id]),
),
)| # Cascade failure to multi-parent dependents (group barriers) | ||
| if job.graph_uuid: | ||
| dep_jobs = job.env["queue.job"].search( | ||
| [ | ||
| ("graph_uuid", "=", job.graph_uuid), | ||
| ("state", "=", "waiting"), | ||
| ("dependency_job_ids", "!=", False), | ||
| ] | ||
| ) | ||
| for dep in dep_jobs: | ||
| if job.id in (dep.dependency_job_ids or []): | ||
| dep.state = "failed" | ||
| dep.exc_info = f"Parent job {job.id} failed" |
There was a problem hiding this comment.
This block of code for cascading failures to multi-parent dependents duplicates the logic from the newly added _fail_dependents method in the queue.job model. To improve maintainability and avoid code duplication, you should call job._fail_dependents() instead. This also has the benefit that if _fail_dependents is optimized (as suggested in another comment), this part of the code will automatically benefit from it.
# Cascade failure to multi-parent dependents (group barriers)
job._fail_dependents()The __getattr__ guard was rejecting all method names starting with _, but Odoo convention uses single-underscore prefixed methods for private methods that are commonly delayed (e.g., _import_data, _validate). Relax the check to only reject dunder methods (__) which are Python internal attributes that should not be intercepted.
Add dependency_job_ids (Json) and pending_dependency_count (Integer) fields to queue.job to support wait-for-all barriers. Group callbacks now start in waiting state and transition to pending only after every group member completes. Failure of any member cascades to the callback.
The worker path uses atomic SQL (UPDATE SET count = count - 1) for concurrency safety. The ORM path (_release_dependents/_fail_dependents) serves run_now() and button actions.