
Faust streaming agent slow to 'wake up' after idle period


I have a pipeline where a message is produced to topic A, processed by a stream processor, and sent as enriched data to topic B. Topic B is consumed by 3 other stream processors, each of which independently performs a small part of the calculation (to reduce the load on a single processor) and forwards its enriched data to a new topic. The final processor reads from all 3 new topics and sends the data on to web clients via WebSockets. It all works well, but if the system sits idle for 30 minutes or so with no new messages, it can sometimes take up to 10 seconds for a message to reach the end of the pipeline. Under normal operation this takes on the order of 10-20 ms.

Every stream processor uses tables to look up previous data and determine how to enrich subsequent messages, so I wonder whether table access slows down when the table hasn't been touched for a while?

If so, it seems a silly workaround, but it might be possible to use a timer to periodically send a dummy dataset to keep each worker alive and alert.

Below is printed output of the time difference from message initiation to arrival at the end of the pipeline:

[2022-05-23 08:52:46,445] [10340] [WARNING] 0:00:00.017999
[2022-05-23 08:53:03,469] [10340] [WARNING] 0:00:00.025995
[2022-05-23 09:09:46,774] [10340] [WARNING] 0:00:06.179146

I wonder whether any of the broker or agent settings noted on this page would be of use here? If anyone knows, please let me know.

UPDATE

So I ran tests where I used the @app.timer decorator to send a dummy/test message through the entire pipeline every second, and I never saw an instance of slow send times. I also updated things to talk to the app directly using the @app.page() decorator, rather than sending to the topic from a FastAPI endpoint, and with that I never saw a delay greater than 2 seconds. But the same thing still happened: if the system sat idle for a while and then received a new message, it took almost exactly 2 seconds (plus change) to do its thing. This really starts to look like an agent throttling its poll, or Kafka throttling an agent's connection, when throughput is low.


Solution

  • I believe the issue was that the main API request server was not set up as a true Faust application, but as a FastAPI application. I was creating a Faust App object when first starting the server and then sending via that object on API requests, so it presumably had to go through some connection process every now and then, triggered by whichever request happened to need the Kafka service after a period of inactivity. The delay would then show up on some seemingly random request after a defined idle period. The solution is either to send the request directly to the Faust application handling this type of data, or to set up a true Faust application to handle all API requests.