Tags: java, apache-kafka, apache-flink, flink-streaming

Consuming Kafka fails due to "Timeout expired while fetching topic metadata"


I'm trying to consume events using Apache Flink. The code is very basic: it connects to the topic, splits each message into words by spaces, and prints the words to the console. The Kafka version is 0.9.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka servers:9092...");
        // zookeeper.connect is only required by the Kafka 0.8 consumer; the 0.9 consumer does not use it
        props.setProperty("zookeeper.connect", "kafka servers:2181...");
        props.setProperty("group.id", "flinkPOC");

        // Read each record value from "topic" as a UTF-8 string
        FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

        DataStream<String> dataStream = env.addSource(consumer);

        // Split each message into words; on a cluster, print() writes to the TaskManager's stdout (.out file)
        DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
        wordDataStream.print();
        env.execute("Word Split");
    }

    // Emits one output record per space-separated word of the input sentence
    public static class Splitter implements FlatMapFunction<String, String> {

        @Override
        public void flatMap(String sentence, Collector<String> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(word);
            }
        }
    }
}
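The Splitter logic can be checked in isolation, without Kafka or Flink, which helps rule it out as the cause when nothing is printed. Below is a minimal sketch using only the JDK; the class SplitterCheck and its two methods are hypothetical helpers, where splitOnSingleSpace reproduces the loop from Splitter.flatMap and splitOnWhitespace is an alternative shown only for comparison. One side note it makes visible: split(" ") keeps empty tokens when words are separated by more than one space.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitterCheck {

    // Same loop as Splitter.flatMap, but collecting into a List instead of a Flink Collector
    static List<String> splitOnSingleSpace(String sentence) {
        List<String> out = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            out.add(word);
        }
        return out;
    }

    // Alternative (not in the original code): split on any run of whitespace and drop empty tokens
    static List<String> splitOnWhitespace(String sentence) {
        List<String> out = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(word);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String sample = "hello  kafka flink"; // note the double space after "hello"
        System.out.println(splitOnSingleSpace(sample)); // [hello, , kafka, flink]
        System.out.println(splitOnWhitespace(sample));  // [hello, kafka, flink]
    }
}
```

If the single-space version already produces the expected words for your messages, the empty output comes from the source side rather than the FlatMap.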

The app does not print anything to the console, even though I produced events to Kafka. I tried skipping the Splitter FlatMap function, but still nothing happens. Kafka does not require SSL. When I submitted the job to the cluster, I found this timeout exception in the logs:

2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Flat Map -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

I am really not sure what I am doing wrong. :(
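The Kafka client raises this TimeoutException when it cannot obtain metadata for the topic from the brokers before the timeout expires. Common causes include a broker address that is not reachable from the machine running the job, or brokers advertising a host name the client cannot resolve. A first check that does not involve Flink is whether each bootstrap.servers entry accepts a TCP connection. Below is a minimal sketch using only the JDK; the class BootstrapCheck, its methods, and the broker1/broker2 addresses are hypothetical placeholders. A successful connection only shows that the port is open; it does not prove that the broker's advertised addresses resolve or that it speaks a protocol version the 0.9 client understands.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class BootstrapCheck {

    // Parses a comma-separated "host:port,host:port" list, like the value of bootstrap.servers
    static List<InetSocketAddress> parseBootstrap(String servers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved skips DNS while parsing; the lookup happens in isReachable
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a plain TCP connection succeeds within timeoutMs (DNS failures count as unreachable)
    static boolean isReachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical default; pass the real bootstrap.servers value as the first argument
        String servers = args.length > 0 ? args[0] : "broker1:9092,broker2:9092";
        for (InetSocketAddress address : parseBootstrap(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 3000));
        }
    }
}
```

Running it from a TaskManager machine is the most informative option, since that is where the Kafka source actually opens its connections.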


Solution

  • The problem seems to be in Flink's connector for Kafka 0.9 (FlinkKafkaConsumer09), since the same code runs fine when using Kafka 2. So I am afraid there is no solution for Kafka 0.9.