
Should a connection be created on every publish, and is amqp.Dial thread-safe in Go?


The RabbitMQ docs mention that TCP connections are expensive to create, which is why the concept of a channel was introduced. I came across this example: in main() it creates a new connection every time a message is published, with conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/"). Shouldn't the connection be declared globally once, like a singleton object, with a failover mechanism in case it gets closed? That assumes amqp.Dial is thread-safe, which I suppose it should be.

Edited question:

I am handling the connection error in the following manner: I listen on a channel and create a new connection on error. But when I kill the existing connection and try to publish a message, I get the following error.

Error:

2016/03/30 19:20:08 Failed to open a channel: write tcp 172.16.5.48:51085->172.16.0.20:5672: use of closed network connection
exit status 1

Code:

    func main() {
        Conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        context := &appContext{
            queueName:    "QUEUENAME",
            exchangeName: "ExchangeName",
            exchangeType: "direct",
            routingKey:   "RoutingKey",
            conn:         Conn,
        }
        c := make(chan *amqp.Error)

        go func() {
            error := <-c
            if error != nil {
                Conn, err = amqp.Dial("amqp://guest:[email protected]:5672/")
                failOnError(err, "Failed to connect to RabbitMQ")
                Conn.NotifyClose(c)
            }
        }()

        Conn.NotifyClose(c)
        r := web.New()
        // We pass an instance to our context pointer, and our handler.
        r.Get("/", appHandler{context, IndexHandler})
        graceful.ListenAndServe(":8086", r)
    }

Solution

  • Of course, you shouldn't create a connection for each request. Make it a global variable or, better, part of an application context that you initialize once at startup.

    You can handle connection errors by registering a channel using Connection.NotifyClose:

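    func initialize() {
      c := make(chan *amqp.Error)
      go func() {
        // On a graceful shutdown the client closes c without sending,
        // so the receive yields nil and ok is false; nothing to reconnect.
        err, ok := <-c
        if !ok {
          return
        }
        log.Println("reconnect: " + err.Error())
        initialize()
      }()

      conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
      if err != nil {
        panic("cannot connect")
      }
      conn.NotifyClose(c)

      // create topology
    }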
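
One thing the reconnect logic has to get right is that request handlers see the new connection. In the question's code the context keeps the pointer it was given at startup, so handlers would keep using the closed connection even after a successful redial. Below is a minimal sketch of one way to arrange this, not the library's own mechanism: a small holder that handlers ask for the current connection, and that redials under a mutex once the old connection is reported closed. Note that this swaps the listener goroutine above for a check on each Get call. The names Conn, Holder, NewHolder, fakeConn and fakeDialer are all hypothetical, and Conn is a stand-in for *amqp.Connection so the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is a hypothetical stand-in for *amqp.Connection. Closed returns a
// channel that is closed once the connection is gone, which is roughly the
// role Connection.NotifyClose plays in the real client.
type Conn interface {
	Closed() <-chan struct{}
}

// Holder owns the one shared connection. Handlers call Get instead of
// keeping their own copy of the connection pointer.
type Holder struct {
	mu   sync.Mutex
	conn Conn
	dial func() (Conn, error) // hypothetical; would wrap amqp.Dial
}

// NewHolder dials once at startup and fails fast if that is impossible.
func NewHolder(dial func() (Conn, error)) (*Holder, error) {
	conn, err := dial()
	if err != nil {
		return nil, err
	}
	return &Holder{conn: conn, dial: dial}, nil
}

// Get returns the live connection, redialing first if the current one has
// been reported closed. The mutex makes concurrent callers share one redial.
func (h *Holder) Get() (Conn, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	select {
	case <-h.conn.Closed():
		conn, err := h.dial()
		if err != nil {
			return nil, err // keep the dead conn; the next Get retries
		}
		h.conn = conn
	default: // still open, reuse it
	}
	return h.conn, nil
}

// fakeConn lets the sketch run without a broker.
type fakeConn struct {
	id     int
	closed chan struct{}
}

func (f *fakeConn) Closed() <-chan struct{} { return f.closed }

// fakeDialer returns a dial function that counts how often it is called.
func fakeDialer(dials *int) func() (Conn, error) {
	return func() (Conn, error) {
		*dials++
		return &fakeConn{id: *dials, closed: make(chan struct{})}, nil
	}
}

func main() {
	dials := 0
	h, err := NewHolder(fakeDialer(&dials))
	if err != nil {
		panic(err)
	}
	first, _ := h.Get()
	again, _ := h.Get()
	fmt.Println("reused:", first == again, "dials:", dials)

	close(first.(*fakeConn).closed) // simulate the broker dropping the connection
	second, _ := h.Get()
	fmt.Println("replaced:", first != second, "dials:", dials)
}
```

With the real client, the dial function would presumably wrap amqp.Dial, and Closed would need a small adapter that closes a chan struct{} when the channel registered through Connection.NotifyClose fires. Each handler would then call Get and open its own channel on the returned connection, rather than storing the connection in the context once.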