Tags: java, spring, spring-boot, apache-flink, command-line-arguments

Using Spring with Apache Flink - Command line arguments are not available to Spring


Background

I have a Spring Boot application which I am trying to run on an Apache Flink cluster. It is a standard Spring Boot application with a main class annotated with @SpringBootApplication (code snippet included later in this question).

When I tried launching my Spring Boot application on a standalone Apache Flink cluster using the following command, I immediately ran into issues with bean autowiring: Flink was unable to deserialize the beans that had been autowired.

./bin/flink run examples/streaming/my-springboot-function.jar

On further research, I found that Flink functions are executed on TaskExecutors: Flink first serializes the function and then deserializes it on a TaskExecutor. At that point, the dependencies come out as null and are not autowired again.

To solve this issue, I created my own custom Spring context and invoked it in the open method of my function, and bean autowiring was triggered as expected.

But the issue I am facing now is that Spring Boot is unable to map command line arguments to fields marked with @Value.

Code and details

The custom Spring context:

public class CustomSpringContext {

    private transient final ConcurrentHashMap<String, ApplicationContext> springContext;

    public CustomSpringContext() {
        springContext = new ConcurrentHashMap<>();
    }

    public ApplicationContext getContext(String configurationPackageName) {
        return springContext.computeIfAbsent(configurationPackageName, AnnotationConfigApplicationContext::new);
    }

    public <T> T autowiredBean(T bean, String configurationPackageName) {
        AnnotationConfigApplicationContext context = getContext(configurationPackageName);
        AutowireCapableBeanFactory factory = context.getAutowireCapableBeanFactory();
        factory.autowireBean(bean);
        return bean;
    }
}
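As an aside, the context cache above relies on ConcurrentHashMap.computeIfAbsent, which guarantees the expensive context creation runs at most once per package name, even under concurrent calls. A minimal sketch of that caching pattern in isolation (ContextCache and its creation counter are illustrative names, not part of the code above):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Isolated sketch of the computeIfAbsent caching pattern used by
// CustomSpringContext: the factory runs at most once per key.
public class ContextCache {
    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
    private final AtomicInteger creations = new AtomicInteger();

    public Object get(String key) {
        // computeIfAbsent invokes the factory only on the first lookup of a key
        return cache.computeIfAbsent(key, k -> {
            creations.incrementAndGet();
            return new Object(); // stands in for new AnnotationConfigApplicationContext(k)
        });
    }

    public int creationCount() {
        return creations.get();
    }
}
```

Repeated lookups of the same key return the cached instance without invoking the factory again.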

The Flink function (note the open method):

@Component
public class TransformFunction extends RichMapFunction<AuditRecord, String> implements Serializable {

    @Autowired
    private transient CacheDao cacheDao;

    @Autowired
    private transient ProcessController processController;

    @Override
    public String map(AuditRecord auditRecord) throws Exception {

        //Some business logic
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //Autowire the dependent beans in this class 
        new CustomSpringContext().autowiredBean(this, "com.mypackage");
    }
}

This solved my first issue which was the autowiring of beans after the function is deserialized on the TaskExecutor of Flink.

But I am now running into an issue where, if I pass any command line arguments to my program, they are not injected into the fields marked with @Value. In a typical Spring Boot application, fields marked with @Value can be populated from the command line as well, not only from application.properties. But that doesn't seem to be the case here.

For example, I have the following configuration class in my application, which uses @Value:

@Configuration
@ComponentScan(basePackages = "com.mypackage")
public class TaskApplicationConfig {

    @Value("${commandLineArgument}")
    private String commandLineArgument;

    //other bean configurations etc

}

Spring is unable to resolve commandLineArgument at runtime even though I pass it as a program argument. I get the following exception:

java.lang.IllegalArgumentException: Could not resolve placeholder 'commandLineArgument' in string value [${commandLineArgument}]

I did some digging and understand why this happens. In a typical Spring Boot application, we implement the ApplicationRunner interface in our main class and override the run method. SpringApplication.run injects the command line arguments into the context, where they become accessible via @Value annotations. Example:

@SpringBootApplication
public class TaskApplication implements ApplicationRunner {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(TaskApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        //call some business logic
    }
}

But since the Flink function (TransformFunction) creates a custom Spring context, there is no application runner or run callback. In turn, the fields marked with @Value are not mapped to the command line arguments.
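What is missing, in other words, is the step that turns program arguments into properties the context can resolve; in Spring, command line arguments of the form --key=value are parsed into a property source before @Value resolution. A minimal plain-Java sketch of that parsing, purely for illustration (CommandLineArgs is a hypothetical name, not Spring's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: turn "--key=value" program arguments into a
// property map, mirroring what Spring does before @Value resolution.
public class CommandLineArgs {
    public static Map<String, String> parse(String[] args) {
        Map<String, String> props = new HashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            // accept only "--key=value" with a non-empty key
            if (arg.startsWith("--") && eq > 2) {
                props.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return props;
    }
}
```

In my setup, nothing performs this step for the custom context, which is why the placeholder cannot be resolved.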

How do I solve this issue?


Solution

  • I was able to solve this issue myself by annotating my function itself with @SpringBootApplication. This way, when the function runs in a Flink TaskExecutor, the TaskExecutor ends up reloading all the beans and autowiring them.

    Code changes that I made:

    @SpringBootApplication
    public class TransformFunction extends RichMapFunction<AuditRecord, String> implements Serializable {
    
        @Autowired
        private transient CacheDao cacheDao;
    
        @Autowired
        private transient ProcessController processController;
    
        @Override
        public String map(AuditRecord auditRecord) throws Exception {
    
            //Some business logic
        }
        
    }
    

    With this change, I no longer needed the CustomSpringContext class and no longer had to load the beans manually in the open method of the TransformFunction.

    The Apache Flink pipeline would then be created as follows:

    env.fromCollection(inputRecords).setParallelism(1)
       .map(new TransformFunction()).setParallelism(30)
       .print().setParallelism(30);