Improve OTP shutdown behavior for consumers #103
Open
- Set Consumer.Supervisor child spec type to :supervisor so it gets shutdown: :infinity instead of being killed after 5 seconds
- Set explicit shutdown: 30_000 on each Consumer.Server child spec
- Cancel AMQP consumption (Basic.cancel) before stopping workers so no new messages arrive during drain
- Stop workers in parallel instead of sequentially to fit within the shutdown budget
- Add terminate/2 to Executer that nacks unfinished messages with requeue: true, preventing unacked messages from accumulating on quorum queues
- Set Executer child_spec shutdown: 25_000 to give in-flight messages time to complete before the safety-net nack
- Bump version to 0.22.0
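For illustration, the two child spec changes listed above might look roughly like this. It is a sketch, not the code from this PR: the module `ShutdownSpecSketch` and both helper functions are hypothetical, and `mod`/`arg` stand in for the library's real modules and options.

```elixir
defmodule ShutdownSpecSketch do
  # Hypothetical helper: child spec for a process that is itself a supervisor.
  # With `type: :supervisor` and no explicit :shutdown key, OTP defaults the
  # shutdown to :infinity rather than the 5_000 ms default used for workers.
  def consumer_supervisor_spec(mod, arg) do
    %{id: mod, start: {mod, :start_link, [arg]}, type: :supervisor}
  end

  # Hypothetical helper: child spec for a worker that needs more than the
  # default 5_000 ms to drain, so the shutdown timeout is set explicitly.
  def consumer_server_spec(mod, arg) do
    %{id: mod, start: {mod, :start_link, [arg]}, type: :worker, shutdown: 30_000}
  end
end
```

The point of the first spec is that the parent waits for the whole subtree, while the explicit timeout on the second bounds how long any single server may take.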
Replace spawn_link with Task.async so terminate/2 can use Task.shutdown/2 to give in-flight messages a 5s grace period to complete before escalating to :kill and safety-net nacking. Previously the spawned process was killed immediately on shutdown with no chance to finish. Now the sequence is:
1. Task.shutdown(task, 5_000) - sends :shutdown, waits 5s
2. If task doesn't finish, brutally kills it
3. Safety-net nack with requeue: true
Check Task.shutdown/2 return value in terminate/2. If it returns
{:ok, _}, the task finished within the grace period and already
acked/nacked inside its body - skip the redundant nack.
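
A minimal sketch of the return-value check described above, assuming the nack is passed in as a zero-arity function. `DrainSketch`, `drain/3`, and `nack_fun` are hypothetical names, not the Executer's actual API. Note that a task which does not trap exits is terminated as soon as it receives the :shutdown signal, so the grace period only helps a task that traps exits or one whose reply is already in flight.

```elixir
defmodule DrainSketch do
  # Shuts the task down and decides whether the safety-net nack is needed.
  # `nack_fun` is a hypothetical stand-in for the real nack call.
  def drain(%Task{} = task, nack_fun, grace_ms \\ 5_000) do
    case Task.shutdown(task, grace_ms) do
      # The task replied before or during shutdown, so it has already
      # acked/nacked inside its own body; a second nack would be redundant.
      {:ok, _reply} ->
        :already_settled

      # The task died without replying ({:exit, reason}) or was shut down
      # before it could reply (nil): requeue via the safety-net nack.
      _no_reply ->
        nack_fun.()
        :nacked
    end
  end
end
```

As with Task.shutdown/2 itself, `drain/3` has to be called from the process that started the task.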
- Executer lifecycle: normal completion, crash error handling, timeout
- Executer terminate/2: shutdown with in-flight task, task-never-started edge case, skip nack when task completes before shutdown
- Child spec assertions: Executer shutdown/restart, Consumer.Server shutdown timeout of 30_000
Summary
Improves the consumer shutdown sequence to follow proper OTP conventions. Previously, incorrect child spec types, missing consumption cancellation, and lack of graceful drain logic meant that a SIGTERM (e.g. k8s pod termination) would result in abrupt process kills rather than an orderly shutdown.
Problems Fixed
- `Consumer.Supervisor` was typed as `:worker` with a 5s shutdown: the entire consumer subtree had only 5 seconds before being `:kill`ed. Now correctly typed as `:supervisor` with `:infinity` shutdown, allowing children to drain properly.
- Consumption was not cancelled before workers were stopped, so new messages could keep arriving during drain. Now calls `AMQP.Basic.cancel` first.
- Workers were stopped sequentially: the back-to-back `DynamicSupervisor.stop` calls could easily exceed the shutdown budget. Now stops all workers in parallel.
- Executer had no `terminate/2`: when shut down externally, the spawned message-processing process was killed with no opportunity to finish. Now uses `Task.async` + `Task.shutdown/2` to give in-flight work a 5s grace period before escalating, and nacks with `requeue: true` as a safety net.
- Shutdown timeouts are now nested: `Consumer.Supervisor (:infinity)` > `Consumer.Server (30s)` > `Workers/Executers (25s)`.

Corrected Shutdown Sequence
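
As a rough sketch of the order described above (cancel consumption first, then stop workers in parallel), assuming each worker runs under its own `DynamicSupervisor`. The module `StopSketch`, its `shutdown/3` function, and the injected `cancel_fun` are hypothetical; `cancel_fun` stands in for the `AMQP.Basic.cancel` call so the example runs without a broker.

```elixir
defmodule StopSketch do
  # cancel_fun: zero-arity function standing in for the AMQP.Basic.cancel call.
  # worker_sups: pids of the DynamicSupervisors to stop (an assumption about
  # how workers are organised, not taken from the library's code).
  def shutdown(cancel_fun, worker_sups, timeout \\ 25_000) do
    # 1. Stop new deliveries before draining anything.
    cancel_fun.()

    # 2. Stop every worker supervisor concurrently, so the total wait is
    #    roughly that of the slowest worker rather than the sum of all.
    worker_sups
    |> Enum.map(fn sup ->
      Task.async(fn -> DynamicSupervisor.stop(sup, :shutdown, timeout) end)
    end)
    |> Task.await_many(timeout + 1_000)
  end
end
```

Stopping in parallel keeps the worst case close to a single worker's timeout, which is what lets the drain fit inside the 30s budget of the enclosing `Consumer.Server`.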
Changes
- `lib/rabbit/broker/supervisor.ex`: Consumer.Supervisor child spec gets `type: :supervisor`
- `lib/rabbit/consumer/supervisor.ex`: Consumer.Server child specs get `shutdown: 30_000`
- `lib/rabbit/consumer/server.ex`: Add `cancel_consumer/1`, parallel `stop_workers/1`
- `lib/rabbit/consumer/executer.ex`: Refactor to `Task.async`, add `terminate/2` with grace period and safety-net nack, `shutdown: 25_000`, `completed` state tracking
- `mix.exs`: Version bump to 0.22.0