
ZIP compressed input for Apache Flink


I need to read and process a specific file within a ZIP archive in Apache Flink.

In the documentation, I found that

Flink currently supports transparent decompression of input files if these are marked with an appropriate file extension.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/batch/#read-compressed-files
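To illustrate the extension-based mechanism the quoted sentence describes, here is a minimal sketch in plain Java. It is not Flink's own code. A hypothetical helper `openMaybeCompressed` picks a decompressing wrapper from the file name, covering only `.gz`/`.gzip` and `.deflate`, which `java.util.zip` can decode. A `.zip` file falls through unchanged: a ZIP archive is a container of named entries rather than one compressed stream, so a reader also has to choose which entry to read, and an extension alone cannot tell it that.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

public class ExtensionDecompression {

    // Hypothetical helper: choose a decompressing wrapper from the file extension,
    // falling back to the raw stream when the extension is not recognized.
    static InputStream openMaybeCompressed(Path file) throws IOException {
        InputStream raw = Files.newInputStream(file);
        String name = file.getFileName().toString().toLowerCase();
        if (name.endsWith(".gz") || name.endsWith(".gzip")) {
            return new GZIPInputStream(raw);
        }
        if (name.endsWith(".deflate")) {
            // zlib-wrapped DEFLATE data, as written by DeflaterOutputStream
            return new InflaterInputStream(raw);
        }
        // ".zip" deliberately falls through: an archive needs entry selection,
        // not just a decompressing wrapper.
        return raw;
    }

    public static void main(String[] args) throws IOException {
        Path gz = Files.createTempFile("example", ".gz");
        try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(gz))) {
            out.write("hello flink".getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = openMaybeCompressed(gz)) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
        Files.delete(gz);
    }
}
```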

Is it possible to process it in Apache Flink while decompressing on the fly?
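One way to decompress on the fly, without a local copy, is `java.util.zip.ZipInputStream`, which reads entries sequentially from any `InputStream`. This is a different technique from the temp-file approach in the solution; the sketch assumes the archive stream would come from something like `zipPath.getFileSystem().open(zipPath)` inside a custom `InputFormat`, and the class and `openEntry` names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class StreamingZipEntry {

    // Advances through the archive until the entry named `filename` is found and
    // returns the stream positioned at that entry; reads end at the entry's end.
    static ZipInputStream openEntry(InputStream archive, String filename) throws IOException {
        ZipInputStream zis = new ZipInputStream(archive);
        ZipEntry entry;
        while ((entry = zis.getNextEntry()) != null) {
            if (entry.getName().equals(filename)) {
                return zis;
            }
        }
        zis.close();
        throw new FileNotFoundException(filename + " not found in archive");
    }

    public static void main(String[] args) throws IOException {
        // Build a small two-entry archive in memory for the demo.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(buf)) {
            zos.putNextEntry(new ZipEntry("a.txt"));
            zos.write("first".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("data.xml"));
            zos.write("<root/>".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        try (InputStream in = openEntry(new ByteArrayInputStream(buf.toByteArray()), "data.xml")) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```

Because `ZipInputStream` walks the local entry headers instead of the central directory, it cannot seek and has to read past every entry before the one you want. A few archives (for example stored entries written with a data descriptor) cannot be read this way at all, which is one reason to prefer `ZipFile` on a local copy.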


Solution

  • I want to share the solution I implemented in the meantime.

    After creating my own InputFormat, I used the following code in its open() method:

    @Override
    public void open(final FileInputSplit ignored) throws IOException {
        ...
        final XMLInputFactory xmlif = XMLInputFactory.newInstance();
        final XMLStreamReader xmlr = xmlif.createXMLStreamReader(filePath.toString(),
                  InputFormatUtil.readFileWithinZipArchive(filePath, nestedXmlFileName));
        while (xmlr.hasNext()) {
        ...
    }
    

    where the implementation of readFileWithinZipArchive(...) is:

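    public static InputStream readFileWithinZipArchive(final Path zipPath, final String filename) throws IOException {
        final File tmpFile;
        // using org.apache.flink.core.fs.Path for getting the InputStream from the (remote) zip archive;
        // try-with-resources closes the remote stream once the local copy exists
        try (InputStream zipInputStream = zipPath.getFileSystem().open(zipPath)) {
            // generating a temporary local copy of the zip file
            tmpFile = stream2file(zipInputStream);
        }
        // then using java.util.zip.ZipFile for extracting the InputStream for the specific file within the zip archive
        final ZipFile zipFile = new ZipFile(tmpFile);
        final ZipEntry entry = zipFile.getEntry(filename);
        if (entry == null) {
            zipFile.close();
            throw new FileNotFoundException(filename + " not found in " + zipPath);
        }
        // zipFile is left open on purpose: closing it would also close the returned stream
        return zipFile.getInputStream(entry);
    }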