My objective is to set up a high-throughput cluster using Kafka as the source and Flink as the stream processing engine. Here's what I have done.
I have set up a 2-node cluster with the following configuration on the master and worker nodes.
Master flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Worker flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
The slaves file on the Master node looks like this:
<WORKER_IP_ADDR>
localhost
The Flink installation on both nodes is in a folder with the same name. I start the cluster on the master by running
bin/start-cluster-streaming.sh
This starts up the task manager on the Worker node.
My input source is Kafka. Here is the snippet.
final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
    env.addSource(
        new KafkaSource<String>(kafkaUrl, kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
Here is my Sink function
public class MySink implements SinkFunction<String> {
    private static final long serialVersionUID = 1L;

    public void invoke(String arg0) throws Exception {
        processMessage(arg0);
        System.out.println("Processed Message");
    }
}
Here are the Flink Dependencies in my pom.xml.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>
Then I run the packaged jar with this command on the master
bin/flink run flink-test-jar-with-dependencies.jar
However, when I insert messages into the Kafka topic, all of the incoming messages are processed on the Master node alone (I can account for every one of them via debug messages in the invoke method of my SinkFunction implementation). Nothing appears to be processed on the Worker node.
In the Job manager UI I am able to see 2 Task managers as below:
Also, the dashboard looks like so:
Questions:
When reading from a Kafka source in Flink, the maximum degree of parallelism for the source task is limited by the number of partitions of a given Kafka topic. A Kafka partition is the smallest unit which can be consumed by a source task in Flink. If there are more partitions than source tasks, then some tasks will consume multiple partitions.
Consequently, to supply input to all 100 of your tasks, you should ensure that your Kafka topic has at least 100 partitions.
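To make this limit concrete, here is a minimal sketch in plain Java (it does not use Flink or Kafka). It assumes a simple modulo assignment of partitions to source tasks, which is only an illustration; the connector's real assignment strategy may differ. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignmentSketch {

    // Assigns each partition to a source task by modulo.
    // This scheme is an assumption for illustration, not Flink's actual logic.
    static List<List<Integer>> assign(int numPartitions, int numSourceTasks) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (int t = 0; t < numSourceTasks; t++) {
            tasks.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            tasks.get(p % numSourceTasks).add(p);
        }
        return tasks;
    }

    // Counts the source tasks that received no partition and so read no data.
    static int idleTasks(int numPartitions, int numSourceTasks) {
        int idle = 0;
        for (List<Integer> partitions : assign(numPartitions, numSourceTasks)) {
            if (partitions.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // A single-partition topic can feed only one of 100 source tasks.
        System.out.println("1 partition, 100 tasks: idle = " + idleTasks(1, 100));
        // With as many partitions as tasks, every task has input.
        System.out.println("100 partitions, 100 tasks: idle = " + idleTasks(100, 100));
        // More partitions than tasks: each task consumes several partitions.
        System.out.println("4 partitions, 2 tasks: " + assign(4, 2));
    }
}
```

Under this assumed scheme, a topic with fewer partitions than source tasks always leaves some tasks without input, no matter how many task slots the cluster offers.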
If you cannot change the number of partitions of your topic, you can instead read from Kafka with a lower degree of parallelism by calling the setParallelism method on the source. You can then use the rebalance method, which redistributes the output of the preceding operation evenly across all available tasks of the following operation.
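The effect of such a redistribution can be sketched without Flink. The following plain-Java simulation assumes a round-robin distribution from one upstream task to many downstream tasks; it is not the Flink implementation, and the class and method names are hypothetical.

```java
class RebalanceSketch {

    // Distributes numRecords records from a single upstream task across
    // numDownstreamTasks tasks in round-robin order and returns how many
    // records each downstream task received.
    static int[] roundRobin(int numRecords, int numDownstreamTasks) {
        int[] counts = new int[numDownstreamTasks];
        for (int r = 0; r < numRecords; r++) {
            counts[r % numDownstreamTasks]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // One source task reading a single partition, 100 downstream tasks.
        int[] counts = roundRobin(1000, 100);
        System.out.println("records handled by task 0: " + counts[0]);
        System.out.println("records handled by task 99: " + counts[99]);
    }
}
```

In the job from the question, this would correspond to redistributing the stream before the sink, so that sink tasks on both task managers receive records even though only a few source tasks actually read from Kafka.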