fix: add graceful termination of futures in vmaas_sync #2082
base: master
Reviewer's Guide
Ensures that all asynchronous Kafka send operations in vmaas_sync.re_evaluate_systems complete gracefully before logging completion and committing, by tracking created futures and awaiting them at the end of processing.
Sequence diagram for graceful completion of Kafka send futures in re_evaluate_systems
sequenceDiagram
participant re_evaluate_systems
participant EVENT_LOOP as EVENT_LOOP
participant BATCH_SEMAPHORE as BATCH_SEMAPHORE
participant cur as cur
participant EVALUATOR_QUEUE as EVALUATOR_QUEUE
participant LOGGER as LOGGER
re_evaluate_systems->>EVENT_LOOP: loop = EVENT_LOOP
re_evaluate_systems->>re_evaluate_systems: total_scheduled = 0
re_evaluate_systems->>re_evaluate_systems: futures = []
loop while More database rows
re_evaluate_systems->>EVENT_LOOP: run_until_complete(BATCH_SEMAPHORE.acquire())
re_evaluate_systems->>cur: fetchmany(size = CFG.re_evaluation_kafka_batch_size)
cur-->>re_evaluate_systems: rows
re_evaluate_systems->>re_evaluate_systems: msgs = lists of system ids
re_evaluate_systems->>re_evaluate_systems: total_scheduled += len(msgs)
re_evaluate_systems->>EVALUATOR_QUEUE: send_list(msgs, loop = loop)
EVALUATOR_QUEUE-->>re_evaluate_systems: future
re_evaluate_systems->>future: add_done_callback(lambda x: BATCH_SEMAPHORE.release())
re_evaluate_systems->>re_evaluate_systems: futures.append(future)
end
alt futures not empty
re_evaluate_systems->>LOGGER: info(Waiting for N Kafka send operations to complete)
re_evaluate_systems->>EVENT_LOOP: run_until_complete(asyncio.gather(*futures))
end
re_evaluate_systems->>LOGGER: info(N systems scheduled for re-evaluation)
re_evaluate_systems->>DB: conn.commit()
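The flow in the diagram can be sketched in a few lines of Python. This is a minimal illustration, not the code from the PR: `fake_send`, `re_evaluate`, `BATCH_SIZE` and `MAX_IN_FLIGHT` are hypothetical stand-ins for `EVALUATOR_QUEUE.send_list`, `re_evaluate_systems`, `CFG.re_evaluation_kafka_batch_size` and the capacity of `BATCH_SEMAPHORE`, and a plain list replaces the database cursor.

```python
import asyncio

BATCH_SIZE = 2      # stand-in for CFG.re_evaluation_kafka_batch_size (assumed value)
MAX_IN_FLIGHT = 2   # hypothetical capacity of the batch semaphore


async def fake_send(msgs, sent):
    """Hypothetical stand-in for the Kafka send; records what was 'sent'."""
    await asyncio.sleep(0)
    sent.extend(msgs)


def re_evaluate(rows, loop):
    """Schedule sends in batches, then wait for all of them before returning."""
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)
    sent, futures, total_scheduled = [], [], 0
    for start in range(0, len(rows), BATCH_SIZE):
        # Block until a slot is free; this also lets pending sends make progress.
        loop.run_until_complete(semaphore.acquire())
        msgs = rows[start:start + BATCH_SIZE]
        total_scheduled += len(msgs)
        future = loop.create_task(fake_send(msgs, sent))
        # Free the slot as soon as this send finishes.
        future.add_done_callback(lambda _f: semaphore.release())
        futures.append(future)
    if futures:
        # The graceful-termination step: do not return with sends still pending.
        loop.run_until_complete(asyncio.gather(*futures))
    return total_scheduled, sent
```

Without the final `gather`, the last batches could still be pending when the function returns, because tasks only advance while the loop is running.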
Hey there - I've reviewed your changes - here's some feedback:
- Accumulating all `future` objects in a single `futures` list for the entire `while True` loop can lead to unnecessary memory growth for large re-evaluation runs; consider periodically awaiting and clearing a batch-local list of futures (e.g., per DB fetch) instead of storing all of them until the very end.
- When using `asyncio.gather(*futures)` without `return_exceptions=True`, a single exception in one send operation will raise and skip awaiting the rest; if partial completion is acceptable or you need to ensure all callbacks fire, consider `return_exceptions=True` and explicit error handling/logging.
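One way to read the first suggestion is to await and clear the list every few batches, so that at most a fixed number of future objects is held at once. The sketch below assumes an async-native caller rather than the `run_until_complete` style used in the PR; `send_in_windows`, `send` and `window` are hypothetical names chosen for illustration.

```python
import asyncio


async def send_in_windows(batches, send, window=100):
    """Schedule one send per batch, draining the pending list every `window` sends."""
    pending = []
    completed = 0
    for batch in batches:
        pending.append(asyncio.ensure_future(send(batch)))
        if len(pending) >= window:
            # Wait for this window, then drop the references to the finished futures.
            await asyncio.gather(*pending)
            completed += len(pending)
            pending.clear()
    if pending:
        # Drain whatever is left after the last full window.
        await asyncio.gather(*pending)
        completed += len(pending)
    return completed
```

The trade-off is that each drain is a synchronization point, so a very small `window` reduces how many sends overlap.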
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `vmaas_sync/vmaas_sync.py:387` </location>
<code_context>
+
+ if futures:
+ LOGGER.info("Waiting for %s Kafka send operations to complete", len(futures))
+ loop.run_until_complete(asyncio.gather(*futures))
+
LOGGER.info("%s systems scheduled for re-evaluation", total_scheduled)
</code_context>
<issue_to_address>
**question (bug_risk):** Async gather error handling might need to distinguish partial failures
Calling `asyncio.gather(*futures)` without `return_exceptions=True` means the first failing Kafka send propagates its exception immediately; the remaining sends are not cancelled, but they are no longer awaited, so the behavior changes from “fire-and-forget” to “fail-fast.” If you want every send to be awaited and failures handled or logged individually, use `asyncio.gather(*futures, return_exceptions=True)` and handle exceptions explicitly so one bad send doesn’t cut short the wait for the others.
</issue_to_address>
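A minimal sketch of the `return_exceptions=True` variant follows. It is an illustration under assumptions, not the change made in the PR: `wait_for_sends` and the logger name are hypothetical, and it is written as a coroutine for brevity.

```python
import asyncio
import logging

LOGGER = logging.getLogger("vmaas_sync_sketch")  # hypothetical logger name


async def wait_for_sends(futures):
    """Await every future and return (succeeded, failed) counts."""
    # With return_exceptions=True, failures come back as result values
    # instead of being raised, so every future is awaited to completion.
    results = await asyncio.gather(*futures, return_exceptions=True)
    failures = [r for r in results if isinstance(r, BaseException)]
    for exc in failures:
        LOGGER.error("Kafka send failed: %r", exc)
    return len(results) - len(failures), len(failures)
```

Whether a failed send should block the later commit is a policy decision that this sketch leaves to the caller, which can inspect the returned counts.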
/retest
RHINENG-22131
Secure Coding Practices Checklist GitHub Link
Secure Coding Checklist
Summary by Sourcery
Bug Fixes: