Search code examples
javaamazon-s3spring-batchspring-cloudspring-cloud-aws

Reading S3 Resource with SpringBatch's ItemReaders issue


I have a Spring Batch Job that reads a bunch of files from an S3 Bucket, process them and then send it to a database, doing this in a multi-threaded configuration. The application.properties file contains this:

cloud.aws.credentials.accessKey=accessKey 
cloud.aws.credentials.secretKey=secret
cloud.aws.region.static=us-east-1
cloud.aws.credentials.instanceProfile=true 
cloud.aws.stack.auto=false

My ItemReader:

@Bean
ItemReader<DataRecord> itemReader() {
    FlatFileItemReader<DataRecord> flatFileItemReader = new FlatFileItemReader<>();
    flatFileItemReader.setLinesToSkip(0);
    flatFileItemReader.setLineMapper(new DataRecord.DataRecordLineMapper());
    flatFileItemReader.setSaveState(false);

    MultiResourceItemReader<DataRecord> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setDelegate(flatFileItemReader);
    multiResourceItemReader.setResources(loadS3Resources(null, null));
    multiResourceItemReader.setSaveState(false);

    SynchronizedItemStreamReader<DataRecord> itemStreamReader = new SynchronizedItemStreamReader<>();
    itemStreamReader.setDelegate(multiResourceItemReader);
    return itemStreamReader;
}

And my TaskExecutor:

@Bean
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
    return threadPoolTaskExecutor;
}

The Job consist in only one Step where it reads from the files, process them and then writes to the DB. Under this configuration, the resources are loaded, the Job starts and the step do the processing for the ~240k first lines of the first Resource (There are 7 Resources, each one with 1.2M lines). Then I get the following Exception:

org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [Amazon s3 resource [bucket='my-bucket' and object='output/part-r-00000']]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.read(MultiResourceItemReader.java:108) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:55) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.3.9.RELEASE.jar!/:4.3.9.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_65]
Caused by: javax.net.ssl.SSLException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:596) ~[na:1.8.0_65]
at sun.security.ssl.InputRecord.read(InputRecord.java:532) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) ~[na:1.8.0_65]
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) ~[na:1.8.0_65]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) ~[httpclient-4.5.3.jar!/:4.5.3]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:117) ~[aws-java-sdk-s3-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[na:1.8.0_65]
at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[na:1.8.0_65]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_65]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
... 23 common frames omitted

I would like to know if there's a simple way to fix this. Currently I'm thinking to just make a local copy of the files and then read from those but I would like to know if this Exception can be avoided by some configuration.

Thanks!


Solution

  • My guess would be one thread closing the SFTP session, while another thread is still processing.

    Better to use the MultiResourcePartitioner to create a partition per resource (file) and then have the reader pick up each file separately as it's own partition. With that configuration, you don't need the MultiResourceItemReader anymore as well (you can go straight to the delegate).

    Refer example here https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/resources/jobs/partitionFileJob.xml

    Also refer How to apply partitioned count for MultiResourceItemReader?