I have a list of topics and I need to send messages to one of those topics, but based on some condition.
Assume that, producer is configured with a set of topics(lets say around 10 topics), Can we do something like below in Apache-flink ?
if(condition1){
send message to topic 1
} else if(condition2){
send message to topic 2
}
//and so on
Note that I have one single source of messages(ie., single String-stream) Below is the code that I am currently using.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CustomSourceFunction source = CustomSourceFunction.getInstance();
DataStream<String> stringStream = env.addSource(source);
FlinkKafkaProducer011<String> producer = getProducer();
stringStream.addSink(producer);
env.execute();
Is there a way to configure a no of sinks based on some conditions in apache-flink ? Also, how many sinks can be added to the same stream ?
I'm not sure if there is any limits for number of sinks.
Since Flink 14 there is .setTopicSelector((element) -> {<your-topic-selection-logic>})
method in kafka Serializer but your code looks like older version. I think there is many ways to achieve your goal, one of them to
apply your topic selection logic in ProducerRecord
during serialization.
/**
* Create a record with no key
*
* @param topic The topic this record should be sent to
* @param value The record contents
*/
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
Or maybe it will be easier to just filter your stream and create a specific producers/sinks for different cases. Also maybe side outputs will work for you.