Search code examples
pythonrabbitmqqpid-proton

Receive from specific offset with Qpid Proton + RabbitMQ Stream Queue (Core)


I want to implement a python based receiver app with Qpid proton library to consume messages from a RabbitMQ Stream Queue over AMQP 1.0. For a proper implementation I need to track the offset of consumed messages. When restarting the receiver I want to continue receiving from the last consumed offset instead of the lastest messages.

How can I pass the offset number to the proton receiver so that it only reads from that offset? I don't find related documentation about any approach achieving this.

with the following attempts, no offset filtering was applied: receiver.source.filter.put_dict({'x-stream-offset': ulong(1000)'} receiver.source.filter.put_dict({'rabbitmq:stream-offset-spec': ulong(1000)'} event.container.create_receiver(conn, "examples", options=Selector(u"x-stream-offset = '1000'"))


Solution

  • The solution is to import symbol and use symbol("rabbitmq:stream-offset-spec"):

    sample extract:

    from proton import symbol
    from proton.reactor import Filter
    
    offset = 123
    
    offset_filter = Filter(filter_set={symbol("rabbitmq:stream-offset-spec"): ulong(int(offset))})
    receiver = conn.create_receiver(address=self.address, options=offset_filter)