i am looking for example implementations of spring-batch remote chunking with FlatFileItemWriter in Worker.
I am using RabbitMQ, SpringBatch, Spring Integration.
With
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
When i tried, i got org.springframework.batch.integration.chunk.AsynchronousFailureException: Failure or interrupt detected in handler:>org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to
Exception.
@Bean
public FlatFileItemWriter<MyResponse> flatWriter() {
FlatFileItemWriter<MyResponse> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("writer_queue_remote.csv"));
writer.setAppendAllowed(true);
writer.setHeaderCallback(new FlatFileHeaderCallback() {
@Override
public void writeHeader(Writer writer) throws IOException {
List<String> headersList = Arrays.asList("id","name");
String headers = headersList.stream().collect(Collectors.joining(","));
writer.write(headers);
}
});
writer.setLineAggregator(new DelimitedLineAggregator<MyResponse>() {
{
setDelimiter(",");
setFieldExtractor(new MyFieldExtractor());
}
});
return writer;
}
// Worker Integrationflow
@Bean
public IntegrationFlow chunkIntegrationFlow() {
// TODO Auto generated method stub
return this.remoteChunkingWorkerBuilder.itemProcessor(chunkProcessor()).itemWriter(flatWriter())
.inputChannel(requestChannel()).outputChannel(repliesChannel()).build();
}
I am writing "MyResponse" Object to a file.
Please help me either by pointing to an example implementation or guide me to rectify this exception. ( Cause of the exception is ItemWriter.OutputState is not initialized )
In a remote chunking setup, there is no Spring Batch step on the worker side that would handle the writer's lifecycle (open/close, etc). So you need to open the writer before using it in your chunkIntegrationFlow
.