Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/rabbit/broker/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ defmodule Rabbit.Broker.Supervisor do
name = Broker.consumers(module)
opts = Keyword.get(opts, :consumers, [])
opts = Enum.map(opts, &Keyword.put(&1, :connection, conn))
spec = build_spec(Rabbit.Consumer.Supervisor, name, module, opts)

spec =
Rabbit.Consumer.Supervisor
|> build_spec(name, module, opts)
|> Map.put(:type, :supervisor)

children ++ [spec]
end
Expand Down
72 changes: 52 additions & 20 deletions lib/rabbit/consumer/executer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ defmodule Rabbit.Consumer.Executer do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, args},
restart: :temporary
restart: :temporary,
shutdown: 25_000
}
end

Expand Down Expand Up @@ -49,34 +50,70 @@ defmodule Rabbit.Consumer.Executer do
@impl GenServer
def handle_continue(:run, state) do
state = run(state)
{:noreply, state, state.timeout}
{:noreply, state}
end

@doc false
@impl GenServer
def handle_info(:timeout, state) do
if is_pid(state.executer), do: Process.exit(state.executer, :normal)
Task.shutdown(state.task, :brutal_kill)
handle_error(state, {:exit, :timeout}, [])
{:stop, :timeout, state}
{:stop, :timeout, %{state | completed: true}}
end

def handle_info({:EXIT, pid1, reason}, %{executer: pid2} = state) when pid1 == pid2 do
def handle_info({ref, _result}, %{task: %Task{ref: ref}} = state) do
# Task completed successfully - the task body already did ack/nack.
# Flush the :DOWN message that Task.async sends after completion.
Process.demonitor(ref, [:flush])
{:stop, :normal, %{state | completed: true}}
end

def handle_info({:DOWN, ref, :process, _pid, reason}, %{task: %Task{ref: ref}} = state) do
# Task crashed before completing. Run error handler.
{reason, stack} =
case reason do
{%_{} = reason, stack} -> {reason, stack}
reason -> {reason, []}
end

handle_error(state, reason, stack)
{:stop, reason, state}
{:stop, reason, %{state | completed: true}}
end

def handle_info({:EXIT, _, _}, state) do
# Task.async links to the caller. Since we trap exits, we receive
# EXIT messages from the task process. The actual result/crash is
# handled via the task ref and :DOWN messages above, so we ignore
# EXIT signals here.
{:noreply, state}
end

@doc false
@impl GenServer
def handle_cast({:complete, ref1}, %{executer_ref: ref2} = state) when ref1 == ref2 do
{:stop, :normal, state}
def terminate(_reason, %{completed: false, message: message} = state) do
result =
if state.task do
Task.shutdown(state.task, 5_000)
end

# Only nack if the task did not complete within the grace period.
# {:ok, _} means the task finished - it already acked/nacked inside its body.
case result do
{:ok, _} ->
:ok

_ ->
try do
Message.nack(message, requeue: true)
catch
_, _ -> :ok
end
end

:ok
end

def terminate(_reason, _state), do: :ok

################################
# Private Functions
################################
Expand All @@ -85,9 +122,9 @@ defmodule Rabbit.Consumer.Executer do
opts
|> Enum.into(%{})
|> Map.merge(%{
executer: nil,
executer_ref: nil,
message: message
task: nil,
message: message,
completed: false
})
end

Expand All @@ -100,11 +137,8 @@ defmodule Rabbit.Consumer.Executer do
end

defp run(state) do
parent = self()
ref = make_ref()

executer =
spawn_link(fn ->
task =
Task.async(fn ->
try do
message = decode_payload!(state.message)
consumer_callback(state, :handle_message, [message])
Expand All @@ -113,11 +147,9 @@ defmodule Rabbit.Consumer.Executer do
catch
msg, reason -> handle_error(state, {msg, reason}, __STACKTRACE__)
end

GenServer.cast(parent, {:complete, ref})
end)

%{state | executer: executer, executer_ref: ref}
%{state | task: task}
end

defp decode_payload!(message) do
Expand Down
29 changes: 23 additions & 6 deletions lib/rabbit/consumer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,36 @@ defmodule Rabbit.Consumer.Server do
end

state
|> cancel_consumer()
|> stop_workers()
|> close_channel()
end

defp cancel_consumer(%{consuming: true, channel_open: true} = state) do
try do
AMQP.Basic.cancel(state.channel, state.consumer_tag)
catch
_, _ -> :ok
end

%{state | consuming: false}
end

defp cancel_consumer(state), do: state

defp stop_workers(state) do
if state.workers_started do
Enum.each(state.workers, fn worker ->
try do
:ok = Worker.stop(worker)
catch
_, _ -> :ok
end
state.workers
|> Enum.map(fn worker ->
Task.async(fn ->
try do
Worker.stop(worker)
catch
_, _ -> :ok
end
end)
end)
|> Task.await_many(25_000)
end

%{state | workers: nil, workers_started: false}
Expand Down
8 changes: 7 additions & 1 deletion lib/rabbit/consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ defmodule Rabbit.Consumer.Supervisor do

defp build_children(module, [consumer | consumers], children) do
id = children |> Enum.count() |> to_string() |> String.to_atom()
spec = %{id: id, start: {Rabbit.Consumer, :start_link, [module, consumer]}}

spec = %{
id: id,
start: {Rabbit.Consumer, :start_link, [module, consumer]},
shutdown: 30_000
}

children = children ++ [spec]
build_children(module, consumers, children)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Rabbit.MixProject do
use Mix.Project

@version "0.21.0"
@version "0.22.0"

def project do
[
Expand Down
Loading
Loading