I am building a wrapper library on Apache Flink that consumes from multiple Kafka topics, and I have a set of applications that want to process the messages from those topics.
Example: I have 10 applications, app1, app2, app3 ... app10 (each is a Java library in the same on-prem project, i.e., all 10 jars are part of the same .war file), of which only 5 are supposed to consume the messages coming to the consumer group. I am able to filter for those 5 apps with the help of a filter function.
The challenge is in the strStream.process(executionServiceInterface) call, where app1 provides an implementation class for ExceucionServiceInterface as ExecutionServiceApp1Impl, and similarly app2 provides ExecutionServiceApp2Impl.
When multiple implementations are available, Spring requires either a @Qualifier annotation at the injection point or @Primary on one of the implementations (ExecutionServiceApp1Impl, ExecutionServiceApp2Impl).
But I don't really want to do this, because I am building a generic wrapper library that should support any number of such applications (app1, app2, etc.), and each of them should be able to provide its own implementation logic (ExecutionServiceApp1Impl, ExecutionServiceApp2Impl).
Can someone help me here? How can I solve this?
Below is the code for reference.
@Autowired
private ExceucionServiceInterface executionServiceInterface;

public void init() {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer011<String> consumer = createStringConsumer(topicList, kafkaAddress, kafkaGroup);
    if (consumer != null) {
        DataStream<String> strStream = environment.addSource(consumer);
        strStream.filter(filterFunctionInterface).process(executionServiceInterface);
    }
}

// Builds a Kafka consumer for the given topics; the former try/catch only rethrew, so it is dropped.
public FlinkKafkaConsumer011<String> createStringConsumer(List<String> listOfTopics, String kafkaAddress, String kafkaGroup) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    return new FlinkKafkaConsumer011<>(listOfTopics, new SimpleStringSchema(), props);
}
Many thanks in advance!!
I solved this problem using reflection; below is the code that solved the issue.
Note: this requires me to know the fully qualified class names and method names, along with their parameters.
@Component
public class SampleJobExecutor extends ProcessFunction<String, String> {

    @Autowired
    MyAppProperties myAppProperties;

    @Override
    public void processElement(String inputMessage, ProcessFunction<String, String>.Context context,
            Collector<String> collector) throws Exception {
        try {
            // appName -> [fully qualified class name, method name]
            Map<String, List<String>> map = myAppProperties.getMapOfImplementors();
            JSONObject json = new JSONObject(inputMessage);
            if (!json.has("appName") || !map.containsKey(json.getString("appName"))) {
                return; // no implementor configured for this message
            }
            List<String> implementor = map.get(json.getString("appName"));
            String className = implementor.get(0);
            String methodName = implementor.get(1);
            // Instantiate the implementor and invoke its method reflectively.
            Class<?> forName = Class.forName(className);
            Object job = forName.getDeclaredConstructor().newInstance();
            Method method = forName.getDeclaredMethod(methodName, String.class);
            method.invoke(job, inputMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
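
For comparison, the same dispatch-by-appName idea can be sketched without reflection, using a plain map from app name to handler. This is only an illustration under assumptions: MessageHandler, HandlerRegistry and HandlerRegistryDemo are hypothetical names that do not appear in my project, and the sketch leaves out Flink and Spring entirely so that it runs on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HandlerRegistryDemo {

    /** Hypothetical per-application contract; stands in for the interface each app implements. */
    interface MessageHandler {
        String handle(String message);
    }

    /** Hypothetical registry mapping an appName to that app's handler. */
    static class HandlerRegistry {
        private final Map<String, MessageHandler> handlers = new LinkedHashMap<>();

        void register(String appName, MessageHandler handler) {
            handlers.put(appName, handler);
        }

        /** Returns the handler's result, or null when no handler is registered for appName. */
        String dispatch(String appName, String message) {
            MessageHandler handler = handlers.get(appName);
            return handler == null ? null : handler.handle(message);
        }
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        // Each application contributes its own logic under its own name.
        registry.register("app1", msg -> "app1 handled: " + msg);
        registry.register("app2", msg -> "app2 handled: " + msg);

        System.out.println(registry.dispatch("app1", "hello"));
        System.out.println(registry.dispatch("app9", "hello")); // no handler registered
    }
}
```

In a Spring setup, a map like this could probably be filled by injecting Map<String, ExceucionServiceInterface>, which Spring populates with every bean of that type keyed by bean name, so neither @Qualifier nor @Primary would be needed. Whether that works cleanly inside a Flink function depends on how the function is serialized and shipped to the task managers, which I have not verified here.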