Tags: java, spring, amazon-s3

Issues Processing a Large CSV File from S3 in Java


I am trying to retrieve and process a large CSV file (130,000 KB, roughly 130 MB) from Amazon S3. The processing takes over an hour, and partway through it fails with an error.

The code I use to retrieve the file is as follows:

    try (Reader reader = new InputStreamReader(s3ServiceStock.getObject(key).getObjectContent(), StandardCharsets.UTF_8);
         CSVReader csvReader = new CSVReaderBuilder(reader).withCSVParser(parser).build()) {

        // The S3 HTTP connection stays open for as long as importContent runs
        importContent(csvReader);

    } catch (Exception e) {
        log.error("Error", e);
    }

The importContent method looks like this:

private void importContent(CSVReader csvReader) throws IOException {

    String[] nextRecord;
    while ((nextRecord = csvReader.readNext()) != null) {

        ... // some treatment

        repository.save(entity);
    }

}
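
Since the loop above saves one entity per CSV row, a large share of the hour is plausibly spent on per-row writes while the S3 stream sits open. One way to shorten that window is to buffer rows and hand them to the repository in batches. The following is only a minimal sketch under that assumption: BatchBuffer is a hypothetical helper (it is not part of opencsv, Spring, or the original code), and the batch size is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: collects items and passes them to a sink in fixed-size batches.
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // e.g. a bulk-save call (assumption)
    private final List<T> buffer;

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
        this.buffer = new ArrayList<>(batchSize);
    }

    // Adds one item and flushes automatically once the batch is full.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends any buffered items to the sink; call once more after the read loop ends.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // pass a copy so the sink may keep it
        buffer.clear();
    }
}
```

Inside importContent this could be used by creating the buffer before the loop, calling add(entity) in place of repository.save(entity), and calling flush() after the loop. If repository is a Spring Data repository (the question does not say), repository::saveAll would fit as the sink. Whether that turns into real batched SQL depends on the persistence configuration.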

However, at some point during the process I get the following error. It does not always happen at the same place; it seems to occur on different lines of the CSV file:

org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 130,272,542; received: 22,061,056)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178) ~[httpcore-4.4.11.jar:4.4.11]
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) ~[httpclient-4.5.9.jar:4.5.9]
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90) ~[aws-java-sdk-core-1.11.604.jar:na]
    at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125) ~[aws-java-sdk-s3-1.11.604.jar:na]
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90) ~[aws-java-sdk-core-1.11.604.jar:na]
    at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[na:na]
    at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[na:na]
    at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[na:na]
    at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181) ~[na:na]
    at java.base/java.io.BufferedReader.fill(BufferedReader.java:161) ~[na:na]
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326) ~[na:na]
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392) ~[na:na]
    at com.opencsv.stream.reader.LineReader.readLine(LineReader.java:41) ~[opencsv-4.6.jar:na]
    at com.opencsv.CSVReader.getNextLine(CSVReader.java:436) ~[opencsv-4.6.jar:na]
    at com.opencsv.CSVReader.readNext(CSVReader.java:351) ~[opencsv-4.6.jar:na]
    at ...
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]

The stack trace points to the line in the importContent method where the loop condition while ((nextRecord = csvReader.readNext()) != null) is evaluated.

I thought the issue might be that keeping the connection to S3 open for such a long time was causing the error. Therefore, I tried using TransferManager to download the file first:

@Override
public void downloadFileWithTransferManager(String key, String downloadFilePath) {
    TransferManager transferManager = TransferManagerBuilder.standard()
            .withS3Client(s3)
            .withExecutorFactory(() -> Executors.newFixedThreadPool(5))
            .withMinimumUploadPartSize(Long.valueOf(5L * 1024 * 1024)) // Minimum size per part
            .withMultipartUploadThreshold(Long.valueOf(10L * 1024 * 1024)) // Threshold for multipart uploads
            .build();
    Download download = transferManager.download(bucketName, key, new File(downloadFilePath));
    try {
        download.waitForCompletion();
    } catch (Exception e) {
        LOG.log(Level.FINER, e.getMessage());
    } finally {
        transferManager.shutdownNow();
    }
}

However, when I reach the download method, I get a 400 Bad Request error.

Does anyone know how I can efficiently retrieve the entire file from S3 without errors? I appreciate any help or suggestions.

Thank you!


Solution

  • Update: the problem was the S3 configuration. The processing took so long that at a certain point S3 closed the connection, and the read failed with the error above. Changing the S3 configuration solved the problem.
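
The update does not say which setting was changed, so that fix cannot be reproduced here. A different approach that avoids holding the connection open during slow processing is to copy the object to a local temporary file first and then parse the local copy. The sketch below uses only the JDK and takes a plain InputStream, so it makes no assumptions about the S3 client. SpoolToDisk and its method names are hypothetical, and countLines merely stands in for the real per-row work.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: drain a remote stream quickly, then work on the local copy.
final class SpoolToDisk {

    // Copies the whole stream to a temp file and closes the stream when done.
    static Path spool(InputStream remote) throws IOException {
        Path tmp = Files.createTempFile("s3-import-", ".csv");
        try (InputStream in = remote) {
            // createTempFile already created the file, so it must be replaced
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            Files.deleteIfExists(tmp); // do not leave a partial file behind
            throw e;
        }
        return tmp;
    }

    // Placeholder for the slow processing step: reads the local copy line by line.
    static long countLines(Path localCopy) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(localCopy, StandardCharsets.UTF_8)) {
            long count = 0;
            while (reader.readLine() != null) {
                count++;
            }
            return count;
        }
    }
}
```

With the code from the question, the stream passed to spool would be s3ServiceStock.getObject(key).getObjectContent(), and the CSVReader would then be built over Files.newBufferedReader(localPath, StandardCharsets.UTF_8) instead of the S3 stream. The temporary file should be deleted once the import finishes.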