Search code examples
goactivemq-classicamqp

How do you connect to an AMQP 1.0 topic not queue in Golang


I have been trying out the sample code on go-amp package README, but I wanted to connect to a topic and not a queue as shown in the sample code on that README as of today.

What I did was to just put the topic name where the 'queue-name' had been put like this.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/Azure/go-amqp"
)

const host = "example.com"
const topic = "/topic/my_topic"
const port = "5672"
const username = "my_username"
const password = "my_password"

// A hleper function to handle errors
func failOnError(err error, msg string){
    if err != nil {
        log.Fatalf("%s %s", msg, err)
    }
}


func main(){
    // connect to remote amqp server
    host_address := fmt.Sprintf("amqps://%s:%s", host, port)
    log.Println("Connecting to ", host_address)

    client, err := amqp.Dial(host_address,
        amqp.ConnSASLPlain(username, password),
    )
    failOnError(err, "Failed to connect to Server")
    defer client.Close()

    // Open a session
    session, err := client.NewSession()
    failOnError(err, "Failed to create AMQP session")

    ctx := context.Background()

        // Continuously read messages
        {
            // Create a receiver
            receiver, err := session.NewReceiver(
                amqp.LinkSourceAddress(topic),
                amqp.LinkCredit(10),
            )
            failOnError(err, "Failed creating receiver link")
            defer func() {
                ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
                receiver.Close(ctx)
                cancel()
            }()

            log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    
            for {
                // Receive next message
                msg, err := receiver.Receive(ctx)
                failOnError(err, "Failed reading message from AMQP:")
    
                // Accept message
                msg.Accept(context.Background())    
                fmt.Printf("Message received: Body: %s\n", msg.Value)
            }
        }
}

I kept getting this error.

Failed creating receiver link *Error{Condition: amqp:unauthorized-access, 
Description: User my_username is not authorized to read from: queue:///topic/my_topic, Info: map[]}

It looks like it is treating my topic as a queue. How do I set the receiver to try to attach to a topic and not a queue?

EDIT
I am using an ActiveMQ broker that uses AMQP 1.0. It is managed by someone else so I just have to use AMQP 1.0. That means I cannot use the more popular go amqp package as it has no support for AMQP 1.0. Thanks Tim Bish for alerting me to add this.


Solution

  • After trial and error, I figured one can just change topic to 'topic://my_topic'

    const topic = "topic://my_topic"
    

    The rest of the code remains the same. I created a gist for both sending to and receiving from a topic.

    I hope that helps newbies like me some good hours of banging their heads on their keyboards.