
How can I store azure.eventhub.common.Offset in Python?


According to the official documentation for Azure Event Hubs, it is the consumer's responsibility to manage offsets. Quoting:

Consumers are responsible for storing their own offset values outside of the Event Hubs service.

But looking at the API doc for the Event Hubs Offset class, it is immediately obvious that it offers no method for serializing or otherwise storing it.

So my question is: how would I go about storing event hub Offsets?


Solution

  • See the source code of common.py in the GitHub repo Azure/azure-event-hubs-python, where the Offset class is defined at line 253, as shown below.

    class Offset(object):
        """
        The offset (position or timestamp) where a receiver starts. Examples:
        Beginning of the event stream:
          >>> offset = Offset("-1")
        End of the event stream:
          >>> offset = Offset("@latest")
        Events after the specified offset:
          >>> offset = Offset("12345")
        Events from the specified offset:
          >>> offset = Offset("12345", True)
        Events after a datetime:
          >>> offset = Offset(datetime.datetime.utcnow())
        Events after a specific sequence number:
          >>> offset = Offset(1506968696002)
        """
    
        def __init__(self, value, inclusive=False):
            """
            Initialize Offset.
            :param value: The offset value.
            :type value: ~datetime.datetime or int or str
            :param inclusive: Whether to include the supplied value as the start point.
            :type inclusive: bool
            """
            self.value = value
            self.inclusive = inclusive
    
        def selector(self):
            """
            Creates a selector expression of the offset.
            :rtype: bytes
            """
            operator = ">=" if self.inclusive else ">"
            if isinstance(self.value, datetime.datetime):
                timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
                return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
            if isinstance(self.value, six.integer_types):
                return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
            return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')
    

    According to the source code, the Offset class is just a normal Python class with two attributes, value and inclusive. You can simply store the values of these attributes as a JSON string or in any other format, or just extract the values directly, as in my sample code below.

    from azure.eventhub.common import Offset
    offset = Offset("-1")
    print(offset.value, offset.inclusive)
    # -1 False
    print(offset.__dict__)
    # {'value': '-1', 'inclusive': False}
    import json
    offset_json = json.dumps(offset.__dict__)
    # '{"value": "-1", "inclusive": false}'
    

    Note: In the future, the GitHub repo Azure/azure-event-hubs-python will be moved completely into the GitHub repo Azure/azure-sdk-for-python, where the Offset class is renamed to EventPosition and keeps the same properties, value and inclusive.
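
The json.dumps call above fails when value is a datetime, and the selector method treats int and str values differently, so the type needs to survive a round trip. Below is a minimal sketch of one way to save and restore an offset. The helper names offset_to_json and offset_from_json and the "type" tag are my own assumptions, not part of the library, and the small Offset class is only a stand-in with the same two attributes so that the snippet runs without azure-eventhub installed. It also assumes Python 3.7+ for datetime.fromisoformat.

```python
import datetime
import json


class Offset(object):
    """Stand-in with the same two attributes as azure.eventhub.common.Offset.

    In real code, import the library class instead of defining this one.
    """

    def __init__(self, value, inclusive=False):
        self.value = value
        self.inclusive = inclusive


def offset_to_json(offset):
    """Serialize an offset to a JSON string (hypothetical helper)."""
    value = offset.value
    if isinstance(value, datetime.datetime):
        # datetime is not JSON serializable, so store it as ISO 8601 text.
        payload = {"type": "datetime", "value": value.isoformat()}
    elif isinstance(value, int):
        # Sequence numbers stay integers so the selector picks the same branch.
        payload = {"type": "int", "value": value}
    else:
        # Plain offsets such as "-1", "@latest" or "12345" stay strings.
        payload = {"type": "str", "value": str(value)}
    payload["inclusive"] = offset.inclusive
    return json.dumps(payload)


def offset_from_json(text):
    """Rebuild an offset from a string produced by offset_to_json."""
    payload = json.loads(text)
    value = payload["value"]
    if payload["type"] == "datetime":
        value = datetime.datetime.fromisoformat(value)
    return Offset(value, payload["inclusive"])


if __name__ == "__main__":
    saved = offset_to_json(Offset("12345", True))
    restored = offset_from_json(saved)
    print(restored.value, restored.inclusive)
```

The resulting string can be written to a file, a database row or any other store that lives outside the Event Hubs service, and read back when the receiver restarts.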