Search code examples
spring-batchchunking

FlatFileItemWriter in Remote Chunking worker


i am looking for example implementations of spring-batch remote chunking with FlatFileItemWriter in Worker.

I am using RabbitMQ, SpringBatch, Spring Integration.

With

<spring-cloud.version>Greenwich.SR1</spring-cloud.version>

When i tried, i got org.springframework.batch.integration.chunk.AsynchronousFailureException: Failure or interrupt detected in handler:>org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to
Exception.


        @Bean
    public FlatFileItemWriter<MyResponse> flatWriter() {
        FlatFileItemWriter<MyResponse> writer = new FlatFileItemWriter<>();

        writer.setResource(new FileSystemResource("writer_queue_remote.csv"));
        writer.setAppendAllowed(true);

        writer.setHeaderCallback(new FlatFileHeaderCallback() {

            @Override
            public void writeHeader(Writer writer) throws IOException {
                List<String> headersList = Arrays.asList("id","name");
                String headers = headersList.stream().collect(Collectors.joining(","));
                writer.write(headers);
            }
        });

        writer.setLineAggregator(new DelimitedLineAggregator<MyResponse>() {
            {
                setDelimiter(",");
                setFieldExtractor(new MyFieldExtractor());
            }
        });

        return writer;
    }


    // Worker Integrationflow
    @Bean
    public IntegrationFlow chunkIntegrationFlow() {
        // TODO Auto generated method stub
        return this.remoteChunkingWorkerBuilder.itemProcessor(chunkProcessor()).itemWriter(flatWriter())
                .inputChannel(requestChannel()).outputChannel(repliesChannel()).build();
    }

I am writing "MyResponse" Object to a file.

Please help me either by pointing to an example implementation or guide me to rectify this exception. ( Cause of the exception is ItemWriter.OutputState is not initialized )


Solution

  • In a remote chunking setup, there is no Spring Batch step on the worker side that would handle the writer's lifecycle (open/close, etc). So you need to open the writer before using it in your chunkIntegrationFlow.