Search code examples
javaamazon-web-servicesamazon-s3aws-lambdazip4j

What is the best way of getting files from AWS S3, inserting them into zip and uploading that zip to the bucket?


I'm trying to find an efficient way of downloading files from S3, inserting them into a zip file and, once all the files are in the archive, uploading it to the bucket. It has to be efficient because the archive can reach almost 10 GB. I tried using one thread with Zip4j's ZipOutputStream, created from a PipedOutputStream, to insert the files into the zip, while another thread simultaneously uploads that zip to the bucket. The problem is that the resulting zip is corrupted and I'm not able to extract any file from it. Here's my code:

The part that downloads the files and inserts them into the zip archive:

/**
 * Creates (but does not start) a thread that fetches each key through
 * {@code s3Service.getFile} and adds it as an entry of a Zip4j
 * {@code ZipOutputStream} wrapping {@code pipedOutputStream}.
 *
 * NOTE(review): {@code filename} and {@code zip} are not used by the live code
 * below ({@code zip} only appears in a commented-out line).
 *
 * @param keys       object keys to fetch; each key is also used as the entry name
 * @param filename   unused here
 * @param zip        unused here
 * @param parameters entry parameters; the same instance is mutated for every key
 * @return the configured, not yet started thread
 */
private Thread getDownloadAndZipThread(List<String> keys, String filename, ZipFile zip, ZipParameters parameters) {
        return new Thread(() -> {
            long start = System.currentTimeMillis();
            // Closing zipOutputStream at the end of this try block finishes the archive.
            try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)){
                keys.forEach(key -> {
                    //logger.log("\nprocessing file: " + key);
                    try {
                        parameters.setFileNameInZip(key);
                        zipOutputStream.putNextEntry(parameters);
                        // NOTE(review): the payload is written to pipedOutputStream directly, not to
                        // zipOutputStream, so these bytes bypass the zip stream that wraps the same
                        // pipe. This is presumably what corrupts the archive; the write most likely
                        // needs to target zipOutputStream instead.
                        // NOTE(review): readAllBytes() presumably buffers the whole object in memory
                        // before writing - confirm against the type returned by getFile.
                        pipedOutputStream.write(s3Service.getFile(key,               BUCKET_NAME).readAllBytes());  
                        zipOutputStream.closeEntry();
                        //zip.addStream(IOUtils.copy(s3Service.getFile(key, BUCKET_NAME),  pipedOutputStream), parameters);
                    } catch (IOException e) {
                        // forEach cannot propagate a checked exception, so it is wrapped; the
                        // outer catch below receives it and stops processing the remaining keys.
                        throw new RuntimeException(e);
                    }
                });
            } catch (Exception e) {
                // Failures are only logged here; nothing is rethrown from this thread.
                logger.log("\nZipService - getDownloadAndZipThread - error: " + e);
            }
            long executedTime = System.currentTimeMillis() - start;
            logger.log("\nZipService - getDownloadAndZipThread - execution time: " + executedTime);
        });
    }

The part that uploads the zip to the bucket:

/**
 * Creates (but does not start) a thread that passes {@code pipedInputStream} to
 * {@code s3Service.multipartUpload} under the given name and then closes the stream.
 *
 * @param filename name handed to {@code multipartUpload} together with BUCKET_NAME
 * @return the configured, not yet started thread
 */
private Thread getS3Out(String filename) {
    return new Thread(() -> {
        long start = System.currentTimeMillis();
        try {
            s3Service.multipartUpload(filename, BUCKET_NAME, pipedInputStream);
            // Only reached when the upload call returned normally.
            pipedInputStream.close();
        } catch (final Exception all) {
            logger.log("\nFailed to process outputStream due to error: " + all + "\n");
            // Any upload failure terminates the whole JVM with exit status -3.
            System.exit(-3);
        }
        long executedTime = System.currentTimeMillis() - start;
        logger.log("\nZipService - getS3Out - execution time: " + executedTime);
        });
}

The part that starts these threads:

