I want to have a standard function to continually listen/receive messages from an Azure Service Bus queue. I want a fairly efficient way we can handle the scenario that no messages are on the subscription/topic - rather than constantly hammering requests in a for loop.
Currently the ReceiveMessages() function is fairly limited compared with other language SDKs in terms of receive options. There is no MaxWaitTime, so to handle the case where no messages arrive you simply have to keep retrying in some form of loop, as suggested in "How do I keep my consumer listening the messages on Azure Service Bus using Azure sdk for Golang v0.3.1?":
// Build a Service Bus client from the connection string.
client, err := azservicebus.NewClientFromConnectionString("Connection String", nil)
if err != nil {
	log.Fatalf("Failed to create Service Bus Client: %s", err.Error())
}

// Open a receiver on the queue named "queue".
receiver, err := client.NewReceiverForQueue("queue", nil)
if err != nil {
	log.Fatalf("Failed to create Consumer: %s", err.Error())
}

// Ask for up to 10 messages in a single call.
messages, err := receiver.ReceiveMessages(context.TODO(), 10, nil)
if err != nil {
	log.Fatalf("Failed to receive Messages: %s", err.Error())
}

// Print each message body, then complete the message on the receiver.
for _, message := range messages {
	// NOTE(review): Body() as a method and the two-argument CompleteMessage
	// presumably match the older SDK version mentioned in the question; confirm
	// against the SDK version actually in use.
	body, err := message.Body()
	if err != nil {
		log.Fatalf("Failed to parse message body: %s", err.Error())
	}
	fmt.Println("Message --->", string(body))

	err = receiver.CompleteMessage(context.TODO(), message)
	if err != nil {
		log.Fatalf("Failed to complete message: %s", err.Error())
	}
}
Presumably continually requesting the messages (during potentially long periods of no messages) isn't the most efficient way of listening?
I've considered implementing some form of static/dynamic waiting, but is there anything more 'out of the box' and/or a best practice?
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
package main
import (
"context"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
// main receives at most one message from a Service Bus queue, waiting up to
// 60 seconds for it to arrive, prints its body, completes it, and exits.
//
// An empty queue is treated as a normal outcome: if the wait time elapses
// before any message arrives, the program reports that and returns instead
// of panicking.
func main() {
	connectionString := "AzureServiceBusConnectionString "
	client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
	if err != nil {
		panic(err)
	}
	// Deferred so the client is closed on every exit path, including the
	// panics below (deferred calls still run while a panic unwinds).
	defer func() {
		if closeErr := client.Close(context.TODO()); closeErr != nil {
			fmt.Printf("Error closing client: %v\n", closeErr)
		}
	}()

	queueName := "AzureServiceBusQueuename"
	receiver, err := client.NewReceiverForQueue(queueName, nil)
	if err != nil {
		panic(err)
	}

	// The context deadline acts as the maximum wait time for the receive call.
	ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
	defer cancel()

	messages, err := receiver.ReceiveMessages(ctx, 1, nil)
	if err != nil {
		// If our own deadline fired, the error just means nothing arrived in
		// the wait window; that is not a failure.
		if ctx.Err() != nil {
			fmt.Println("No messages received within the wait time.")
			return
		}
		panic(err)
	}

	for _, message := range messages {
		// The message body is a []byte. For this example, we assume it's a string.
		fmt.Printf("Message received with body: %s\n", string(message.Body))

		// Completion uses its own context so an almost-expired receive
		// deadline cannot abort it.
		if err := receiver.CompleteMessage(context.TODO(), message, nil); err != nil {
			// Handle completion error as needed
			panic(err)
		}
		fmt.Printf("Received and completed the message\n")
	}
}
To continually receive messages from an Azure Service Bus subscription/queue:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
// main continually receives messages from a Service Bus queue until the
// process gets SIGINT or SIGTERM.
//
// ReceiveMessages blocks until at least one message is available or the
// context is cancelled, so the loop does not need to sleep between calls to
// avoid hammering the service. A delay is applied only after a failed
// receive, as a back-off before retrying.
func main() {
	// Replace 'your_connection_string' with the actual Service Bus connection string
	connectionString := "your_connection_string"
	client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
	if err != nil {
		panic(err)
	}
	// Close with a fresh, bounded context: the receive context is already
	// cancelled by the time the loop exits, so reusing it could abort the close.
	defer func() {
		closeCtx, closeCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer closeCancel()
		if closeErr := client.Close(closeCtx); closeErr != nil {
			fmt.Printf("Error closing client: %v\n", closeErr)
		}
	}()

	queueName := "your_queue_name"
	// Create a receiver for the specified queue
	receiver, err := client.NewReceiverForQueue(queueName, nil)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Use a signal handler to gracefully stop the application
	stopSignal := make(chan os.Signal, 1)
	signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-stopSignal
		fmt.Println("Received termination signal. Stopping gracefully...")
		cancel()
	}()

	// Pause between retries after a failed receive, so a persistent error
	// (bad credentials, network outage) does not turn into a hot loop.
	const retryDelay = 1 * time.Second

	for ctx.Err() == nil {
		messages, err := receiver.ReceiveMessages(ctx, 1, nil)
		if err != nil {
			// An error caused by our own cancellation is the shutdown path,
			// not a failure worth reporting.
			if ctx.Err() != nil {
				break
			}
			// Handle receive error as needed
			fmt.Printf("Error receiving messages: %v\n", err)

			// Back off, but wake up immediately if shutdown is requested.
			select {
			case <-ctx.Done():
			case <-time.After(retryDelay):
			}
			continue
		}

		for _, message := range messages {
			fmt.Printf("Message received with body: %s\n", string(message.Body))
			if err := receiver.CompleteMessage(ctx, message, nil); err != nil {
				fmt.Printf("Error completing message: %v\n", err)
				// Do not report success for a message that was not completed.
				continue
			}
			fmt.Printf("Received and completed the message\n")
		}
	}

	fmt.Println("Context canceled. Exiting the message receiving loop.")
}