apache-kafka apache-flink flink-streaming

Apache Flink Stream to send messages to different sinks based on condition


I have a list of topics, and I need to send each message to one of those topics based on some condition.

Assume the producer is configured with a set of topics (say, around 10). Can we do something like the following in Apache Flink?

    if (condition1) {
        // send message to topic 1
    } else if (condition2) {
        // send message to topic 2
    }
    // and so on

Note that I have a single source of messages (i.e., a single String stream). Below is the code I am currently using:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Single source of String messages (custom SourceFunction)
    CustomSourceFunction source = CustomSourceFunction.getInstance();
    DataStream<String> stringStream = env.addSource(source);
    // One Kafka producer; every message currently goes to this one sink
    FlinkKafkaProducer011<String> producer = getProducer();
    stringStream.addSink(producer);
    env.execute();

Is there a way to configure a number of sinks based on conditions in Apache Flink? Also, how many sinks can be added to the same stream?


Solution

  • I'm not sure whether there is any limit on the number of sinks.

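    Since Flink 1.14 there is a .setTopicSelector((element) -> {<your-topic-selection-logic>}) method on the Kafka record serializer builder (KafkaRecordSerializationSchema.builder()), but your code appears to use an older version (FlinkKafkaProducer011). There are many ways to achieve your goal; one of them is to apply your topic-selection logic during serialization, when the ProducerRecord is created, because its constructor takes the target topic directly: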

        /**
         * Create a record with no key
         * 
         * @param topic The topic this record should be sent to
         * @param value The record contents
         */
        public ProducerRecord(String topic, V value) {
            this(topic, null, null, null, value, null);
        }
    

    Or it may be easier to filter your stream and create a specific producer/sink for each case. Side outputs might also work for you.
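For the per-record topic selection suggested above, whichever hook you use, the routing comes down to a function from message to topic name. In Flink 1.14+ it can be handed to setTopicSelector; in older connectors it would live in the serialization schema (for FlinkKafkaProducer011 I believe the relevant method is KeyedSerializationSchema.getTargetTopic, and with the newer FlinkKafkaProducer a KafkaSerializationSchema returns the ProducerRecord with its topic). The sketch below is plain Java with no Flink dependency; the class name TopicRouter, the conditions, and the topic names are hypothetical placeholders for your condition1, condition2, and so on.

```java
import java.io.Serializable;
import java.util.function.Function;

/**
 * Hypothetical per-record topic selector. The conditions and topic names
 * stand in for your own condition1, condition2, ... logic.
 * Serializable because Flink ships such functions to the task managers.
 */
class TopicRouter implements Function<String, String>, Serializable {

    private static final long serialVersionUID = 1L;

    // Fallback topic used when no condition matches (assumed name).
    private final String defaultTopic;

    TopicRouter(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    @Override
    public String apply(String message) {
        // Conditions are checked in order; the first match wins,
        // exactly like the if/else chain in the question.
        if (message.startsWith("ERROR")) {
            return "topic-1";
        } else if (message.contains("order")) {
            return "topic-2";
        }
        // ...and so on for the remaining topics
        return defaultTopic;
    }
}
```

With the Flink 1.14+ KafkaSink API this could be wired in roughly as KafkaRecordSerializationSchema.<String>builder().setTopicSelector(router::apply).setValueSerializationSchema(new SimpleStringSchema()).build(); treat that line as an untested sketch. A single sink then writes every message to whichever topic the function returns.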
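The filter-based alternative attaches one sink per branch, conceptually stringStream.filter(condition1).addSink(producerForTopic1) and so on. The plain-Java model below (no Flink dependency; FilterFanOut, the conditions, and the topic names are hypothetical) applies each branch's filter independently to the same input, the way separate filter() calls on one DataStream would.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Plain-Java model of "one filter + one sink per topic". Each entry pairs a
 * hypothetical topic name with the condition its branch filters on.
 */
class FilterFanOut {

    // Hypothetical routing table: topic -> condition (insertion order kept).
    static Map<String, Predicate<String>> routes() {
        Map<String, Predicate<String>> routes = new LinkedHashMap<>();
        routes.put("topic-1", msg -> msg.startsWith("ERROR"));
        routes.put("topic-2", msg -> msg.contains("order"));
        return routes;
    }

    // Runs every branch's filter over the full input independently,
    // collecting what each branch's sink would receive.
    static Map<String, List<String>> fanOut(List<String> messages,
                                            Map<String, Predicate<String>> routes) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Predicate<String>> route : routes.entrySet()) {
            List<String> branch = new ArrayList<>();
            for (String msg : messages) {
                if (route.getValue().test(msg)) {
                    branch.add(msg);
                }
            }
            out.put(route.getKey(), branch);
        }
        return out;
    }
}
```

Unlike the if/else chain in the question, independent filters are not exclusive: a message such as "ERROR order failed" would reach both topic-1 and topic-2, and a message matching no condition reaches no sink unless you add a default branch. If routing must be exclusive, one option is a single ProcessFunction containing the if/else that emits each element to an OutputTag via ctx.output(tag, value), then reading each branch with getSideOutput(tag) and attaching its own sink.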