According to the official documentation for Azure Event Hubs, it is the consumer's responsibility to manage offsets. Quoting:
Consumers are responsible for storing their own offset values outside of the Event Hubs service.
But looking at the API documentation for the Event Hubs Offset class, it appears to offer no way to be serialized or otherwise stored.
So my question is: how would I go about storing Event Hubs offsets?
Please refer to the source code of common.py in the GitHub repo Azure/azure-event-hubs-python. The Offset class is defined at line 253, as shown below.
class Offset(object):
    """
    The offset (position or timestamp) where a receiver starts. Examples:
    Beginning of the event stream:
      >>> offset = Offset("-1")
    End of the event stream:
      >>> offset = Offset("@latest")
    Events after the specified offset:
      >>> offset = Offset("12345")
    Events from the specified offset:
      >>> offset = Offset("12345", True)
    Events after a datetime:
      >>> offset = Offset(datetime.datetime.utcnow())
    Events after a specific sequence number:
      >>> offset = Offset(1506968696002)
    """

    def __init__(self, value, inclusive=False):
        """
        Initialize Offset.
        :param value: The offset value.
        :type value: ~datetime.datetime or int or str
        :param inclusive: Whether to include the supplied value as the start point.
        :type inclusive: bool
        """
        self.value = value
        self.inclusive = inclusive

    def selector(self):
        """
        Creates a selector expression of the offset.
        :rtype: bytes
        """
        operator = ">=" if self.inclusive else ">"
        if isinstance(self.value, datetime.datetime):
            timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
            return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
        if isinstance(self.value, six.integer_types):
            return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
        return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')
According to the source code, Offset is just a normal Python class with two attributes, value and inclusive. You can simply store the values of these attributes as a JSON string or in any other format, or just extract the values, as in my sample code below.
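import json

from azure.eventhub.common import Offset

offset = Offset("-1")

# Read the two attributes directly.
print(offset.value, offset.inclusive)
# -1 False

# Or take both at once from the instance dictionary.
print(offset.__dict__)
# {'value': '-1', 'inclusive': False}

# Serialize the attributes to a JSON string for storage.
offset_json = json.dumps(offset.__dict__)
# '{"value": "-1", "inclusive": false}'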
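Storing is only half of the job, so here is a minimal round-trip sketch that also rebuilds the object. It assumes Python 3.7+ (for datetime.fromisoformat). To keep it runnable without the SDK installed, it defines a small stand-in Offset class with the same two attributes as the one quoted above. The helper names offset_to_json and offset_from_json are my own and are not part of the library. Because value may also be a datetime, which json.dumps cannot handle directly, the sketch tags it and stores it as an ISO 8601 string.

```python
import datetime
import json


class Offset(object):
    """Stand-in with the same two attributes as the Offset class quoted above."""

    def __init__(self, value, inclusive=False):
        self.value = value
        self.inclusive = inclusive


def offset_to_json(offset):
    """Serialize an offset to a JSON string (hypothetical helper)."""
    value = offset.value
    if isinstance(value, datetime.datetime):
        # datetime is not JSON-native, so store it as ISO 8601 text with a tag.
        payload = {"type": "datetime", "value": value.isoformat()}
    else:
        # str and int survive JSON with their type intact, which matters
        # because selector() builds a different filter for each type.
        payload = {"type": "plain", "value": value}
    payload["inclusive"] = offset.inclusive
    return json.dumps(payload)


def offset_from_json(text):
    """Rebuild an offset from a string produced by offset_to_json."""
    payload = json.loads(text)
    value = payload["value"]
    if payload["type"] == "datetime":
        value = datetime.datetime.fromisoformat(value)
    return Offset(value, payload["inclusive"])


restored = offset_from_json(offset_to_json(Offset("12345", True)))
print(restored.value, restored.inclusive)
# 12345 True
```

With the real SDK you would drop the stand-in class and import Offset as in the sample above; the two helpers only rely on the value and inclusive attributes and the constructor signature shown in the source.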
Note: In the future, the GitHub repo Azure/azure-event-hubs-python will be completely moved to the GitHub repo Azure/azure-sdk-for-python, where the Offset class is renamed to EventPosition, with the same attributes value and inclusive.
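Since both classes expose the same two attributes, storage code can be written against those attributes rather than against the class name. The following is only a sketch of one way to do that, keeping one entry per partition in a local JSON file. The function names, the file layout, and the idea of keying by partition ID are my assumptions, not something the SDK provides, and it only handles str and int values (a datetime value would need converting first).

```python
import json
import os
import tempfile


def load_checkpoints(path):
    """Return {partition_id: {"value": ..., "inclusive": ...}}, or {} if no file exists."""
    if not os.path.exists(path):
        return {}
    with open(path) as handle:
        return json.load(handle)


def save_checkpoint(path, partition_id, position):
    """Record value/inclusive of `position` for one partition (hypothetical helper).

    `position` can be any object with `value` and `inclusive` attributes,
    so the same code works for Offset and EventPosition.
    """
    checkpoints = load_checkpoints(path)
    checkpoints[str(partition_id)] = {
        "value": position.value,
        "inclusive": position.inclusive,
    }
    # Write to a temporary file and rename it into place, so a crash
    # mid-write does not leave a truncated checkpoint file behind.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(checkpoints, handle)
    os.replace(tmp_path, path)
```

To resume, read the entry for a partition with load_checkpoints and pass its "value" and "inclusive" fields back to the constructor, for example Offset(entry["value"], entry["inclusive"]).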