Search code examples
gorabbitmqamqp

How to keep my connection alive for publishing messages with RabbitMQ streadway/amqp?


As connection opening each time for publishing is costly I'm trying to implement some way to keep the connection alive and share it in my app to publish messages.

var (
    Connection *amqp.Connection
    Channel *amqp.Channel
    err error
)

func Connect() {

    Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
    FailOnError(err, "Failed to connect to RabbitMQ")

    Channel, err = Connection.Channel()
    FailOnError(err, "Failed to open a channel")
}

func CloseConnection() {
    err = Channel.Close()
    FailOnError(err, "Failed to close channel ")
    err = Connection.Close()
    FailOnError(err, "Failed to close connection ")
}

func KeepAlive() {

    queue, err := Channel.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    FailOnError(err, "couldn't publish tics")

    tic := "tic"
    for {
        err := Channel.Publish(
            "",         // exchange
            queue.Name, // routing key
            false,      // mandatory
            false,      // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(tic),
                Expiration: "5000",
            })
        FailOnError(err, "couldn't publish tics")
        time.Sleep(5 *time.Second)
    }
}

func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

The function KeepAlive is an infinite loop that keeps sending a dummy message every 5 secs and that message have a TTL of 5 secs too so it gets destroyed.

func main() {
    rabbitmq.Connect()
    defer rabbitmq.CloseConnection()
    go func() {
        //publisher connection to stay alive as long as application is running
        rabbitmq.KeepAlive()
    }()

    data_layer.OpenDBConnection()
    router := gin.Default()

    router.POST("/whatever", whatever)

    err := router.Run()
    if err != nil {
        log.Fatal(err.Error())
    }
}

Here I'm creating the connection and calling KeepAlive in a goroutine so it can work in the background, keeping my connection alive all the time.

My questions:

  • I feel that this way is just a work around and although I've tried to search for examples how to keep it alive, it seems all of these examples are interested in the consumer side. Is there a cleaner way to keep my connection alive?

  • Is keeping my connection alive as long as my application is running bad? performance wise (network, memory usage)? note: I'm planning to monitor this with Prometheus to watch the performance but any note about what I might face would be helpful

Side note: these tics that are sent will be sent to a dummy queue since if I send it to my queue that I consume messages from by another service it will get stuck behind actual messages that doesn't have TTL and these tics will grow very large.


Solution

  • With streadway/amqp you don't need to implement the keepalive yourself. The library already provides this mechanism.

    The method amqp.Dial constructs a Connection with a default heartbeat of 10 seconds. You can see the code here:

    // connection.go
    func Dial(url string) (*Connection, error) {
        return DialConfig(url, Config{
            Heartbeat: defaultHeartbeat,
            Locale:    defaultLocale,
        })
    }
    

    This works by sending heartbeat frames on the open connection, which is going to be more efficient and maintainable than sending fake messages to a queue created only for that reason.

    From the above follows that you can change the connection heartbeat with amqp.DialConfig:

        conn, err := amqp.DialConfig(url, amqp.Config{
            Heartbeat: 5 * time.Second,
        })
    

    What you might want to implement yourself is the reconnect-on-error logic. For that you can find some useful information here: How to check if the channel is still working in streadway/amqp RabbitMQ client?