How to Consume the Latest Event with Azure Event Hubs Go SDK (azeventhubs)?


I'm in the process of migrating from azure-event-hubs-go/v3 to the newer azeventhubs Go SDK. In the older SDK, there was a ReceiveOption argument that allowed me to specify from where to start consuming events.

In the new SDK, I'm using the following code to initialize the processor:

processor, err := azeventhubs.NewProcessor(
    e.ConsumerClient,
    checkpointStore,
    &azeventhubs.ProcessorOptions{
        UpdateInterval: time.Second,
        Prefetch:       0,
        StartPositions: azeventhubs.StartPositions{
            Default: azeventhubs.StartPosition{
                Latest:       to.Ptr(true),
                EnqueuedTime: to.Ptr(time.Now()),
                Inclusive:    true,
            },
        },
    },
)

However, I've noticed that the events are being consumed from the last checkpoint rather than from the most recently sent events.

What I've Tried: I've experimented with both ConsumingEventsUsingConsumerClient and ConsumingEventsWithCheckpoints examples, but they both behave the same way, consuming events from the last checkpoint rather than the most recent events.

What I'm Expecting: I want the processor to start consuming the latest events sent from the device, which sends a message every second. How can I achieve this behavior using azeventhubs Go SDK?


Solution

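  • I initially struggled with the underlying AMQP mechanics, but the issue is now resolved. A Processor resumes each partition from the checkpoint saved in the checkpoint store and applies StartPositions only when no checkpoint exists for that partition, which is why events kept arriving from the last checkpoint. Reading each partition directly with a PartitionClient bypasses the checkpoint store, and with nil options the client starts from the latest position: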

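    // p is assumed to hold the result of consumerClient.GetEventHubProperties.
    var wg sync.WaitGroup

    for _, partition := range p.PartitionIDs {
        wg.Add(1) // one Add per goroutine, matching the deferred Done
        go func(partition string) {
            defer wg.Done()

            // With nil options the client starts at the latest position
            // and never consults a checkpoint store.
            partitionClient, err := consumerClient.NewPartitionClient(partition, nil)
            if err != nil {
                panic(err)
            }
            defer partitionClient.Close(context.TODO())

            for {
                // Fresh 30-second deadline per call; a single shared context
                // would expire once and make every later call fail immediately.
                receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
                events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
                cancel()

                // A timeout only means no event arrived in this window.
                if err != nil && !errors.Is(err, context.DeadlineExceeded) {
                    panic(err)
                }

                for _, evt := range events {
                    fmt.Printf("partition: %s\n", partition)
                    fmt.Printf("Body: %s\n", string(evt.Body))
                }
            }
        }(partition)
    }

    wg.Wait()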
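
A pattern worth isolating from the loop above is one goroutine per partition, with a WaitGroup increment for each goroutine and a new timeout context for each receive call. The sketch below shows that pattern using only the standard library. fakeReceiver, newReceiver, consume, and demo are hypothetical names that stand in for the Azure client so the code runs without an Event Hub; unlike a real consumer, each goroutine stops after its first idle receive so the demo terminates.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// fakeReceiver is a hypothetical stand-in for a partition client.
type fakeReceiver struct {
	events chan string
}

// newReceiver preloads a receiver with the given events.
func newReceiver(events ...string) *fakeReceiver {
	ch := make(chan string, len(events))
	for _, e := range events {
		ch <- e
	}
	return &fakeReceiver{events: ch}
}

// Receive returns one event, or ctx.Err() if none arrives before ctx is done.
func (r *fakeReceiver) Receive(ctx context.Context) (string, error) {
	select {
	case e := <-r.events:
		return e, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// consume starts one goroutine per receiver. Each receive call gets its own
// timeout context, and a goroutine exits after its first timed-out call.
func consume(receivers map[string]*fakeReceiver, perCall time.Duration) map[string][]string {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		out = make(map[string][]string)
	)
	for id, r := range receivers {
		wg.Add(1) // incremented once per goroutine
		go func(id string, r *fakeReceiver) {
			defer wg.Done()
			for {
				ctx, cancel := context.WithTimeout(context.Background(), perCall)
				evt, err := r.Receive(ctx)
				cancel()
				if errors.Is(err, context.DeadlineExceeded) {
					return // idle partition: stop so the demo ends
				}
				if err != nil {
					return
				}
				mu.Lock()
				out[id] = append(out[id], evt)
				mu.Unlock()
			}
		}(id, r)
	}
	wg.Wait()
	return out
}

// demo runs consume over two preloaded fake partitions.
func demo() map[string][]string {
	return consume(map[string]*fakeReceiver{
		"0": newReceiver("a", "b"),
		"1": newReceiver("c"),
	}, 200*time.Millisecond)
}

func main() {
	got := demo()
	fmt.Println(len(got["0"]), len(got["1"]))
}
```

Creating the context inside the loop matters: a context created once outside it expires a single time, after which every receive call returns a deadline error immediately and the loop spins.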
    

    I extend my gratitude to the Azure customer support services team for their invaluable assistance.
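
For readers wondering why the Processor in the question kept resuming from the checkpoint, the precedence can be modeled in a few lines. This is a simplified illustration, not the SDK's code: startFor and its string results are hypothetical, and the only behavior it encodes is that a stored checkpoint wins over the default start position.

```go
package main

import "fmt"

// startFor is a simplified model (not SDK code) of how a checkpointing
// processor picks a start position: a stored checkpoint for the partition
// takes precedence, and the default applies only when none exists.
func startFor(checkpoints map[string]int64, partition, defaultStart string) string {
	if seq, ok := checkpoints[partition]; ok {
		return fmt.Sprintf("after sequence %d", seq)
	}
	return defaultStart
}

func main() {
	// Hypothetical state: partition "0" has a checkpoint, partition "1" has none.
	checkpoints := map[string]int64{"0": 41}

	fmt.Println(startFor(checkpoints, "0", "latest")) // checkpoint wins
	fmt.Println(startFor(checkpoints, "1", "latest")) // default applies
}
```

Under this model, a Processor would honor a "latest" default only for partitions without a checkpoint, for example when it runs against an empty checkpoint store.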