I'm experimenting with SCDF (Spring Cloud Data Flow) and am successfully running Spring Batch jobs as Tasks, but I'm having an issue with Task arguments not persisting. It seems that each time I execute the Task I have to provide the command-line arguments again.
In my use case I need the command line arguments to be set once and for all for a Task.
Thank you
After some research I ended up using the Parameters instead of the Arguments.
First, I created a Spring Batch application with multiple CommandLineRunners (two in my case): one for production, which reads its values from application properties that the SCDF Parameters override, and one for the other environments (DEV, ...), which is launched with simple command-line arguments or through the API.
First CommandLineRunner:
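import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component
@Slf4j
@Profile("prod")
public class ProdJobCommandLineRunner implements CommandLineRunner {
    // Resolved from application properties; SCDF Parameters override them.
    @Value("${jobname}")
    private String jobName;
    @Value("${argument1}")
    private String argument1;
    @Value("${argument2}")
    private String argument2;
    @Autowired
    private ApplicationContext context;
    @Autowired
    private JobLauncher jobLauncher;

    @Override
    public void run(String... args) {
        log.info("Begin Launching Job with Args {}", Arrays.asList(args));
        log.info("JOB NAME: {}", jobName);
        // The job is launched only when at least one argument is present.
        if (!CollectionUtils.isEmpty(Arrays.asList(args))) {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            jobParametersBuilder.addString("argument1", argument1);
            jobParametersBuilder.addString("argument2", argument2);
            try {
                // Look up the Job bean by its configured name.
                Job job = context.getBean(jobName, Job.class);
                jobLauncher.run(job, jobParametersBuilder.toJobParameters());
            } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                log.error("Exception ", e);
            }
        }
        log.info("End Launching Job with Args {}", Arrays.asList(args));
    }
}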
Second CommandLineRunner:
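import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

@Component
@Slf4j
@Profile("!prod")
public class DefaultJobCommandLineRunner implements CommandLineRunner {
    @Autowired
    private ApplicationContext context;
    @Autowired
    private JobLauncher jobLauncher;

    @Override
    public void run(String... args) {
        log.info("Begin Launching Job with Args {}", Arrays.asList(args));
        if (!CollectionUtils.isEmpty(Arrays.asList(args))) {
            Map<String, String> params = parseJobArgs(args);
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            // A timestamp parameter makes the job parameters unique for this launch.
            if (Boolean.parseBoolean(params.getOrDefault("force_restart", "false"))) {
                jobParametersBuilder.addString("force_restart", LocalDateTime.now().toString());
            }
            try {
                String jobName = params.get("job_name");
                log.info("JOB NAME: {}", jobName);
                Job job = context.getBean(jobName, Job.class);
                jobLauncher.run(job, jobParametersBuilder.toJobParameters());
            } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                log.error("Exception ", e);
            }
        }
        log.info("End Launching Job with Args {}", Arrays.asList(args));
    }

    // Parses key=value arguments; splits on the first '=' only and skips arguments without one.
    private Map<String, String> parseJobArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (String arg : args) {
            String[] parts = arg.split("=", 2);
            if (parts.length < 2) {
                continue;
            }
            params.put(StringUtils.trimAllWhitespace(parts[0]), StringUtils.trimAllWhitespace(parts[1]));
        }
        return params;
    }
}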
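To make the expected argument format for the non-prod runner concrete, here is a minimal standalone sketch of the key=value parsing, outside of Spring. The class name JobArgsDemo and the sample values (importJob, verbose, filter) are hypothetical, and it uses String.trim() in place of Spring's StringUtils.trimAllWhitespace so that it needs only the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JobArgsDemo {

    // Splits each "key=value" argument on the first '=' only;
    // arguments that contain no '=' are skipped.
    static Map<String, String> parseJobArgs(String... args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String arg : args) {
            String[] parts = arg.split("=", 2);
            if (parts.length == 2) {
                params.put(parts[0].trim(), parts[1].trim());
            }
        }
        return params;
    }

    public static void main(String[] args) {
        // Hypothetical arguments, as they might be passed on the command line.
        Map<String, String> params =
                parseJobArgs("job_name=importJob", "force_restart=true", "verbose", "filter=a=b");

        System.out.println(params.get("job_name"));   // the job bean to look up
        System.out.println(Boolean.parseBoolean(params.getOrDefault("force_restart", "false")));
        System.out.println(params.containsKey("verbose")); // no '=', so it was skipped
        System.out.println(params.get("filter"));     // value keeps its own '='
    }
}
```

With a launch such as java -jar app.jar job_name=importJob force_restart=true, the runner would look up the bean named importJob and add a timestamped force_restart parameter.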
Import the app into SCDF, for example under the name TESTAPP.
Create as many Tasks as you have use cases, all using the same imported app.
For each Task, when you launch it for the first time, set its Parameters following the naming convention app.<APP_NAME>.<property key>=<property value>
In this case, for example: app.TESTAPP.jobname=JOB_NAME
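As an illustration of that naming convention, here is a small sketch that builds the Parameter strings for the three properties the prod runner reads (jobname, argument1, argument2). The class ScdfTaskProperties, its method names, and the values value1 and value2 are hypothetical, and joining the entries with commas is an assumption about how you pass several properties at once (for example in the SCDF shell), not something the steps above require.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ScdfTaskProperties {

    // Builds one Parameter of the form app.<appName>.<key>=<value>.
    static String appProperty(String appName, String key, String value) {
        return "app." + appName + "." + key + "=" + value;
    }

    // Builds one Parameter per entry and joins them with commas
    // (the comma separator is an assumption, see the note above).
    static String launchProperties(String appName, Map<String, String> props) {
        return props.entrySet().stream()
                .map(e -> appProperty(appName, e.getKey(), e.getValue()))
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the insertion order of the properties.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("jobname", "JOB_NAME");
        props.put("argument1", "value1"); // hypothetical value
        props.put("argument2", "value2"); // hypothetical value

        // prints app.TESTAPP.jobname=JOB_NAME,app.TESTAPP.argument1=value1,app.TESTAPP.argument2=value2
        System.out.println(launchProperties("TESTAPP", props));
    }
}
```

Each Task created from TESTAPP can then carry its own set of these values, which is what lets one imported app serve several use cases.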
I hope this helps.