// Driver fragment: builds the zip name and entry parameters, then runs the
// zipping thread and the upload thread side by side and waits for both.
String filename = generateZipName();
 // NOTE(review): this ZipFile is only handed to getDownloadAndZipThread, where the
 // live code does not use it.
 ZipFile zip = new ZipFile(filename);

        
 ZipParameters parameters = new ZipParameters();
 parameters.setCompressionLevel(CompressionLevel.FASTEST);

 final Thread downloadAndZip = getDownloadAndZipThread(keys, filename, zip, parameters); 
 final Thread upload = getS3Out(filename);

 // Both ends of the pipe are started so that one thread writes while the other reads.
 downloadAndZip.start();
 upload.start();
 try {
      // Block until the zipping thread and then the upload thread have terminated.
      downloadAndZip.join();
      upload.join();
     } catch (InterruptedException e) {
      logger.log("ZipService - failed to join thread due to error: " + e);
      throw new RuntimeException(e);
    }

I tried using the built-in Java ZipOutputStream, but it is too slow, and since this code has to run in AWS Lambda it needs to be faster. It takes 130 seconds to download the files, insert them into the zip and upload it to the bucket.

Zip4j's ZipOutputStream, by contrast, takes 80 seconds for the same cycle.

Both variants were tested on 16 files with a total size of 1.6 GB.

I need to fix the corrupted zip file in the Zip4j case.


