Tags: java, apache-kafka, apache-flink, flink-streaming

How can I dynamically get the name of the Kafka topic being processed in a Flink Kafka consumer?


I have a Flink cluster that consumes every Kafka topic matching a pattern. This way, we don't need to maintain a hard-coded list of Kafka topics.

import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)"); // e.g. DC_TEST_ORDERS
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);
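
A note on the pattern itself: as originally posted, "(DC_TEST_([A-Z0-9_]+)" has an unbalanced parenthesis, so Pattern.compile throws a PatternSyntaxException before the consumer is even built. The plain-Java check below (no Flink dependency) shows which topic names the corrected pattern "DC_TEST_([A-Z0-9_]+)" accepts. It assumes, as far as I know is the case in Flink's Kafka connector, that topic names are compared with a whole-name match (Matcher#matches), not a substring search. The class and method names are only for illustration. Also, as far as I know, topics created after the job starts are only picked up if partition discovery is enabled via the flink.partition-discovery.interval-millis consumer property.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicPatternCheck {
    // Corrected pattern: the posted "(DC_TEST_([A-Z0-9_]+)" has an unclosed group.
    static final Pattern TOPIC_PATTERN = Pattern.compile("DC_TEST_([A-Z0-9_]+)");

    // Whole-name match; assumed to mirror how the consumer compares topic names.
    static boolean isSubscribed(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    // Returns the text captured by group 1, or null if the topic does not match.
    static String suffixOf(String topic) {
        Matcher m = TOPIC_PATTERN.matcher(topic);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "DC_TEST_ORDERS", "DC_TEST_EU_1", "DC_TEST_", "dc_test_orders", "X_DC_TEST_A");
        for (String t : topics) {
            System.out.println(t + " -> subscribed=" + isSubscribed(t) + ", suffix=" + suffixOf(t));
        }
    }
}
```

Matching is case-sensitive and anchored at both ends, so "dc_test_orders" and "X_DC_TEST_A" are not subscribed, and "DC_TEST_" fails because the suffix needs at least one character.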

Using the approach above, how can I find out the actual Kafka topic name of each record during processing? Thanks.

Update: I need the topic name because it is passed as a parameter to the downstream Flink sink.


Solution

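  • You can implement your own KafkaDeserializationSchema that returns the topic name together with the message value, like this (KafkaDeserializationSchema is available from Flink 1.8; in older versions, KeyedDeserializationSchema plays the same role and its deserialize method receives the topic as a parameter):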

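      import java.nio.charset.StandardCharsets;
      import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.api.java.typeutils.TupleTypeInfo;
      import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
      import org.apache.kafka.clients.consumer.ConsumerRecord;

      // Emits (topic, value) pairs so downstream operators know which topic each record came from.
      public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
          @Override
          public boolean isEndOfStream(Tuple2<String, String> nextElement) {
              return false; // the Kafka stream is unbounded
          }

          @Override
          public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
              // f0 = source topic, f1 = message value decoded as UTF-8
              // (assumes non-null values; a tombstone record would throw here)
              return new Tuple2<>(record.topic(), new String(record.value(), StandardCharsets.UTF_8));
          }

          @Override
          public TypeInformation<Tuple2<String, String>> getProducedType() {
              return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
          }
      }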
    

    With the custom KafkaDeserializationSchema, you can create a DataStream whose elements carry the topic name. In this example the element type is Tuple2<String, String>, so the topic name is available as Tuple2#f0 and the message value as Tuple2#f1.

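    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.util.Collector;
    ...
    FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
            topicPattern, new CustomKafkaDeserializationSchema(), kafkaConsumerProps);
    DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);

    DataStream<String> output = input.process(new ProcessFunction<Tuple2<String, String>, String>() {
        @Override
        public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
            String topicName = value.f0; // the topic this record was read from
            // your processing logic here, e.g. use topicName to pick sink parameters
            out.collect(value.f1);        // the message value
        }
    });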
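
Since the update says the topic name is needed as a sink parameter, one option is to derive that parameter from value.f0 inside processElement, or to use it as a keyBy key so records from different topics are partitioned separately. The plain-Java sketch below shows only that derivation step, with no Flink dependency: SinkTargets, targetFor, groupByTarget and the "sink_<suffix>" naming convention are all hypothetical, and Map.Entry stands in for Flink's Tuple2 (key = f0, value = f1).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SinkTargets {
    // Same (corrected) subscription pattern as in the question.
    private static final Pattern TOPIC_PATTERN = Pattern.compile("DC_TEST_([A-Z0-9_]+)");

    // HYPOTHETICAL convention: DC_TEST_ORDERS -> "sink_orders".
    // Replace with whatever parameter the real sink expects.
    static String targetFor(String topic) {
        Matcher m = TOPIC_PATTERN.matcher(topic);
        if (!m.matches()) {
            throw new IllegalArgumentException("Topic does not match the subscription pattern: " + topic);
        }
        return "sink_" + m.group(1).toLowerCase(Locale.ROOT);
    }

    // Each entry stands in for a Tuple2<String, String>: key = topic (f0), value = message (f1).
    // Groups message values by sink target, preserving arrival order.
    static Map<String, List<String>> groupByTarget(List<Map.Entry<String, String>> records) {
        Map<String, List<String>> byTarget = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            byTarget.computeIfAbsent(targetFor(r.getKey()), k -> new ArrayList<>()).add(r.getValue());
        }
        return byTarget;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("DC_TEST_ORDERS", "{\"id\":1}"),
                Map.entry("DC_TEST_USERS", "{\"id\":7}"),
                Map.entry("DC_TEST_ORDERS", "{\"id\":2}"));
        System.out.println(groupByTarget(records));
    }
}
```

In the Flink job, the equivalent would be calling a helper like targetFor(value.f0) inside processElement, or keying the stream with something like input.keyBy(t -> targetFor(t.f0)); the grouping in groupByTarget only mimics that partitioning locally.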