
How to read gzip-compressed CSV files stored in Cloud Storage with Apache Beam without extracting them


I have GZIP-compressed CSV files that a third party uploads to Cloud Storage. I use a continuously running Apache Beam streaming pipeline to read each compressed file's metadata (file name and full path). I also need to read the first and last line of each CSV file. I am using the following code to pick up every compressed file added to the Cloud Storage bucket folder.

    pipeline.apply("MatchFile(s)", FileIO.match()
            .filepattern(zipFilePath)
            .continuously(Duration.standardMinutes(1), Watch.Growth.never()))
            .apply(Window.<MatchResult.Metadata>into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Get Compressed File(s)", ParDo.of(new GetCompressedFile()));

    static class GetCompressedFile extends DoFn<MatchResult.Metadata, Void> {
        @ProcessElement
        public void processElement(ProcessContext context) throws ParseException {
            // Metadata only: the file contents are never opened here.
            ResourceId inputFile = context.element().resourceId();
            String fileName = inputFile.getFilename();
            String currentDirectoryPath = inputFile.getCurrentDirectory().toString();
        }
    }

I can get the compressed file's name and path, but I cannot read the CSV contents without extracting the file first. I tried a few answers I found through Google for reading gzip files, but none of them read from Cloud Storage.


Solution

  • I was able to read the compressed files without extracting them by using the following code. Maybe it will help somebody.

        pipeline.apply("MatchFile(s)", FileIO.match()
                .filepattern(zipFilePath)
                .continuously(Duration.standardMinutes(1), Watch.Growth.never()))
                .apply(Window.<MatchResult.Metadata>into(FixedWindows.of(Duration.standardMinutes(1))))
                // Mark each match as GZIP so Beam decompresses it when it is read.
                .apply(FileIO.readMatches().withCompression(Compression.GZIP))
                .apply("Read Files", ParDo.of(new ReadFilesGZIP()));

        pipeline.run();
    }

    static class ReadFilesGZIP extends DoFn<FileIO.ReadableFile, String> {
        @ProcessElement
        public void processElement(ProcessContext context) throws IOException {
            FileIO.ReadableFile file = context.element();

            // Open the object in Cloud Storage and decompress it as a stream;
            // nothing is extracted to disk.
            ReadableByteChannel readableByteChannel = file.getCompression()
                    .readDecompressed(FileSystems.open(file.getMetadata().resourceId()));
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(Channels.newInputStream(readableByteChannel)))) {
                Stream<String> fileLines = r.lines();
                // Consume fileLines here, before the reader is closed.
            }
        }
    }
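
The snippet above stops once it has a stream of decompressed lines, while the question asks for the first and last line of each file. One way to get them, as a minimal sketch using only the JDK, is to read the stream once and keep just the first and the most recent line. `GzipFirstLast`, `firstAndLast` and `gzip` are hypothetical names made up for this example and are not part of Apache Beam; the in-memory sample data stands in for a real file, and UTF-8 is an assumed encoding.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for illustration; not part of Apache Beam.
final class GzipFirstLast {

    /**
     * Returns {firstLine, lastLine} of an already-decompressed text stream,
     * or null if the stream has no lines. Assumes UTF-8 text.
     */
    static String[] firstAndLast(InputStream decompressed) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(decompressed, StandardCharsets.UTF_8))) {
            String first = reader.readLine();
            if (first == null) {
                return null; // empty file
            }
            String last = first;
            String line;
            while ((line = reader.readLine()) != null) {
                last = line; // only the most recent line is kept in memory
            }
            return new String[] {first, last};
        }
    }

    /** Gzips a string in memory so the example needs no real file. */
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = gzip("id,name\n1,a\n2,b\n");
        // Outside Beam, GZIPInputStream does the streaming decompression.
        String[] result = firstAndLast(
                new GZIPInputStream(new ByteArrayInputStream(compressed)));
        System.out.println(result[0]); // id,name
        System.out.println(result[1]); // 2,b
    }
}
```

Inside the DoFn, the same helper could presumably be called as `firstAndLast(Channels.newInputStream(readableByteChannel))`, since that channel is already decompressed, and the two strings passed to `context.output(...)`. The whole file still has to be streamed to reach the last line, because gzip does not support seeking to the end.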