I am building a wrapper library using Apache Flink that consumes from multiple topics, and I have a set of applications that want to process the messages from those topics.
Example: I have 10 applications, app1, app2, app3 ... app10 (each is a Java library in the same on-prem project, i.e., all 10 JARs are part of the same .war file), of which only 5 are supposed to consume the messages arriving for the consumer group. I am able to filter for those 5 apps with the help of a filter function.
The challenge is in the strStream.process(executionServiceInterface) call, where app1 provides an implementation class for ExceucionServiceInterface as ExecutionServiceApp1Impl, and similarly app2 provides ExecutionServiceApp2Impl. When multiple implementations are available, Spring requires either a @Qualifier annotation at the injection point or @Primary on one of the implementations (ExecutionServiceApp1Impl, ExecutionServiceApp2Impl).
But I don't really want to do this, as I am building a generic wrapper library that should support any number of such applications (app1, app2, etc.), and each of them should be able to provide its own implementation logic (ExecutionServiceApp1Impl, ExecutionServiceApp2Impl).
Can someone help me here? How can I solve this?
Below is the code for reference.
private ExceucionServiceInterface executionServiceInterface;

public void init() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer011<String> consumer = createStringConsumer(topicList, kafkaAddress, kafkaGroup);
    if (consumer != null) {
        DataStream<String> strStream = environment.addSource(consumer);
        // ... filter per app, then strStream.process(executionServiceInterface) ...
    }
}

public FlinkKafkaConsumer011<String> createStringConsumer(List<String> listOfTopics, String kafkaAddress, String kafkaGroup) throws Exception {
    FlinkKafkaConsumer011<String> myConsumer = null;
    try {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        // group.id is Kafka's consumer-group property (assumed to be the intended key)
        props.setProperty("group.id", kafkaGroup);
        myConsumer = new FlinkKafkaConsumer011<>(listOfTopics, new SimpleStringSchema(), props);
    } catch (Exception e) {
        throw e;
    }
    return myConsumer;
}
Many thanks in advance!!
I solved this problem using reflection; below is the code that resolved the issue.
Note: this requires knowing, in advance, the fully qualified class names and method names along with their parameters.
public class SampleJobExecutor extends ProcessFunction<String, String> {

    MyAppProperties myAppProperties;

    @Override
    public void processElement(String inputMessage, ProcessFunction<String, String>.Context context,
            Collector<String> collector) throws Exception {
        String className = null;
        String methodName = null;
        try {
            // appName -> [fully qualified class name, method name]
            Map<String, List<String>> map = myAppProperties.getMapOfImplementors();
            JSONObject json = new JSONObject(inputMessage);
            if (json != null && json.has("appName")) {
                className = map.get(json.getString("appName")).get(0);
                methodName = map.get(json.getString("appName")).get(1);
                // Instantiate the target class and invoke its method that takes one String argument.
                Class<?> forName = Class.forName(className);
                Object job = forName.newInstance();
                Method method = forName.getDeclaredMethod(methodName, String.class);
                method.invoke(job, inputMessage);
            }
        } catch (Exception e) {
            // ...
        }
    }
}
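
For anyone who wants to try the reflective lookup in isolation, here is a minimal sketch that runs without Flink or Spring. The class ReflectiveDispatchSketch, the nested App1Job with its handle method, and the hand-built map are all hypothetical stand-ins (the map plays the role of myAppProperties.getMapOfImplementors()). It illustrates the idea under those assumptions and is not the project's actual code.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReflectiveDispatchSketch {

    // Hypothetical stand-in for one application's implementation class.
    public static class App1Job {
        public String handle(String message) {
            return "app1 handled: " + message;
        }
    }

    // Assumed layout: appName -> [fully qualified class name, method name],
    // mirroring map.get(appName).get(0) and .get(1) in the answer.
    public static Map<String, List<String>> implementors() {
        Map<String, List<String>> map = new HashMap<>();
        map.put("app1", Arrays.asList(App1Job.class.getName(), "handle"));
        return map;
    }

    // Looks up the target for appName and invokes it reflectively.
    // Returns null when no implementor is registered for appName.
    public static Object dispatch(String appName, String message) throws Exception {
        List<String> target = implementors().get(appName);
        if (target == null) {
            return null;
        }
        Class<?> clazz = Class.forName(target.get(0));
        // getDeclaredConstructor().newInstance() is used here in place of the
        // deprecated Class.newInstance(); it needs an accessible no-arg constructor.
        Object job = clazz.getDeclaredConstructor().newInstance();
        Method method = clazz.getDeclaredMethod(target.get(1), String.class);
        return method.invoke(job, message);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(dispatch("app1", "hello"));
    }
}
```

One difference from the code above: the sketch checks for a missing map entry and returns null, whereas map.get(...).get(0) would throw a NullPointerException for an appName that is not configured.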