Search code examples
azure go azureservicebus

Continually receiving messages from Azure Service Bus subscription/queue


I want to have a standard function that continually listens for and receives messages from an Azure Service Bus queue. I want a reasonably efficient way to handle the scenario where there are no messages on the subscription/topic, rather than constantly hammering the service with requests in a for loop.

Currently the ReceiveMessages() function is fairly limited in its receive options compared with the SDKs for other languages. There is no MaxWaitTime, so to handle the case of receiving no messages you simply have to keep retrying in some form of loop, as suggested here: "How do I keep my consumer listening the messages on Azure Service Bus using Azure sdk for Golang v0.3.1?"

    // Create the Service Bus client from a connection string.
    client, err := azservicebus.NewClientFromConnectionString("Connection String", nil)

    if err != nil {
        log.Fatalf("Failed to create Service Bus Client: %s", err.Error())
    }

    // Create a receiver bound to the queue named "queue".
    receiver, err := client.NewReceiverForQueue("queue", nil)

    if err != nil {
        log.Fatalf("Failed to create Consumer: %s", err.Error())
    }

    // One receive call asking for at most 10 messages. There is no enclosing
    // loop, so this snippet processes a single batch and then falls through.
    messages, err := receiver.ReceiveMessages(context.TODO(), 10, nil)

    if err != nil {
        log.Fatalf("Failed to receive Messages: %s", err.Error())
    }

    for _, message := range messages {

        body, err := message.Body()

        if err != nil {
            log.Fatalf("Failed to parse message body: %s", err.Error())
        }

        fmt.Println("Message --->", string(body))

        // Settle the message so it is removed from the queue.
        err = receiver.CompleteMessage(context.TODO(), message)

        if err != nil {
            log.Fatalf("Failed to complete message: %s", err.Error())
        }
    }

Presumably continually requesting the messages (during potentially long periods of no messages) isn't the most efficient way of listening?

I've considered implementing some form of static/dynamic waiting, but is there anything more 'out the box' and/or best practice?


Solution

    • Using this reference MSDOC, you can receive messages from Azure Service Bus.
    • The code below connects to Azure Service Bus and receives messages from a queue.
    • To install, run go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus.
    package main
    
    import (
        "context"
        "fmt"
        "time"
    
        "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
    )
    
    // main connects to Azure Service Bus, waits up to 60 seconds for a single
    // message on the queue, prints and completes it, and then closes the client.
    // An empty queue is treated as a normal outcome, not a failure.
    func main() {
        // Placeholder: replace with the real Service Bus connection string.
        connectionString := "AzureServiceBusConnectionString"

        client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
        if err != nil {
            panic(err)
        }
        // Deferred so the connection is released on every exit path, including
        // the panics below.
        defer func() {
            if err := client.Close(context.TODO()); err != nil {
                fmt.Printf("Error closing client: %v\n", err)
            }
        }()

        // Placeholder: replace with the real queue name.
        queueName := "AzureServiceBusQueuename"

        receiver, err := client.NewReceiverForQueue(queueName, nil)
        if err != nil {
            panic(err)
        }

        // ReceiveMessages blocks until a message arrives or the context ends,
        // so the timeout bounds how long we wait on an empty queue.
        ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
        defer cancel()

        messages, err := receiver.ReceiveMessages(ctx, 1, nil)
        if err != nil {
            // Hitting the deadline just means nothing was waiting on the queue.
            if ctx.Err() == context.DeadlineExceeded {
                fmt.Println("No messages received within 60 seconds")
                return
            }
            panic(err)
        }

        for _, message := range messages {
            // The message body is a []byte. For this example, we assume it's a string.
            fmt.Printf("Message received with body: %s\n", string(message.Body))

            // Completing the message removes it from the queue. A fresh context
            // is used so the receive timeout cannot cut settlement short.
            err = receiver.CompleteMessage(context.TODO(), message, nil)
            if err != nil {
                // Handle completion error as needed
                panic(err)
            }

            fmt.Printf("Received and completed the message\n")
        }
    }
    

    To continually receive messages from an Azure Service Bus subscription/queue:

    • The code below receives messages from an Azure Service Bus queue, sets up a signal handler to capture termination signals, and uses a context to control the message-receiving loop.
    package main
    
    import (
        "context"
        "fmt"
        "os"
        "os/signal"
        "syscall"
        "time"
    
        "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
    )
    
    // main receives messages from a Service Bus queue until SIGINT or SIGTERM
    // arrives, printing and completing each message as it is received.
    //
    // ReceiveMessages blocks until a message is available or the context is
    // cancelled, so the loop does not poll: it waits only after a failed receive.
    func main() {
        // Replace 'your_connection_string' with the actual Service Bus connection string
        connectionString := "your_connection_string"

        client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
        if err != nil {
            panic(err)
        }
        defer func() {
            // The receive context is already cancelled when shutdown reaches this
            // point, so closing gets its own short-lived context.
            closeCtx, closeCancel := context.WithTimeout(context.Background(), 10*time.Second)
            defer closeCancel()
            if err := client.Close(closeCtx); err != nil {
                fmt.Printf("Error closing client: %v\n", err)
            }
        }()

        // Replace 'your_queue_name' with the actual queue name
        queueName := "your_queue_name"

        // Create a receiver for the specified queue
        receiver, err := client.NewReceiverForQueue(queueName, nil)
        if err != nil {
            panic(err)
        }

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Use a signal handler to gracefully stop the application
        stopSignal := make(chan os.Signal, 1)
        signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM)
        defer signal.Stop(stopSignal)

        go func() {
            <-stopSignal
            fmt.Println("Received termination signal. Stopping gracefully...")
            cancel()
        }()

        for ctx.Err() == nil {
            // Blocks until at least one message arrives or ctx is cancelled.
            messages, err := receiver.ReceiveMessages(ctx, 1, nil)
            if err != nil {
                if ctx.Err() != nil {
                    // Cancellation is the normal shutdown path, not a failure.
                    break
                }

                fmt.Printf("Error receiving messages: %v\n", err)

                // Back off before retrying so a persistent failure does not
                // turn into a tight loop; wake early on shutdown.
                select {
                case <-ctx.Done():
                case <-time.After(1 * time.Second):
                }
                continue
            }

            for _, message := range messages {
                fmt.Printf("Message received with body: %s\n", string(message.Body))

                // Settle with a context independent of shutdown so a message that
                // was already processed can still be completed after a signal.
                completeCtx, completeCancel := context.WithTimeout(context.Background(), 30*time.Second)
                err = receiver.CompleteMessage(completeCtx, message, nil)
                completeCancel()
                if err != nil {
                    fmt.Printf("Error completing message: %v\n", err)
                    continue
                }

                fmt.Printf("Received and completed the message\n")
            }
        }

        fmt.Println("Context canceled. Exiting the message receiving loop.")
    }
    
    
    

    enter image description here

    enter image description here

    enter image description here

    • Refer to this for using the Azure SDK for Go with Azure Service Bus.