python apache-flink performance-testing apache-beam

How to measure per-element processing time in an Apache Beam pipeline?


I have a Beam pipeline written in Python and running on Flink. It reads from a Kafka topic, applies a few DoFn transformations, and writes back to another Kafka topic. What would be a reasonable approach to measuring the end-to-end processing time of a single element in an Apache Beam pipeline?

Does it make sense to apply MeasureTime(beam.DoFn) at the first and last transformations and compute the difference, or is there a better way?

https://github.com/apache/beam/blob/1988284a89b10b60eea48325f8a3b370b551c77c/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py#L406

Thanks


Solution

  • This could work, but note that in a streaming pipeline the difference between the two timestamps would be dwarfed by the runtime of the pipeline itself.

    Another option would be to emit (identifier(original_element), timestamp) pairs both before and after the transformations into two separate PCollections, do a streaming join on identifier(original_element), and publish a distribution of the timestamp differences. (This computation would likely be more expensive than the original read/mutate/write, unless you did some sampling.)
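
To make the join idea concrete, here is a minimal sketch of the logic in plain Python, without Beam, so the steps are easy to follow. It assumes each element is a dict carrying an "id" field that the transformations preserve; the names tap, sampled, join_latencies and summarize are hypothetical helpers, not Beam APIs. In a real pipeline the two taps would be side branches before and after the transformations, and the join would most likely be a beam.CoGroupByKey over windowed PCollections.

```python
import hashlib
from statistics import mean


def sampled(element_id: str, rate: float) -> bool:
    """Deterministic sampling by hashing the id, so that both taps
    select exactly the same elements (random sampling would not)."""
    digest = hashlib.sha256(element_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2**64 < rate


def tap(element: dict, now: float, rate: float = 1.0):
    """Yield (id, timestamp) for a sampled element.
    Assumption: element["id"] survives all transformations."""
    if sampled(element["id"], rate):
        yield element["id"], now


def join_latencies(before, after) -> dict:
    """Join two (id, timestamp) collections on id and return the
    per-element latency in seconds. Ids missing on either side are dropped."""
    starts = dict(before)
    return {key: end - starts[key] for key, end in after if key in starts}


def summarize(latencies: dict) -> dict:
    """Reduce the latencies to a small distribution summary."""
    values = sorted(latencies.values())
    if not values:
        return {"count": 0}
    return {
        "count": len(values),
        "min": values[0],
        "max": values[-1],
        "mean": mean(values),
    }
```

The sampling here is keyed on the element id rather than on a random draw, because the "before" and "after" taps must pick the same elements for the join to find matches. Lowering rate reduces the cost of the extra join that the answer warns about.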