Solution

  • I want to post here the solution that I found back when the question was still relevant:

    /**
     * Builds a zip archive from stored assets and uploads it while it is still
     * being written, so the archive is never staged as a whole.
     *
     * <p>One thread downloads the assets and writes them as zip entries into a
     * pipe; a second thread reads the other end of that pipe and hands it to
     * {@code s3Service.multipartUpload}.
     */
    public class ZipServiceImpl implements ZipService {

        private static final Logger LOGGER = LoggerFactory.getLogger(ZipServiceImpl.class);

        /**
         * Capacity of the pipe between the zipping and the uploading thread. The JDK
         * default of 1 KiB forces a thread hand-off for almost every write.
         */
        private static final int PIPE_BUFFER_SIZE = 64 * 1024;

        /** Size of the chunks copied from a downloaded asset into the zip stream. */
        private static final int COPY_BUFFER_SIZE = 8 * 1024;

        @Autowired
        private AmazonS3ServiceImpl s3Service;

        /**
         * Downloads the given assets, zips them and uploads the zip, all at the same
         * time. Blocks until both worker threads have finished.
         *
         * <p>An asset that cannot be added is logged and skipped; the remaining assets
         * are still processed.
         *
         * @param keys    maps the storage key of each asset to the entry name it gets
         *                inside the zip
         * @param zipName name under which the zip is uploaded
         * @throws RuntimeException if the pipe cannot be created, the calling thread
         *                          is interrupted while waiting, or one of the worker
         *                          threads terminated with an error
         */
        @Override
        public void createZip(Map<String, String> keys, String zipName) {
            final PipedOutputStream pipedOutputStream = new PipedOutputStream();
            final PipedInputStream pipedInputStream;
            try {
                pipedInputStream = new PipedInputStream(pipedOutputStream, PIPE_BUFFER_SIZE);
            } catch (IOException e) {
                LOGGER.error("ZipServiceImpl - createZip - failed to create input stream");
                throw new RuntimeException(e);
            }

            final Thread downloadAndZip = getDownloadAndZipThread(keys, pipedOutputStream);
            final Thread upload = getUploadThread(zipName, pipedInputStream);

            // An exception thrown inside a worker never reaches this thread on its own.
            // Record it here; join() below makes the recorded value visible to us.
            final Throwable[] failures = new Throwable[2];
            downloadAndZip.setUncaughtExceptionHandler((thread, error) -> failures[0] = error);
            upload.setUncaughtExceptionHandler((thread, error) -> failures[1] = error);

            downloadAndZip.start();
            upload.start();

            try {
                downloadAndZip.join();
                upload.join();
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.error("ZipService - failed to join thread due to error: " + e, e);
                throw new RuntimeException(e);
            }

            rethrowWorkerFailure(zipName, failures[1], failures[0]);
        }

        /**
         * Throws if at least one worker thread failed. The upload failure is reported
         * as the cause when both failed, because a dead uploader also makes the
         * zipping side fail with a closed pipe.
         */
        private static void rethrowWorkerFailure(String zipName, Throwable uploadFailure, Throwable zipFailure) {
            if (uploadFailure == null && zipFailure == null) {
                return;
            }
            final Throwable cause = uploadFailure != null ? uploadFailure : zipFailure;
            final RuntimeException failure =
                    new RuntimeException("ZipService - createZip - failed to create zip: " + zipName, cause);
            if (uploadFailure != null && zipFailure != null) {
                failure.addSuppressed(zipFailure);
            }
            throw failure;
        }

        /**
         * Creates the thread that downloads every asset and writes it into a zip
         * stream on top of the given pipe. The thread is returned unstarted.
         *
         * @param keys              storage key to zip entry name
         * @param pipedOutputStream write end of the pipe; closed together with the
         *                          zip stream when the thread is done
         * @return the configured, not yet started thread
         */
        private Thread getDownloadAndZipThread(Map<String, String> keys, PipedOutputStream pipedOutputStream) {
            return new Thread(() -> {
                long start = System.currentTimeMillis();
                try (final ZipArchiveOutputStream zipOutputStream = new ZipArchiveOutputStream(pipedOutputStream)) {
                    for (Map.Entry<String, String> entry : keys.entrySet()) {
                        try {
                            downloadAndZip(zipOutputStream, entry.getKey(), entry.getValue());
                        } catch (Exception e) {
                            // Best effort: downloadAndZip has already logged the cause, so the
                            // asset is skipped and the remaining ones are still attempted.
                            // NOTE(review): a failure in the middle of an entry may leave a
                            // truncated entry in the archive - confirm whether that is acceptable.
                            LOGGER.error("ZipServiceImpl - getDownloadAndZipThread - failed to download file: " + entry.getKey());
                        }
                    }
                } catch (Exception e) {
                    LOGGER.error("ZipService - getDownloadAndZipThread - Failed to process inputStream due to error: " + e, e);
                    throw new RuntimeException(e);
                }
                long executedTime = System.currentTimeMillis() - start;
                LOGGER.info("ZipService - getDownloadAndZipThread - execution time: " + executedTime);
            });
        }

        /**
         * Creates the thread that uploads everything read from the pipe. The thread
         * is returned unstarted.
         *
         * @param filename         name under which the data is uploaded
         * @param pipedInputStream read end of the pipe; always closed by the thread
         * @return the configured, not yet started thread
         */
        private Thread getUploadThread(String filename, PipedInputStream pipedInputStream) {
            return new Thread(() -> {
                long start = System.currentTimeMillis();
                // The read end must be closed on failure as well: otherwise the zipping
                // thread can block forever on a full pipe that nobody drains any more.
                try (PipedInputStream uploadSource = pipedInputStream) {
                    s3Service.multipartUpload(filename, uploadSource);
                } catch (Exception e) {
                    LOGGER.error("Failed to process outputStream due to error: " + e, e);
                    throw new RuntimeException(e);
                }
                long executedTime = System.currentTimeMillis() - start;
                LOGGER.info("ZipService - getUploadThread - execution time: " + executedTime);
            });
        }

        /**
         * Downloads one asset and appends it to the zip stream as a single entry.
         *
         * @param zipOutputStream zip stream the entry is written to
         * @param awsKey          name of the file that should be downloaded
         * @param destName        name of the entry inside the zip
         * @throws RuntimeException if the asset does not exist or cannot be copied
         */
        private void downloadAndZip(ZipArchiveOutputStream zipOutputStream, String awsKey, String destName) {

            if (!s3Service.existsAssetByName(awsKey)) {
                String error = "ZipService - downloadAndZip - file with following aws key does not exist: " + awsKey;
                LOGGER.error(error);
                throw new RuntimeException(error);
            }

            ZipArchiveEntry entry = new ZipArchiveEntry(destName);

            try (InputStream inputStream = s3Service.getAssetByName(awsKey)) {

                zipOutputStream.putArchiveEntry(entry);

                byte[] buffer = new byte[COPY_BUFFER_SIZE];
                int len;
                // -1 is the end-of-stream marker of InputStream.read(byte[]).
                while ((len = inputStream.read(buffer)) != -1) {
                    zipOutputStream.write(buffer, 0, len);
                }

                zipOutputStream.closeArchiveEntry();

            } catch (Exception e) {
                LOGGER.error("ZipService - downloadAndZip - failed processing of: " + awsKey + " due to error: " + e, e);
                throw new RuntimeException(e);
            }
        }

    }