Search code examples
javaapache-camelspring-batchreroute

How to read file using Apache Camel, process the file using Spring batch, read each line and route it back through Apache Camel


Would like to know how to pass the JAXBElement to a Camel route, which processed from each line of the batch file read through Spring batch loaded through Camel Route.

Code snippets given below uses a customerWriter method to call JMSTemplate to write the message to a Queue. Instead I need to route the message to another Camel route.

Current: CamelRoute -> ReadFile -> Spring Batch -> Process Each line -> Queue

Expected: CamelRoute -> ReadFile -> Spring Batch -> Process Each line -> Camel Route

Camel Route to read the file:

@Override
public void configure()  {

    String fromUri = batchLoadPath + "?" + batchFileOptions;
    from(fromUri).process(new Processor() {
        public void process(Exchange msg)  {
            File file = msg.getIn().getBody(File.class);
            String fileName = file.getAbsolutePath();

            try {
                JobParameters jobParameters = new JobParametersBuilder().addString("input.file.name", fileName).addDate("dateTime", new Date()).toJobParameters();
                jobLauncher.run(importCustomerJob, jobParameters);
            } catch (Exception e) {
                log.error(Process file encountered error:" + e.getMessage(), e);
            }
        }

    })
    .to("log:EndBatch");

Batch Config:

@Bean
public JmsItemWriter<String> customerWriter() {
    JmsItemWriter<String> writer = new JmsItemWriter<String>();
    writer.setJmsTemplate(jmsTemplate);
    return writer;
}

public Job importCustomerJob(JobCompletionNotificationListener listener, JobBuilderFactory jobBuilderFactory, Step step1) {
    JobBuilder builder = jobBuilderFactory.get("importCustomerJob");
    builder.incrementer(new RunIdIncrementer());
    builder.listener(listener);
    JobFlowBuilder jfb = builder.flow(step1);
    jfb.end();
    Job job = jfb.build().build();
    return job;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
    // Read chunk of 10 records and writing those as messages to queue
    return stepBuilderFactory.get("step1")
            .<Customer, String>chunk(10)
            .reader(customerReader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(customerItemProcessor())
            .writer(customerWriter())
            .listener(customerReader())
            .build();
}

Batch Processor:

public class CustomerItemProcessor implements ItemProcessor<Customer, String> {
    @Autowired
    JaxbUtil jaxbUtil;

    public String process(Customer item) throws Exception {
        // Mapping code goes here
        JAXBElement<CustomerX> mobj = customerFactory.createCustomerX(cp);
        return jaxbUtil.objectToXml(mobj);
    }
}

Solution

  • Well, from a Camel point of view, to call another Camel route you simply add a .to() statement. For example to call an in-memory route synchronously, you can use direct:.

    from(fromUri).process(new Processor() {
        public void process(Exchange msg)  {
            ... your processor impl
        }
    })
    .to("direct:yourOtherRoute")
    .to("log:EndBatch"); 
    
    from("direct:yourOtherRoute")
    ...
    

    To pass the result of the processor to the next route, the processor must set this result into the Exchange Body.