I have been trying out the sample code on go-amp package README, but I wanted to connect to a topic and not a queue as shown in the sample code on that README as of today.
What I did was to just put the topic name where the 'queue-name' had been put like this.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
const host = "example.com"
const topic = "/topic/my_topic"
const port = "5672"
const username = "my_username"
const password = "my_password"
// A hleper function to handle errors
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s %s", msg, err)
}
}
func main(){
// connect to remote amqp server
host_address := fmt.Sprintf("amqps://%s:%s", host, port)
log.Println("Connecting to ", host_address)
client, err := amqp.Dial(host_address,
amqp.ConnSASLPlain(username, password),
)
failOnError(err, "Failed to connect to Server")
defer client.Close()
// Open a session
session, err := client.NewSession()
failOnError(err, "Failed to create AMQP session")
ctx := context.Background()
// Continuously read messages
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress(topic),
amqp.LinkCredit(10),
)
failOnError(err, "Failed creating receiver link")
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
receiver.Close(ctx)
cancel()
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
for {
// Receive next message
msg, err := receiver.Receive(ctx)
failOnError(err, "Failed reading message from AMQP:")
// Accept message
msg.Accept(context.Background())
fmt.Printf("Message received: Body: %s\n", msg.Value)
}
}
}
I kept getting this error.
Failed creating receiver link *Error{Condition: amqp:unauthorized-access,
Description: User my_username is not authorized to read from: queue:///topic/my_topic, Info: map[]}
It looks like it is treating my topic as a queue. How do I set the receiver to try to attach to a topic and not a queue?
EDIT
I am using an ActiveMQ broker that uses AMQP 1.0. It is managed by someone else so I just have to use AMQP 1.0.
That means I cannot use the more popular go amqp package as it has no support for AMQP 1.0. Thanks Tim Bish for alerting me to add this.
After trial and error, I figured one can just change topic to 'topic://my_topic'
const topic = "topic://my_topic"
The rest of the code remains the same. I created a gist for both sending to and receiving from a topic.
I hope that helps newbies like me some good hours of banging their heads on their keyboards.