During a performance test my application died with the following log:
17:17:28.187 [info] SIGTERM received - shutting down
17:17:28.187 [info] SIGTERM received - shutting down
17:17:28.188 [error] GenServer #PID<0.3707.0> terminating
** (stop) 'stopping because dependent process <0.3703.0> died: shutdown'
Last message: {:EXIT, #PID<0.3703.0>, :shutdown}
17:17:28.189 [error] gen_server <0.3707.0> terminated with reason: "stopping because dependent process <0.3703.0> died: shutdown"
17:17:28.190 [error] CRASH REPORT Process <0.3707.0> with 0 neighbours exited with reason: "stopping because dependent process <0.3703.0> died: shutdown" in gen_server:handle_common_reply/8 line 751
17:17:28.190 [error] Supervisor {<0.3705.0>,amqp_connection_sup} had child connection started with amqp_gen_connection:start_link(<0.3706.0>, {amqp_params_network,<<"publicmq-npperfcom1">>,<<"publicmq-npperfcom1">>,<<"/publicmq-npperfcom1">>,...}) at <0.3707.0> exit with reason "stopping because dependent process <0.3703.0> died: shutdown" in context child_terminated
17:17:28.190 [error] Supervisor {<0.3705.0>,amqp_connection_sup} had child connection started with amqp_gen_connection:start_link(<0.3706.0>, {amqp_params_network,<<"publicmq-npperfcom1">>,<<"publicmq-npperfcom1">>,<<"/publicmq-npperfcom1">>,...}) at <0.3707.0> exit with reason reached_max_restart_intensity in context shutdown
The code that opens the connection to AMQP looks like this:
defmodule MyApp.Events.AmqpTransport do
  require Logger
  use GenServer
  use AMQP

  @restart_delay 2000 # 2 seconds

  defmodule State do
    @moduledoc false
    @type t :: %__MODULE__{
      exchange: String.t,
      channel: AMQP.Channel.t,
      routing_key: String.t,
      emitter_id: String.t,
      np_tracking_id: String.t
    }
    defstruct [:exchange, :channel, :routing_key, :emitter_id, :np_tracking_id]
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_opts) do
    Process.flag(:trap_exit, true)
    send(self(), :connect)
    {:ok, nil}
  end

  def handle_info(:connect, _state) do
    username = get_conf(:username)
    password = get_conf(:password)
    host = get_conf(:host)
    port = get_conf(:port)
    vhost = String.replace(get_conf(:vhost), "/", "%2f")
    amqp_url = "amqp://#{username}:#{password}@#{host}:#{port}/#{vhost}"
    Logger.info("amqp transport connecting to #{amqp_url}")
    case Connection.open(amqp_url) do
      {:ok, conn} ->
        Process.link(conn.pid)
        {:ok, chan} = Channel.open(conn)
        :ok = AMQP.Exchange.declare(chan, get_conf(:exchange), :topic, durable: true)
        state = %State{
          exchange: get_conf(:exchange),
          channel: chan,
          routing_key: get_conf(:routing_key),
          emitter_id: Application.fetch_env!(:coups_events, :emitter_id),
          np_tracking_id: Application.fetch_env!(:coups_events, :np_tracking_id),
        }
        {:noreply, state}
      {:error, err} ->
        Logger.error("amqp transport failed\n Err: #{inspect(err)}\n Retrying to connect ...")
        Process.send_after(self(), :connect, @restart_delay)
        {:noreply, nil}
    end
  end

  def handle_info({:EXIT, pid, reason}, _state) do
    Logger.error("amqp transport failed with #{inspect(reason)}")
    Process.unlink(pid)
    Process.send_after(self(), :connect, @restart_delay)
    {:noreply, nil}
  end

  def handle_cast({:emit, event}, state) do
    # event publishing
    {:noreply, state}
  end

  defp get_conf(key) do
    conf = Application.get_env(:events, :amqp)
    conf[key]
  end
end
Questions:
Under the hood, Connection is started supervised by the amqp application's own supervisor, via amqp_sup:start_connection_sup(AmqpParams3) (Connection.open/2 simply delegates to :amqp_connection.start/2).
It is tricky, and in general not idempotent, to have one process linked to two processes that both trap exits. That is why the official documentation suggests monitoring the underlying connection with Process.monitor/1 and going the full path: when the connection dies, the monitoring process is restarted as well.
I recall there were complaints from Andrea Leopardi on the relevant topic, but it always worked perfectly for me.
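The monitoring approach described above could look roughly like the sketch below. It is a minimal illustration, not the library's prescribed implementation: the module name MonitoredAmqpTransport, the state map shape, and the {:connection_lost, reason} stop reason are assumptions made for this example. It keeps the question's delayed retry for failed connection attempts, but drops Process.flag(:trap_exit, true) and Process.link/1, so the transport's own shutdown is no longer signalled to the connection process.

```elixir
defmodule MonitoredAmqpTransport do
  @moduledoc false
  # Hypothetical module: a sketch of monitoring the AMQP connection
  # instead of linking to it.
  use GenServer
  require Logger

  @restart_delay 2000 # 2 seconds, as in the question

  def start_link(amqp_url) do
    GenServer.start_link(__MODULE__, amqp_url, name: __MODULE__)
  end

  @impl true
  def init(amqp_url) do
    # No trap_exit and no link: the connection is supervised by amqp itself.
    send(self(), :connect)
    {:ok, %{url: amqp_url, chan: nil, ref: nil}}
  end

  @impl true
  def handle_info(:connect, %{url: url} = state) do
    case AMQP.Connection.open(url) do
      {:ok, conn} ->
        # A monitor is one-directional: we learn about the connection's
        # death, but our own exit is not propagated to the connection.
        ref = Process.monitor(conn.pid)
        {:ok, chan} = AMQP.Channel.open(conn)
        {:noreply, %{state | chan: chan, ref: ref}}

      {:error, reason} ->
        Logger.error("amqp connect failed: #{inspect(reason)}, retrying")
        Process.send_after(self(), :connect, @restart_delay)
        {:noreply, state}
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    # Go the full path: stop and let our own supervisor restart us,
    # which triggers a fresh :connect from init/1.
    Logger.error("amqp connection lost: #{inspect(reason)}")
    {:stop, {:connection_lost, reason}, state}
  end
end
```

Assuming this module is added to the application's supervision tree as {MonitoredAmqpTransport, amqp_url}, a lost connection results in a clean restart of the transport by its supervisor, and a failed connection attempt is retried after the delay rather than counting against the supervisor's restart intensity.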