Hi all: I've implemented a Flink rule engine based on the very helpful groundwork of https://github.com/brggs/dynamic-flink . I'm getting close to using it in production, but I'm now running into some of the peculiarities of Flink in this context.
The rules are published on a Kafka topic and are then broadcast to all TaskManagers (Broadcast State Pattern).
My first question is: is it possible to control how many TaskManagers (or which ones) subscribe to the topic? What I am noticing is that all TaskManagers (32 in my test) are subscribing, but there are only 4 partitions, so 28 of them will never get data.
I am using a BroadcastProcessFunction to process the broadcast rules as they are received: https://github.com/brggs/dynamic-flink/blob/trunk/flink-job/src/main/java/uk/co/brggs/dynamicflink/InputBroadcastProcessFunction.java The rules are received and added to broadcast state.
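For reference, the wiring follows the standard Broadcast State Pattern, roughly like this (a simplified sketch with String payloads standing in for the real event and rule types, and an inline function instead of the actual InputBroadcastProcessFunction):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastWiringSketch {

    // Descriptor for the broadcast state holding the rules (name/types are placeholders).
    static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
            new MapStateDescriptor<>("rules",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    static DataStream<String> wire(DataStream<String> events, DataStream<String> rules) {
        // Every parallel instance of the downstream operator receives every rule.
        BroadcastStream<String> ruleBroadcast = rules.broadcast(RULES_DESCRIPTOR);

        return events
                .connect(ruleBroadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processBroadcastElement(String rule, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Store the rule; this runs once per parallel instance.
                        ctx.getBroadcastState(RULES_DESCRIPTOR).put(rule, rule);
                    }

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // Evaluate the event against the rules in broadcast state (omitted here).
                        out.collect(event);
                    }
                });
    }
}
```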
My second question is around the architecture of such an application. As in the InputBroadcastProcessFunction.java above, I wanted to send back a confirmation of receipt and processing of the published rules, but only the receiver of the broadcast state knows whether the rules are valid. In my current configuration, 32 different confirmations are sent back on a Kafka topic. The same applies to QUERY_STATUS: I get 32 answers, each containing all rules. How is this usually handled? I could (side) output and aggregate or something, but was hoping there was some platform-native way of doing this.
My first question is: is it possible to control how many TaskManagers (or which ones) subscribe to the topic? What I am noticing is that all TaskManagers (32 in my test) are subscribing, but there are only 4 partitions, so 28 of them will never get data.
This is the expected behavior when Flink reads from Kafka: the number of partitions is the limiting factor for the effective parallelism of the source. This can result in the following scenarios:
- When parallelism > partitions, each partition is consumed by one task manager instance and the excess instances sit idle. (This is what you are experiencing now.) You can improve overall resource usage by adding something like a rebalance() operation after the source to distribute the load across all of the parallel task managers.
- When parallelism = partitions, each task manager instance consumes from a dedicated partition of the topic.
- When parallelism < partitions, each task manager instance is assigned a group of partitions that it is responsible for.
I think that to fully take advantage of your parallelism, you'd need to either align the number of partitions with your level of parallelism or, if that isn't reasonable, perform a rebalance() after the source operation to redistribute the load across all 32 of your task managers (obviously this has its own overhead).
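For what it's worth, here is a minimal sketch of that second option, assuming the KafkaSource connector (your job may still be on the older FlinkKafkaConsumer) and placeholder broker/topic/group names:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(32); // overall job parallelism, as in your test setup

        // Placeholder broker/topic/group names.
        KafkaSource<String> eventSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events")
                .setGroupId("rule-engine")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> events = env
                .fromSource(eventSource, WatermarkStrategy.noWatermarks(), "events")
                .setParallelism(4)   // one source subtask per Kafka partition, none idle
                .rebalance();        // round-robin records across all 32 downstream subtasks

        events.print();
        env.execute("source-parallelism-sketch");
    }
}
```

Aligning the source parallelism with the partition count avoids idle source subtasks, and rebalance() then spreads the records evenly over the downstream operators, at the cost of a network shuffle.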
My second question is around the architecture of such an application. As in the InputBroadcastProcessFunction.java above, I wanted to send back a confirmation of receipt and processing of the published rules, but only the receiver of the broadcast state knows whether the rules are valid. In my current configuration, 32 different confirmations are sent back on a Kafka topic. The same applies to QUERY_STATUS: I get 32 answers, each containing all rules. How is this usually handled? I could (side) output and aggregate or something, but was hoping there was some platform-native way of doing this.
I'm not sure there's a great way to get around this: every time the broadcast state is updated, the same callback, processBroadcastElement(), is invoked on every parallel task manager instance (so all 32 of them). Since each parallel instance isn't aware of what the other instances are doing, there's no way for one of them to know it is the "first" to see a given record and should be the one to emit the confirmation, at least not within the broadcast function itself.
I'd imagine that, if the context supports it, a side output could be useful along with a simple window (or your preferred aggregation du jour) to dedupe the confirmations prior to writing them to Kafka, or you could handle the deduplication downstream, depending on how you are consuming them.
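As a rough sketch of that idea: the Context handed to processBroadcastElement() does let you emit to a side output via ctx.output(...), so each of the 32 instances could emit its confirmation there and a small keyed window could collapse them into one record before writing to Kafka. The RuleConfirmation type, tag name, and 10-second window below are illustrative placeholders, not part of dynamic-flink:

```java
import java.io.Serializable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class ConfirmationDedupSketch {

    /** Illustrative confirmation record; not part of the dynamic-flink project. */
    public static class RuleConfirmation implements Serializable {
        public String ruleId;
        public boolean accepted;

        public RuleConfirmation() {}

        public RuleConfirmation(String ruleId, boolean accepted) {
            this.ruleId = ruleId;
            this.accepted = accepted;
        }
    }

    // Side-output tag; declared as an anonymous subclass so Flink keeps the type info.
    public static final OutputTag<RuleConfirmation> CONFIRMATIONS =
            new OutputTag<RuleConfirmation>("rule-confirmations") {};

    // Inside processBroadcastElement(...), each of the 32 parallel instances would emit:
    //     ctx.output(CONFIRMATIONS, new RuleConfirmation(ruleId, isValid));

    /** Collapse the 32 per-subtask confirmations for a rule into a single record. */
    public static DataStream<RuleConfirmation> dedupe(SingleOutputStreamOperator<?> mainStream) {
        // mainStream is the operator returned by .process(new InputBroadcastProcessFunction(...))
        return mainStream
                .getSideOutput(CONFIRMATIONS)
                .keyBy(c -> c.ruleId)                                        // duplicates share the rule id
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))  // wait briefly for all copies
                .reduce((a, b) -> a);                                        // keep one confirmation per rule
    }
}
```

Keying by the rule id groups the 32 duplicate confirmations together, and the short processing-time window just gives them time to arrive; a richer reduce could also merge the per-subtask validation results instead of discarding all but one.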