Search code examples
gorabbitmqamqp

RabbitMQ pub/sub implementation not working


I've converted the RabbitMQ pub/sub tutorial into the below dummy test. Somehow it is not working as expected.

amqpURL is a valid AMQP service (i.e. RabbitMQ) URL. I've tested it with the queue example and it works. Somehow it fails in "exchange"

I'd expect TestDummy to log " [x] Hello World". Somehow it is not happening. Only the sending half is working as expected.

What did I got wrong?

import (
    "fmt"
    "log"
    "testing"

    "github.com/streadway/amqp"
)

func TestDummy(t *testing.T) {
    done := exchangeReceive()
    exchangeSend("Hello World")
    <-done
}

func exchangeSend(msg string) {
    failOnError := func(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
        }
    }

    log.Printf("exchangeSend: connect %s", amqpURL)
    conn, err := amqp.Dial(amqpURL)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := []byte(msg)
    err = ch.Publish(
        "logs", // exchange
        "",     // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func exchangeReceive() <-chan bool {

    done := make(chan bool)

    failOnError := func(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
        }
    }

    log.Printf("exchangeReceive: connect %s", amqpURL)
    conn, err := amqp.Dial(amqpURL)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when usused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.QueueBind(
        q.Name, // queue name
        "",     // routing key
        "logs", // exchange
        false,
        nil)
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
            done <- true
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

    return done
}

Solution

  • Some silly mistake here. When exchangeRecieve ends, the defer statments are triggered and hence closed the connections. That's why my rewrite fails.

    I've changed my code this way and it worked:

    import (
        "fmt"
        "os"
        "testing"
        "time"
    
        "github.com/streadway/amqp"
    )
    
    func TestDummy(t *testing.T) {
        amqpURL := os.Getenv("CLOUDAMQP_URL")
        t.Logf("  [*] amqpURL: %s", amqpURL)
    
        results1 := exchangeReceive(t, "consumer 1", amqpURL)
        results2 := exchangeReceive(t, "consumer 2", amqpURL)
        time.Sleep(50 * time.Millisecond)
    
        exchangeSend(t, amqpURL, "Hello World")
        if want, have := "Hello World", <-results1; want != have {
            t.Errorf("expected %#v, got %#v", want, have)
        }
        if want, have := "Hello World", <-results2; want != have {
            t.Errorf("expected %#v, got %#v", want, have)
        }
    }
    
    func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string {
    
        out := make(chan string)
    
        failOnError := func(err error, msg string) {
            if err != nil {
                t.Fatalf("%s: %s", msg, err)
                panic(fmt.Sprintf("%s: %s", msg, err))
            }
        }
    
        conn, err := amqp.Dial(amqpURL)
        failOnError(err, "Failed to connect to RabbitMQ")
    
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
    
        err = ch.ExchangeDeclare(
            "logs",   // name
            "fanout", // type
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")
    
        q, err := ch.QueueDeclare(
            "",    // name
            false, // durable
            false, // delete when usused
            true,  // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        err = ch.QueueBind(
            q.Name, // queue name
            "",     // routing key
            "logs", // exchange
            false,
            nil)
        failOnError(err, "Failed to bind a queue")
    
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
    
        go func() {
            for d := range msgs {
                t.Logf("  [x] %s received: %s", name, d.Body)
                out <- string(d.Body)
            }
        }()
    
        t.Logf("  [*] %s ready to receive", name)
        return out
    }
    
    func exchangeSend(t *testing.T, amqpURL, msg string) {
        failOnError := func(err error, msg string) {
            if err != nil {
                t.Fatalf("%s: %s", msg, err)
                panic(fmt.Sprintf("%s: %s", msg, err))
            }
        }
    
        conn, err := amqp.Dial(amqpURL)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        err = ch.ExchangeDeclare(
            "logs",   // name
            "fanout", // type
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")
    
        body := []byte(msg)
        err = ch.Publish(
            "logs", // exchange
            "",     // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    
        t.Logf(" [x] Sent %s", body)
    }