fix: ensure graceful consumer shutdown on SIGTERM #102

Closed
nsweeting wants to merge 2 commits into master from cleanup-shutdown

Summary

Fixes consumer shutdown behavior to prevent unacked messages from accumulating on quorum queues during SIGTERM (e.g. k8s pod termination). Previously, the shutdown sequence had several issues that could leave AMQP channels and messages in a broken state.

Problems Fixed

  • Consumer.Supervisor was typed as :worker with 5s shutdown — the entire consumer subtree (all consumers, workers, executers) had only 5 seconds before being :killed. Now correctly typed as :supervisor with :infinity shutdown.
  • No consumption cancellation during shutdown — the broker continued delivering new messages while workers were shutting down. Now calls AMQP.Basic.cancel first.
  • Workers stopped sequentially — on an N-core machine with N workers, sequential DynamicSupervisor.stop calls could easily exceed the shutdown budget. Now stops all workers in parallel.
  • Executer had no terminate/2 — when shut down externally (supervisor shutdown), the spawned message-processing process was killed with no ack or nack. Unacked messages would sit in limbo on the broker until heartbeat timeout. Now nacks with requeue: true as a safety net.
  • No explicit shutdown timeouts — all processes used OTP defaults (5s). Now uses a proper timeout chain: Consumer.Supervisor (:infinity) > Consumer.Server (30s) > Workers/Executers (25s).
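The timeout chain in the last bullet can be sketched as plain child-spec maps. This is a minimal illustration, not the library's actual code: the module name `ShutdownChainSketch`, the ids, and the `Supervisor`/`Agent` start tuples are hypothetical stand-ins, and only the `:type` and `:shutdown` values come from the description above.

```elixir
defmodule ShutdownChainSketch do
  # Hypothetical helper module; only the :type and :shutdown fields
  # mirror what the PR describes.

  # Subtree root: typed as a supervisor so it may wait indefinitely
  # for its own children to stop.
  def consumer_supervisor_spec do
    %{
      id: :consumer_supervisor,
      start: {Supervisor, :start_link, [[], [strategy: :one_for_one]]},
      type: :supervisor,
      shutdown: :infinity
    }
  end

  # Each consumer server gets 30s to run its terminate/2.
  def consumer_server_spec(id) do
    %{
      id: id,
      start: {Agent, :start_link, [fn -> %{} end]},
      type: :worker,
      shutdown: 30_000
    }
  end

  # Executers get 25s, leaving headroom inside the server's 30s budget.
  def executer_spec(id) do
    %{
      id: id,
      start: {Agent, :start_link, [fn -> %{} end]},
      type: :worker,
      shutdown: 25_000
    }
  end
end
```

Each level is given less time than its parent, so a child should be able to finish (or be killed) before the parent's own budget runs out.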

Corrected Shutdown Sequence

SIGTERM
  -> Consumer.Supervisor gets :shutdown (timeout: :infinity)
    -> Each Consumer.Server gets :shutdown (timeout: 30s)
      -> terminate/2:
        1. AMQP.Basic.cancel — stop new message delivery
        2. Stop all Workers in parallel (25s timeout)
           -> Each Executer.terminate/2: nack with requeue if not completed
        3. AMQP.Channel.close — clean channel shutdown
  -> Producer.Pool, Topology.Server, Connection.Pool shut down after
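
The three steps inside terminate/2 could look roughly like the sketch below. It is an approximation under stated assumptions: `DrainSketch` is a hypothetical module, `cancel_fun` and `close_fun` stand in for the AMQP.Basic.cancel and AMQP.Channel.close calls so the example runs without a broker, and `GenServer.stop/3` replaces whatever stop call the real workers use.

```elixir
defmodule DrainSketch do
  # Hypothetical module. `cancel_fun` and `close_fun` are stand-ins for
  # AMQP.Basic.cancel and AMQP.Channel.close.
  def drain(cancel_fun, workers, close_fun, timeout \\ 25_000) do
    # 1. Ask the broker to stop delivering new messages.
    cancel_fun.()
    # 2. Stop every worker concurrently, so the total wait is bounded by
    #    the slowest worker rather than the sum of all of them.
    stop_workers(workers, timeout)
    # 3. Close the channel once the workers are gone.
    close_fun.()
    :ok
  end

  def stop_workers(workers, timeout) do
    workers
    |> Enum.map(fn pid -> Task.async(fn -> stop_worker(pid, timeout) end) end)
    |> Task.await_many(timeout + 1_000)
  end

  defp stop_worker(pid, timeout) do
    GenServer.stop(pid, :shutdown, timeout)
  catch
    # The worker was already gone or did not stop in time.
    :exit, _reason -> :ok
  end
end
```

With N workers that each need up to 25s, a sequential loop could take N × 25s in the worst case, while this version stays near 25s, which is what lets it fit inside the server's 30s budget.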

Changes

  • lib/rabbit/broker/supervisor.ex — Consumer.Supervisor child spec gets type: :supervisor
  • lib/rabbit/consumer/supervisor.ex — Consumer.Server child specs get shutdown: 30_000
  • lib/rabbit/consumer/server.ex — Add cancel_consumer/1, parallel stop_workers/1
  • lib/rabbit/consumer/executer.ex — Add terminate/2 with safety-net nack, shutdown: 25_000, completed state tracking
  • mix.exs — Version bump to 0.22.0

- Set Consumer.Supervisor child spec type to :supervisor so it gets
  shutdown: :infinity instead of being killed after 5 seconds
- Set explicit shutdown: 30_000 on each Consumer.Server child spec
- Cancel AMQP consumption (Basic.cancel) before stopping workers so
  no new messages arrive during drain
- Stop workers in parallel instead of sequentially to fit within
  the shutdown budget
- Add terminate/2 to Executer that nacks unfinished messages with
  requeue: true, preventing unacked messages from accumulating on
  quorum queues
- Set Executer child_spec shutdown: 25_000 to give in-flight messages
  time to complete before the safety-net nack
- Bump version to 0.22.0

Replace spawn_link with Task.async so terminate/2 can use
Task.shutdown/2 to give in-flight messages a 5s grace period
to complete before escalating to :kill and safety-net nacking.

Previously the spawned process was killed immediately on shutdown
with no chance to finish. Now the sequence is:
1. Task.shutdown(task, 5_000) - sends :shutdown, waits up to 5s
2. If the task hasn't finished by then, it is brutally killed
3. Safety-net nack with requeue: true
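
A minimal sketch of that sequence, assuming a hypothetical `GraceShutdownSketch` module and a `nack_fun` callback standing in for the AMQP.Basic.nack call with requeue: true. One caveat worth keeping in mind: Task.shutdown/2 sends the :shutdown exit signal right away, so the grace period only lets work continue if the task process traps exits. A task that does not trap exits stops immediately and is then nacked.

```elixir
defmodule GraceShutdownSketch do
  # Hypothetical module. `nack_fun` is a stand-in for nacking the
  # message with requeue: true.
  def finish_or_requeue(task, nack_fun, grace_ms \\ 5_000) do
    case Task.shutdown(task, grace_ms) do
      # The task replied before or during shutdown: nothing to requeue.
      {:ok, result} ->
        {:completed, result}

      # nil or {:exit, reason}: the task never produced a result,
      # so hand the message back to the broker.
      _not_completed ->
        nack_fun.()
        :requeued
    end
  end
end
```

Calling `GraceShutdownSketch.finish_or_requeue(task, fn -> :ok end)` from the process that started the task returns `{:completed, result}` for a task that already finished and `:requeued` otherwise.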

nsweeting closed this on Mar 25, 2026