Search code examples
apache-flinkpyflinkamazon-kinesis-analytics

Apache flink seems buffering events before processing and streaming to the sink


I was wondering if someone can provide some insights on this. We are building a low latency processor using flink and we use kinesis streams and the managed apache flink in AWS to do that.

We are using the python datastream API and it seems that there is some buffering of events happening in flink that adds a few seconds of latency. Based on the flink documentation, STREAMING mode should process the events immediately and we are not using any window functions. So we are a bit lost as to why the events are not processed the streamed to the sink immediately.

This is a sample log that records the eventtime in flink by injecting a time event via a map operator on the event.
event1 - data event_time=2023-10-05T07:28:18.985605, latency=6505, flink_received_latency=1184, flink_to_kcl_latency=5321
event2 - data event_time=2023-10-05T07:28:20.046061, latency=5444, flink_received_latency=123, flink_to_kcl_latency=5321
event3 - data event_time=2023-10-05T07:28:21.102405, latency=4388, flink_received_latency=1066, flink_to_kcl_latency=3322
event4 - data event_time=2023-10-05T07:28:22.166583, latency=3324, flink_received_latency=2003, flink_to_kcl_latency=1321
  • event_time - is the event time when the kinesis record added via a client script.
  • flink_received_latency - is the latency between the flink event time (added via a map operator) and data event.
  • flink_to_kcl_latency - is the latency between the flink event time (added via a map operator) and the time it took for it to be sent to sink and read by a kcl app.
  • latency - overall latency

A full sample script is in github for your reference - https://github.com/jp6rt/pyflink1-15-kinesis-latency/tree/main/app

We used the enhanced fanout as well but it didn't help with the latency.

Setting the max batch size on the kinesis sink seems to help reduce the latency to around (1.5 - 3 seconds) but this is still not enough in our use case.


Solution

  • The latency was resolved with these two configuration updates:

    Source-> Set the desired value for python.fn-execution.bundle.time (default is 1000ms)

    Sets the waiting timeout(in milliseconds) before processing a bundle for Python user-defined function execution. The timeout defines how long the elements of a bundle will be buffered before being processed. Lower timeouts lead to lower tail latencies, but may affect throughput.

    https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/python_config/

    Sink-> Set the max time in buffer to the desired value.

    maxTimeInBufferMS - the maximum amount of time an element may remain in the buffer. In most cases elements are flushed as a result of the batch size (in bytes or number) being reached or during a snapshot. However, there are scenarios where an element may remain in the buffer forever or a long period of time. To mitigate this, a timer is constantly active in the buffer such that: while the buffer is not empty, it will flush every maxTimeInBufferMS milliseconds.

    https://nightlies.apache.org/flink/flink-docs-master/api/java//index.html?org/apache/flink/connector/base/sink/AsyncSinkBaseBuilder.html