diff --git a/lib/rabbit/broker/supervisor.ex b/lib/rabbit/broker/supervisor.ex index 7af7c1c..2d9e5df 100644 --- a/lib/rabbit/broker/supervisor.ex +++ b/lib/rabbit/broker/supervisor.ex @@ -69,7 +69,11 @@ defmodule Rabbit.Broker.Supervisor do name = Broker.consumers(module) opts = Keyword.get(opts, :consumers, []) opts = Enum.map(opts, &Keyword.put(&1, :connection, conn)) - spec = build_spec(Rabbit.Consumer.Supervisor, name, module, opts) + + spec = + Rabbit.Consumer.Supervisor + |> build_spec(name, module, opts) + |> Map.put(:type, :supervisor) children ++ [spec] end diff --git a/lib/rabbit/consumer/executer.ex b/lib/rabbit/consumer/executer.ex index 57b413c..ce12a96 100644 --- a/lib/rabbit/consumer/executer.ex +++ b/lib/rabbit/consumer/executer.ex @@ -20,7 +20,8 @@ defmodule Rabbit.Consumer.Executer do %{ id: __MODULE__, start: {__MODULE__, :start_link, args}, - restart: :temporary + restart: :temporary, + shutdown: 25_000 } end @@ -49,18 +50,26 @@ defmodule Rabbit.Consumer.Executer do @impl GenServer def handle_continue(:run, state) do state = run(state) - {:noreply, state, state.timeout} + {:noreply, state} end @doc false @impl GenServer def handle_info(:timeout, state) do - if is_pid(state.executer), do: Process.exit(state.executer, :normal) + Task.shutdown(state.task, :brutal_kill) handle_error(state, {:exit, :timeout}, []) - {:stop, :timeout, state} + {:stop, :timeout, %{state | completed: true}} end - def handle_info({:EXIT, pid1, reason}, %{executer: pid2} = state) when pid1 == pid2 do + def handle_info({ref, _result}, %{task: %Task{ref: ref}} = state) do + # Task completed successfully - the task body already did ack/nack. + # Flush the :DOWN message that Task.async sends after completion. + Process.demonitor(ref, [:flush]) + {:stop, :normal, %{state | completed: true}} + end + + def handle_info({:DOWN, ref, :process, _pid, reason}, %{task: %Task{ref: ref}} = state) do + # Task crashed before completing. Run error handler. {reason, stack} = case reason do {%_{} = reason, stack} -> {reason, stack} @@ -68,15 +77,43 @@ defmodule Rabbit.Consumer.Executer do end handle_error(state, reason, stack) - {:stop, reason, state} + {:stop, reason, %{state | completed: true}} + end + + def handle_info({:EXIT, _, _}, state) do + # Task.async links to the caller. Since we trap exits, we receive + # EXIT messages from the task process. The actual result/crash is + # handled via the task ref and :DOWN messages above, so we ignore + # EXIT signals here. + {:noreply, state} end - @doc false @impl GenServer - def handle_cast({:complete, ref1}, %{executer_ref: ref2} = state) when ref1 == ref2 do - {:stop, :normal, state} + def terminate(_reason, %{completed: false, message: message} = state) do + result = + if state.task do + Task.shutdown(state.task, 5_000) + end + + # Only nack if the task did not complete within the grace period. + # {:ok, _} means the task finished - it already acked/nacked inside its body. + case result do + {:ok, _} -> + :ok + + _ -> + try do + Message.nack(message, requeue: true) + catch + _, _ -> :ok + end + end + + :ok end + def terminate(_reason, _state), do: :ok + ################################ # Private Functions ################################ @@ -85,9 +122,9 @@ defmodule Rabbit.Consumer.Executer do opts |> Enum.into(%{}) |> Map.merge(%{ - executer: nil, - executer_ref: nil, - message: message + task: nil, + message: message, + completed: false }) end @@ -100,11 +137,8 @@ defmodule Rabbit.Consumer.Executer do end defp run(state) do - parent = self() - ref = make_ref() - - executer = - spawn_link(fn -> + task = + Task.async(fn -> try do message = decode_payload!(state.message) consumer_callback(state, :handle_message, [message]) @@ -113,11 +147,9 @@ defmodule Rabbit.Consumer.Executer do catch msg, reason -> handle_error(state, {msg, reason}, __STACKTRACE__) end - - GenServer.cast(parent, {:complete, ref}) end) - %{state | executer: executer, executer_ref: ref} + %{state | task: task} end defp decode_payload!(message) do diff --git a/lib/rabbit/consumer/server.ex b/lib/rabbit/consumer/server.ex index dc712ca..32e54d2 100644 --- a/lib/rabbit/consumer/server.ex +++ b/lib/rabbit/consumer/server.ex @@ -286,19 +286,36 @@ defmodule Rabbit.Consumer.Server do end state + |> cancel_consumer() |> stop_workers() |> close_channel() end + defp cancel_consumer(%{consuming: true, channel_open: true} = state) do + try do + AMQP.Basic.cancel(state.channel, state.consumer_tag) + catch + _, _ -> :ok + end + + %{state | consuming: false} + end + + defp cancel_consumer(state), do: state + defp stop_workers(state) do if state.workers_started do - Enum.each(state.workers, fn worker -> - try do - :ok = Worker.stop(worker) - catch - _, _ -> :ok - end + state.workers + |> Enum.map(fn worker -> + Task.async(fn -> + try do + Worker.stop(worker) + catch + _, _ -> :ok + end + end) end) + |> Task.await_many(25_000) end %{state | workers: nil, workers_started: false} diff --git a/lib/rabbit/consumer/supervisor.ex b/lib/rabbit/consumer/supervisor.ex index 6821d10..27ca39e 100644 --- a/lib/rabbit/consumer/supervisor.ex +++ b/lib/rabbit/consumer/supervisor.ex @@ -35,7 +35,13 @@ defmodule Rabbit.Consumer.Supervisor do defp build_children(module, [consumer | consumers], children) do id = children |> Enum.count() |> to_string() |> String.to_atom() - spec = %{id: id, start: {Rabbit.Consumer, :start_link, [module, consumer]}} + + spec = %{ + id: id, + start: {Rabbit.Consumer, :start_link, [module, consumer]}, + shutdown: 30_000 + } + children = children ++ [spec] build_children(module, consumers, children) end diff --git a/mix.exs b/mix.exs index 8a5c620..28b2a84 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Rabbit.MixProject do use Mix.Project - @version "0.21.0" + @version "0.22.0" def project do [ diff --git a/test/consumer/executer_test.exs b/test/consumer/executer_test.exs new file mode 100644 index 0000000..c2984da --- /dev/null +++ b/test/consumer/executer_test.exs @@ -0,0 +1,194 @@ +defmodule Rabbit.Consumer.ExecuterTest do + use ExUnit.Case, async: true + + alias Rabbit.Consumer.Executer + alias Rabbit.Message + + @moduletag :capture_log + + # A consumer module that signals the test process and blocks until told to proceed. + defmodule BlockingConsumer do + def handle_message(message) do + test_pid = message.custom_meta.test_pid + + send(test_pid, {:handling_message, self()}) + + receive do + :proceed -> :ok + {:proceed_with, return} -> return + end + end + + def handle_error(message) do + test_pid = message.custom_meta.test_pid + send(test_pid, {:handling_error, self(), message.error_reason}) + :ok + end + end + + # A consumer module that completes immediately. + defmodule ImmediateConsumer do + def handle_message(message) do + test_pid = message.custom_meta.test_pid + send(test_pid, {:handled_message, self()}) + :ok + end + + def handle_error(message) do + test_pid = message.custom_meta.test_pid + send(test_pid, {:handled_error, self(), message.error_reason}) + :ok + end + end + + # A consumer module that raises an exception. + defmodule CrashingConsumer do + def handle_message(_message) do + raise "boom" + end + + def handle_error(message) do + test_pid = message.custom_meta.test_pid + send(test_pid, {:handled_error, self(), message.error_reason}) + :ok + end + end + + defp build_message(module, opts \\ []) do + custom_meta = Map.new([{:test_pid, self()} | opts]) + + %Message{ + consumer: self(), + module: module, + channel: nil, + payload: "test", + decoded_payload: nil, + meta: %{delivery_tag: 1, content_type: nil, exchange: "", routing_key: ""}, + custom_meta: custom_meta + } + end + + defp start_executer(message, opts) do + Process.flag(:trap_exit, true) + {:ok, pid} = Executer.start_link(message, opts) + pid + end + + describe "child_spec/1" do + test "sets restart to :temporary" do + spec = Executer.child_spec([build_message(ImmediateConsumer)]) + assert spec.restart == :temporary + end + + test "sets shutdown to 25_000" do + spec = Executer.child_spec([build_message(ImmediateConsumer)]) + assert spec.shutdown == 25_000 + end + end + + describe "task lifecycle" do + test "stops normally when task completes" do + message = build_message(ImmediateConsumer) + pid = start_executer(message, timeout: 60_000) + ref = Process.monitor(pid) + + assert_receive {:handled_message, _} + assert_receive {:DOWN, ^ref, :process, ^pid, :normal} + end + + test "runs error handler when task crashes" do + message = build_message(CrashingConsumer) + pid = start_executer(message, timeout: 60_000) + ref = Process.monitor(pid) + + assert_receive {:handled_error, _, %RuntimeError{message: "boom"}} + assert_receive {:DOWN, ^ref, :process, ^pid, _reason} + end + + @tag timeout: 10_000 + test "runs error handler on timeout" do + message = build_message(BlockingConsumer) + pid = start_executer(message, timeout: 100) + ref = Process.monitor(pid) + + # The message handler starts but blocks + assert_receive {:handling_message, _} + + # Timeout fires, error handler is called + assert_receive {:handling_error, _, {:exit, :timeout}}, 1_000 + + assert_receive {:DOWN, ^ref, :process, ^pid, :timeout} + end + end + + describe "terminate/2 on shutdown" do + test "shuts down in-flight task and exits cleanly" do + message = build_message(BlockingConsumer) + pid = start_executer(message, timeout: 60_000) + ref = Process.monitor(pid) + + # Wait for the task to be running + assert_receive {:handling_message, task_pid} + task_ref = Process.monitor(task_pid) + + # Shut down the executer + Process.exit(pid, :shutdown) + + # Task should be shut down (receives :shutdown signal from Task.shutdown/2) + assert_receive {:DOWN, ^task_ref, :process, ^task_pid, :shutdown}, 1_000 + + # Executer should also be down + assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown}, 1_000 + end + + test "does not safety-net nack when task completed before shutdown" do + # If the task already completed (completed: true), the second terminate/2 + # clause matches which is a no-op. Verify clean :normal exit. + message = build_message(ImmediateConsumer) + pid = start_executer(message, timeout: 60_000) + ref = Process.monitor(pid) + + assert_receive {:handled_message, _} + assert_receive {:DOWN, ^ref, :process, ^pid, :normal} + end + + test "skips nack when task finishes before shutdown is processed" do + # If the task completes and the GenServer processes {ref, result} + # before the :shutdown signal, completed is set to true and + # terminate/2 is a no-op. The process exits :normal. + # + # If the GenServer hasn't processed {ref, result} yet but the task + # is done, Task.shutdown/2 in terminate/2 returns {:ok, _} and + # the nack is skipped. + # + # Either way, no safety-net nack occurs. + message = build_message(ImmediateConsumer) + pid = start_executer(message, timeout: 60_000) + ref = Process.monitor(pid) + + # Wait for the task to finish its work + assert_receive {:handled_message, _} + + # Send shutdown - may or may not arrive before the GenServer + # processes the task completion message + Process.exit(pid, :shutdown) + + # Process exits either :normal (task result processed first) or + # :shutdown (shutdown processed first, but Task.shutdown returns {:ok, _}) + assert_receive {:DOWN, ^ref, :process, ^pid, reason}, 1_000 + assert reason in [:normal, :shutdown] + end + + test "handles shutdown when task was never started" do + message = build_message(BlockingConsumer) + pid = start_executer(message, timeout: 60_000) + ref = Process.monitor(pid) + + # Immediately shut down before task can start + Process.exit(pid, :shutdown) + + # Should shut down without crashing (nack fails silently with nil channel) + assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown}, 1_000 + end + end +end diff --git a/test/consumer_supervisor_test.exs b/test/consumer_supervisor_test.exs index 4acbd6a..bbf6f63 100644 --- a/test/consumer_supervisor_test.exs +++ b/test/consumer_supervisor_test.exs @@ -80,4 +80,17 @@ defmodule Rabbit.ConsumerSupervisorTest do assert {:ok, consumer_sup} = ConsumerSupervisor.start_link(TestConsumers) assert [_, _] = Supervisor.which_children(consumer_sup) end + + describe "child specs" do + test "consumer server children have shutdown of 30_000" do + assert {:ok, consumer_sup} = ConsumerSupervisor.start_link(TestConsumers) + + children = Supervisor.which_children(consumer_sup) + + for {id, _pid, :worker, _modules} <- children do + {:ok, spec} = :supervisor.get_childspec(consumer_sup, id) + assert spec.shutdown == 30_000 + end + end + end end