Search code examples
pythonfaust

Accessing raw faust/kafka message while using take method


I have a need to batch the events in a Faust stream, so I'm using the take() method. However, I would also like to access the header message, particularly a timestamp.

Ordinarily you would access the header using:

async for event in stream.events()

and then call the header using event.header, but since we're using the take method:

async for event in stream.take(500, 1)

we can't seem to get access to the raw message. Any idea on how to get this? We're just trying to have a means of highlighting a slow section of the pipeline by monitoring timestamps of each as a header, rather than adding it as a part of the value part of the sent message.

Is there another raw timestamp that's 'hidden' but accessible that I've missed?

EDIT

Using faust-streaming==0.8.4 so it's definitely up to date


Solution

  • The Event and Message objects have headers as attributes that can be accessed in a stream. Both events and take utilize EventT objects so you should be able to access them the same way. The only difference is that take and its derivatives unpack EventT objects into dicts inside a buffer list whereas events yields an individual EventT at a time. You can individually access EventT objects if you set your take buffer size to 1.

    There's a function introduced in faust-streaming==0.7.7 called stream.take_with_timestamp that's nearly identical to stream.take that can be utilized via:

    async for event in stream.take_with_timestamp(1, 1, 'timestamp'):
      print(stream.current_event)
      if stream.current_event is not None:
        print(stream.current_event.headers)
        print(stream.current_event.message.headers)
    

    which will show you the timestamp of each event. The caveat here is that if you set the buffer set to anything >1 and your stream times out, your stream.current_event object will be None.

    Or you could just mimic the assignments inside take_with_timestamp and access event.message.timestamp inside stream.events():

    async for event in stream.events():
      print(event.message.timestamp)
      print(event.headers)
      print(event.message.headers)