
Apache Flink streaming in cluster does not split jobs with workers


My objective is to set up a high-throughput cluster using Kafka as the source and Flink as the stream processing engine. Here's what I have done.

I have set up a 2-node cluster with the following configuration on the master and the worker.

Master flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100

Worker flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100

The slaves file on the Master node looks like this:

<WORKER_IP_ADDR>
localhost

The flink setup on both nodes is in a folder which has the same name. I start up the cluster on the master by running

bin/start-cluster-streaming.sh

This starts up the task manager on the Worker node.

My input source is Kafka. Here is the snippet.

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = 
    env.addSource(
    new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");

Here is my Sink function

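public class MySink implements SinkFunction<String> {

    private static final long serialVersionUID = 1L;

    // Called once for every record that reaches this sink instance.
    // processMessage(...) is defined elsewhere and is not shown here.
    @Override
    public void invoke(String message) throws Exception {
        processMessage(message);
        System.out.println("Processed Message");
    }
}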

Here are the Flink Dependencies in my pom.xml.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>

Then I run the packaged jar with this command on the master:

bin/flink run flink-test-jar-with-dependencies.jar

However, when I insert messages into the Kafka topic, all of them are processed on the Master node alone. I can account for every message through the debug output in the invoke method of my SinkFunction implementation.

In the Job Manager UI I can see 2 task managers (screenshot: Job Manager dashboard, task managers).

The dashboard overview is shown in a second screenshot.

Questions:

  1. Why are the worker nodes not getting the tasks?
  2. Am I missing some configuration?

Solution

  • When reading from a Kafka source in Flink, the maximum parallelism of the source task is limited by the number of partitions of the Kafka topic. A Kafka partition is the smallest unit that a Flink source task can consume. If there are more partitions than source tasks, some tasks consume multiple partitions. If there are fewer partitions than source tasks, the surplus source tasks receive no data.

    Consequently, to supply input to all 100 of your tasks, you should ensure that your Kafka topic has at least 100 partitions.

    If you cannot change the number of partitions of your topic, you can instead read from Kafka with a lower parallelism by calling the setParallelism method on the source. Alternatively, you can call the rebalance method on the stream, which redistributes the records coming from the preceding operation evenly across all parallel tasks of the following operation.
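The relationship between partitions and busy tasks can be illustrated with a small toy model. This is plain Java, not Flink code, and it rests on assumptions: the modulo assignment of partitions to source tasks and the class and method names are made up for illustration, and the real assignment strategy of the Kafka connector may differ. The single-partition case is also only an assumption, since the question does not state how many partitions the topic has.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, not part of Flink) of how Kafka partitions
 * map onto parallel source tasks and how a round-robin rebalance
 * spreads records over downstream tasks.
 */
final class PartitionAssignmentSketch {

    /** Assumed strategy: partition p is read by source task p % sourceParallelism. */
    static List<List<Integer>> assignPartitions(int numPartitions, int sourceParallelism) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int task = 0; task < sourceParallelism; task++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % sourceParallelism).add(p);
        }
        return assignment;
    }

    /** Counts the source tasks that own at least one partition. */
    static int activeTasks(List<List<Integer>> assignment) {
        int active = 0;
        for (List<Integer> partitions : assignment) {
            if (!partitions.isEmpty()) {
                active++;
            }
        }
        return active;
    }

    /** Round-robin redistribution: record i goes to downstream task i % downstreamParallelism. */
    static int[] rebalance(int numRecords, int downstreamParallelism) {
        int[] recordsPerTask = new int[downstreamParallelism];
        for (int i = 0; i < numRecords; i++) {
            recordsPerTask[i % downstreamParallelism]++;
        }
        return recordsPerTask;
    }

    public static void main(String[] args) {
        // One partition, 100 source tasks: only one task ever sees data.
        System.out.println(activeTasks(assignPartitions(1, 100)));   // prints 1
        // 100 partitions, 100 source tasks: every task has input.
        System.out.println(activeTasks(assignPartitions(100, 100))); // prints 100
        // More partitions than tasks: tasks read several partitions each.
        System.out.println(assignPartitions(5, 2)); // prints [[0, 2, 4], [1, 3]]
        // Rebalancing 1000 records over 100 downstream tasks.
        int[] counts = rebalance(1000, 100);
        System.out.println(counts[0] + " " + counts[99]); // prints 10 10
    }
}
```

In this model, a topic with fewer partitions than source tasks leaves most source tasks idle, which matches the symptom of all messages being handled on one node. Lowering the source parallelism with setParallelism avoids starting idle source tasks, while rebalance corresponds to the round-robin step that hands the records to every task of the next operation.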