java, spring-boot, design-patterns, apache-flink, flink-streaming

Apache Flink Kafka Stream Processing based on conditions


I am building a wrapper library using apache-flink that listens to (consumes from) multiple topics, and I have a set of applications that want to process the messages from those topics.

Example: I have 10 applications app1, app2, app3 ... app10 (each of them is a Java library in the same on-prem project, i.e., all 10 jars are part of the same .war file), out of which only 5 are supposed to consume the messages coming to the consumer group. I am able to do the filtering for those 5 apps with the help of a filter function.
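For illustration, the routing predicate behind such a filter step might look like the sketch below. The `"appName"` JSON field and the naive string matching are assumptions about the message format; a real job would parse the JSON and wrap this predicate in Flink's `FilterFunction`.

```java
import java.util.Set;

// Sketch: decide whether a raw Kafka message belongs to one of the
// applications enabled in this deployment. The "appName" JSON field
// is an assumption about the message format.
class AppMessageFilter {
    private final Set<String> enabledApps;

    AppMessageFilter(Set<String> enabledApps) {
        this.enabledApps = enabledApps;
    }

    // Naive containment check; a production filter would parse the JSON.
    boolean accepts(String message) {
        for (String app : enabledApps) {
            if (message.contains("\"appName\":\"" + app + "\"")) {
                return true;
            }
        }
        return false;
    }
}
```

In the Flink job this predicate would back the `filterFunctionInterface` passed to `strStream.filter(...)`.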

The challenge is in the strStream.process(executionServiceInterface) call: app1 provides an implementation class for ExecutionServiceInterface named ExecutionServiceApp1Impl, and similarly app2 provides ExecutionServiceApp2Impl.

When there are multiple implementations available, Spring requires either a @Qualifier annotation at the injection point or @Primary on one of the implementations (ExecutionServiceApp1Impl, ExecutionServiceApp2Impl).
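For context, the ambiguity Spring complains about looks roughly like this (a sketch using the names from the question; the qualifier value shown is an assumption):

```java
// Two beans implement ExecutionServiceInterface, so this injection point
// is ambiguous unless one implementation is marked @Primary or the field
// carries a @Qualifier naming the desired bean:
@Autowired
// @Qualifier("executionServiceApp1Impl")  // would disambiguate, but
// hard-codes one app into the generic wrapper
private ExecutionServiceInterface executionServiceInterface;
```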

But I don't really want to do this, as I am building a generic wrapper library that should support any number of such applications (app1, app2, etc.), each providing its own implementation logic (ExecutionServiceApp1Impl, ExecutionServiceApp2Impl).

Can someone help me here? How can I solve this?

Below is the code for reference.

@Autowired
private ExecutionServiceInterface executionServiceInterface;

public void init(){
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer011<String> consumer = createStringConsumer(topicList, kafkaAddress, kafkaGroup);
    if (consumer != null) {
        DataStream<String> strStream = environment.addSource(consumer);
        strStream.filter(filterFunctionInterface).process(executionServiceInterface);
    }
}

public FlinkKafkaConsumer011<String> createStringConsumer(List<String> listOfTopics, String kafkaAddress, String kafkaGroup) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    return new FlinkKafkaConsumer011<>(listOfTopics, new SimpleStringSchema(), props);
}

Many thanks in advance!!


Solution

  • Solved this problem by using reflection; below is the code that solved the issue.

    Note: this requires knowing the list of fully qualified class names and method names, along with their parameter types.

    @Component
    public class SampleJobExecutor extends ProcessFunction<String, String> {

        @Autowired
        MyAppProperties myAppProperties;

        @Override
        public void processElement(String inputMessage, ProcessFunction<String, String>.Context context,
                Collector<String> collector) throws Exception {
            try {
                Map<String, List<String>> map = myAppProperties.getMapOfImplementors();

                JSONObject json = new JSONObject(inputMessage);
                if (json.has("appName")) {
                    String appName = json.getString("appName");
                    String className = map.get(appName).get(0);
                    String methodName = map.get(appName).get(1);

                    // Load the configured implementor, instantiate it, and
                    // invoke the configured method with the raw message.
                    Class<?> forName = Class.forName(className);
                    Object job = forName.getDeclaredConstructor().newInstance();
                    Method method = forName.getDeclaredMethod(methodName, String.class);
                    method.invoke(job, inputMessage);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
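The getMapOfImplementors() lookup implies a configuration shaped roughly like this. The snippet below is a hypothetical application.yml; all property names other than the getter are assumptions.

```yaml
# Hypothetical configuration backing MyAppProperties#getMapOfImplementors():
# key   = the "appName" carried in the message,
# value = [fully qualified implementor class, method name]
myapp:
  mapOfImplementors:
    app1:
      - com.example.app1.ExecutionServiceApp1Impl
      - processMessage
    app2:
      - com.example.app2.ExecutionServiceApp2Impl
      - processMessage
```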
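For the reflective call to succeed, each implementor only needs a no-arg constructor and a public method that takes the raw message String. The sketch below isolates the instantiate-and-invoke steps; the class and method names are hypothetical, and in the actual job the Class object comes from Class.forName(className).

```java
import java.lang.reflect.Method;

// Hypothetical implementor: a no-arg constructor plus a public method
// accepting the raw message String is all the reflective dispatch needs.
class ExecutionServiceApp1Impl {
    static String lastMessage;  // captured here purely for demonstration

    public void processMessage(String inputMessage) {
        lastMessage = inputMessage;
    }
}

class ReflectiveDispatch {
    // The same instantiate-and-invoke steps the ProcessFunction performs;
    // in the job, `target` is obtained via Class.forName(className).
    static void dispatch(Class<?> target, String methodName, String message) throws Exception {
        Object job = target.getDeclaredConstructor().newInstance();
        Method method = target.getDeclaredMethod(methodName, String.class);
        method.invoke(job, message);
    }
}
```

Because the implementors are resolved by name at runtime, none of them need to be Spring beans, which is what removes the @Qualifier/@Primary requirement.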