Search code examples
connectionelixiramqperlang-otperlang-supervisor

Losing the AMQP connection doesn't kill the parent process, so reconnection never happens


Here is the init function of my GenServer. A supervisor looks after it and should restart it on exit.

  # GenServer init callback: reads the AMQP settings through get_conf/1, opens a
  # connection and a channel on it, and returns the initial %State{}.
  # If either open call returns anything other than {:ok, _}, the match fails
  # and init raises MatchError.
  def init(_opts) do
    [username, password, host, port] =
      Enum.map([:username, :password, :host, :port], &get_conf/1)

    # "/" inside the vhost is percent-encoded before it is placed in the URI path.
    vhost = :vhost |> get_conf() |> String.replace("/", "%2f")

    amqp_uri = "amqp://#{username}:#{password}@#{host}:#{port}/#{vhost}"
    {:ok, conn} = Connection.open(amqp_uri)
    {:ok, chan} = Channel.open(conn)

    {:ok,
     %State{
       exchange: get_conf(:exchange),
       channel: chan,
       routing_key: get_conf(:routing_key)
     }}
  end

When I restart RabbitMQ with `sudo service rabbitmq-server restart`, a new connection isn't established.

In the debug bar I see the following: (enter image description here) When I click on the connection pid <0.417.0>, I get a message that the process doesn't exist any more. It seems that the process is dead and the parent AmqpTransport knows nothing about that. How can I make AmqpTransport die together with its child Connection?


Solution

  • Fixed it by trapping exits and linking to the Connection process. I also pattern match on errors during connection to avoid reached_max_restart_intensity.

      # Delay in milliseconds (2 seconds) passed to Process.send_after/3 when a
      # new :connect attempt is scheduled.
      @restart_delay 2000
    
      # GenServer init callback. It does not connect by itself: it enables exit
      # trapping, sends this process a :connect message, and starts with nil state.
      def init(_opts) do
        # With trap_exit set, exit signals from linked processes arrive as
        # {:EXIT, pid, reason} messages instead of terminating this process.
        Process.flag(:trap_exit, true)

        server = self()
        send(server, :connect)

        {:ok, nil}
      end
    
      # Handles the :connect message by trying to open the AMQP connection.
      #
      # On success the connection process is linked to this server and the
      # current state is kept. On any {:error, _} result the failure is logged,
      # another :connect is scheduled after @restart_delay, and the state is
      # reset to nil.
      def handle_info(:connect, state) do
        #...
        case Connection.open("amqp://#{username}:#{password}@#{host}:#{port}/#{vhost}") do
          {:ok, conn} ->
            # Link to the connection so that its exit is signalled to this process
            # (as an {:EXIT, pid, reason} message when exits are trapped).
            Process.link(conn.pid)
            # ...
            {:noreply, state}

          {:error, :econnrefused} ->
            Logger.error("amqp transport failed with connection refused")
            Process.send_after(self(), :connect, @restart_delay)
            {:noreply, nil}

          {:error, reason} ->
            # Any other connection error is retried too; leaving it unmatched
            # would raise CaseClauseError and crash this server.
            Logger.error("amqp transport failed with #{inspect(reason)}")
            Process.send_after(self(), :connect, @restart_delay)
            {:noreply, nil}
        end
      end
    
      # Handles an {:EXIT, pid, reason} message (the form in which a linked
      # process's exit arrives when exits are trapped): logs the reason, unlinks
      # from the exited pid, schedules a new :connect after @restart_delay, and
      # resets the state to nil.
      def handle_info({:EXIT, pid, reason}, _state) do
        Logger.error("amqp transport failed with #{inspect(reason)}")

        Process.unlink(pid)

        server = self()
        Process.send_after(server, :connect, @restart_delay)

        {:noreply, nil}
      end