Background
I have a Spring Boot application which I am trying to run on an Apache Flink cluster. This is a standard Spring Boot application with a main class annotated with @SpringBootApplication (code snippet added later in this question).
When I tried launching my Spring Boot application on a standalone Apache Flink cluster using the following command, I immediately ran into issues with bean autowiring: Flink was unable to deserialize the beans that had been autowired.
./bin/flink run examples/streaming/my-springboot-function.jar
On further research, I found that Flink functions are executed on TaskExecutors; Flink first serializes the function and then deserializes it on a TaskExecutor. At this point, the dependencies come out as null and are not automatically autowired.
To solve this issue, I simply created my own custom Spring context and called it in the open method of my function, and bean autowiring was triggered as expected.
But the issue I am now facing is that Spring Boot is unable to map the command line arguments to the fields marked with the @Value annotation.
Code and details
The custom Spring context:
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CustomSpringContext {

    // Caches one application context per configuration package
    private final transient ConcurrentHashMap<String, ApplicationContext> springContext;

    public CustomSpringContext() {
        springContext = new ConcurrentHashMap<>();
    }

    public ApplicationContext getContext(String configurationPackageName) {
        return springContext.computeIfAbsent(configurationPackageName, AnnotationConfigApplicationContext::new);
    }

    public <T> T autowiredBean(T bean, String configurationPackageName) {
        ApplicationContext context = getContext(configurationPackageName);
        AutowireCapableBeanFactory factory = context.getAutowireCapableBeanFactory();
        factory.autowireBean(bean);
        return bean;
    }
}
The Flink function (notice the open method):
import java.io.Serializable;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TransformFunction extends RichMapFunction<AuditRecord, String> implements Serializable {

    @Autowired
    private transient CacheDao cacheDao;

    @Autowired
    private transient ProcessController processController;

    @Override
    public String map(AuditRecord auditRecord) throws Exception {
        //Some business logic
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //Autowire the dependent beans in this class
        new CustomSpringContext().autowiredBean(this, "com.mypackage");
    }
}
This solved my first issue, which was the autowiring of beans after the function is deserialized on a Flink TaskExecutor.
But I am now running into an issue where, if I pass any command line arguments to my program, they are not injected into the properties marked with @Value. In a typical Spring Boot application, properties marked with @Value can be populated from the command line as well, not only from application.properties. But that doesn't seem to be the case here.
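To illustrate, this is what works in a regular Spring Boot application (the argument name and value here are just an example):
java -jar app.jar --commandLineArgument=someValue

@Value("${commandLineArgument}")
private String commandLineArgument; // receives "someValue"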
For example, I have the following configuration class in my application. It uses @Value:
@Configuration
@ComponentScan(basePackages = "com.mypackage")
public class TaskApplicationConfig {

    @Value("${commandLineArgument}")
    private String commandLineArgument;

    //other bean configurations etc
}
Spring is unable to find commandLineArgument at runtime even though I pass it as a program input. I get the following exception at runtime:
java.lang.IllegalArgumentException: Could not resolve placeholder 'commandLineArgument' in string value [${commandLineArgument}]
I did some digging and know why this happens. In a typical Spring Boot application, the main class passes the command line arguments to SpringApplication.run, which registers them as a property source in the environment; from there they are resolvable via @Value annotations (and available programmatically via the ApplicationRunner callback). Example:
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TaskApplication implements ApplicationRunner {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(TaskApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        //call some business logic
    }
}
But since the Flink function (TransformFunction) creates a custom Spring context, there is no SpringApplication.run call and no ApplicationRunner callback. In turn, the fields marked with @Value are not mapped from the command line arguments.
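For what it's worth, that missing step can apparently be replicated by hand with a plain AnnotationConfigApplicationContext, by registering the program arguments as a property source before the context is refreshed. The following is only a minimal sketch, assuming the arguments can somehow be handed to the function (for example through its constructor, since a String[] field is serializable); CommandLineAwareContextFactory is a hypothetical name, not something from my codebase:
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.SimpleCommandLinePropertySource;

public class CommandLineAwareContextFactory {

    // Builds a context whose environment also contains the program arguments,
    // so that @Value("${...}") fields can be resolved from --key=value pairs,
    // mirroring what SpringApplication.run does in a regular Spring Boot app.
    public static AnnotationConfigApplicationContext createContext(String configurationPackageName,
                                                                   String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // Expose --key=value arguments as environment properties
        context.getEnvironment().getPropertySources()
                .addFirst(new SimpleCommandLinePropertySource(args));
        context.scan(configurationPackageName);
        context.refresh();
        return context;
    }
}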
How do I solve this issue?
I was able to solve this issue myself by annotating the function itself with @SpringBootApplication. This way, when the function runs in a Flink TaskExecutor, the TaskExecutor ends up reloading all the beans and autowiring them.
Code changes that I made:
import java.io.Serializable;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TransformFunction extends RichMapFunction<AuditRecord, String> implements Serializable {

    @Autowired
    private transient CacheDao cacheDao;

    @Autowired
    private transient ProcessController processController;

    @Override
    public String map(AuditRecord auditRecord) throws Exception {
        //Some business logic
    }
}
With this change, I no longer needed the CustomSpringContext class and no longer needed to load the beans manually in the open method of TransformFunction.
The Apache Flink pipeline would then be created as follows:
env.fromCollection(inputRecords).setParallelism(1)
   .map(new TransformFunction()).setParallelism(30)
   .print().setParallelism(30);
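For completeness, program arguments are passed to the job by appending them after the jar path on the flink run command line, for example (the argument name and value are again illustrative):
./bin/flink run examples/streaming/my-springboot-function.jar --commandLineArgument=someValue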