
Java: read excel file stored in a bucket using apache beam pipeline


I know the way below, but I need it inside an Apache Beam pipeline. Please provide an example:

try (ReadableByteChannel chan = FileSystems.open(FileSystems.matchNewResource(
            "gs://bucketname/filename.xlsx", false))) {
    InputStream inputStream = Channels.newInputStream(chan);
    // ...consume inputStream...
}

Solution

  • I have implemented reading a .xlsx file from the local file system, but the same should work for your GCS bucket path. I have tried the same in a different pipeline, and it worked fine.

    The enrichedCollection in the code below can be treated like a .csv file being read line by line. I have used semicolons as the delimiter to separate the values.

    package com.fooBar;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.poi.ss.usermodel.Cell;
    import org.apache.poi.ss.usermodel.CellType;
    import org.apache.poi.ss.usermodel.Row;
    import org.apache.poi.xssf.usermodel.XSSFSheet;
    import org.apache.poi.xssf.usermodel.XSSFWorkbook;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.channels.Channels;
    import java.util.Iterator;
    
    public class SampleExcelInput {
    
        public static void main(String[] args) {
    
            Pipeline pipeline = Pipeline.create();
    
            PCollection<FileIO.ReadableFile> inputCollection = pipeline.apply(FileIO.match()
    //                .filepattern("gs://bucket/file.xlsx"))
                            .filepattern("C:\\Workspace\\ApacheBeam\\src\\main\\resources\\Inputfiles\\SampleExcel.xlsx"))
                    .apply(FileIO.readMatches());
    
            PCollection<String> enrichedCollection = inputCollection.apply(ParDo.of(new ReadXlsxDoFn()));
            // TODO: further processing, treating the lines of enrichedCollection as if they were read from a CSV
            pipeline.run().waitUntilFinish();
        }
    
        static class ReadXlsxDoFn extends DoFn<FileIO.ReadableFile, String> {
            static final String DELIMITER = ";";
    
            @ProcessElement
            public void process(ProcessContext c) throws IOException {
                FileIO.ReadableFile file = c.element();
                System.out.println("FileName being read is :" + file);
                // try-with-resources closes the workbook and the underlying stream
                try (InputStream stream = Channels.newInputStream(file.openSeekable());
                     XSSFWorkbook wb = new XSSFWorkbook(stream)) {
                    XSSFSheet sheet = wb.getSheetAt(0); // first sheet of the workbook
                    // iterate over the Excel file row by row
                    for (Row row : sheet) {
                        Iterator<Cell> cellIterator = row.cellIterator(); // iterate over each column
                        StringBuilder sb = new StringBuilder();
                        while (cellIterator.hasNext()) {
                            Cell cell = cellIterator.next();
                            if (cell.getCellType() == CellType.NUMERIC) {
                                sb.append(cell.getNumericCellValue()).append(DELIMITER);
                            } else {
                                sb.append(cell.getStringCellValue()).append(DELIMITER);
                            }
                        }
                        if (sb.length() > 0) {
                            System.out.println(sb.substring(0, sb.length() - 1));
                            c.output(sb.substring(0, sb.length() - 1)); // drop the trailing delimiter
                        }
                    }
                }
            }
        }
    }
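Downstream transforms can then split each emitted line on the delimiter to get the cell values back. A plain-Java sketch of that step (the `parseLine` helper name is my own, not part of the pipeline above):

```java
import java.util.Arrays;
import java.util.List;

public class DelimitedLineDemo {
    static final String DELIMITER = ";";

    // Hypothetical helper: split one line emitted by ReadXlsxDoFn into its cell values.
    static List<String> parseLine(String line) {
        // limit -1 keeps trailing empty cells instead of silently dropping them
        return Arrays.asList(line.split(DELIMITER, -1));
    }

    public static void main(String[] args) {
        List<String> cells = parseLine("1.0;Dulce;Abril;Female;United States;32.0;15/10/2017;1562.0");
        System.out.println(cells.size()); // number of columns
        System.out.println(cells.get(1)); // the "First Name" column
    }
}
```

Note that a naive split like this assumes no cell value itself contains a semicolon; pick a delimiter that cannot occur in your data.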
    

    For the dependencies I had to manually add some jars to make it work; you can take that reference from here

    Apart from the above jars, I have the following Maven dependencies.

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-core</artifactId>
                <version>2.37.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-runners-direct-java</artifactId>
                <version>2.37.0</version>
            </dependency>
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.11.0</version>
            </dependency>
    

    Link for the sample .xlsx file: here

    Console output from the DoFn:

        FileName being read is :ReadableFile{metadata=Metadata{resourceId=C:\Users\USER\Desktop\Java Masterclass\ApacheBeam\src\main\resources\Inputfiles\SampleExcel.xlsx, sizeBytes=7360, isReadSeekEfficient=true, checksum=null, lastModifiedMillis=0}, compression=UNCOMPRESSED}
    0.0;First Name;Last Name;Gender;Country;Age;Date;Id
    1.0;Dulce;Abril;Female;United States;32.0;15/10/2017;1562.0
    2.0;Mara;Hashimoto;Female;Great Britain;25.0;16/08/2016;1582.0
    3.0;Philip;Gent;Male;France;36.0;21/05/2015;2587.0
    4.0;Kathleen;Hanner;Female;United States;25.0;15/10/2017;3549.0
    5.0;Nereida;Magwood;Female;United States;58.0;16/08/2016;2468.0
    6.0;Gaston;Brumm;Male;United States;24.0;21/05/2015;2554.0
    7.0;Etta;Hurn;Female;Great Britain;56.0;15/10/2017;3598.0
    8.0;Earlean;Melgar;Female;United States;27.0;16/08/2016;2456.0  
    .
    .
    .
    50.0;Rasheeda;Alkire;Female;United States;29.0;16/08/2016;6125.0
    
    Process finished with exit code 0
    

    Note: Since the file is parsed line by line in a single DoFn, this effectively means one thread per file. If you have just a single file of a very large size, say ~5 GB, you will notice a significant performance drop. One workaround is to keep the input files small and use a wildcard file pattern.
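The match step above could take such a wildcard pattern, so that many smaller files are read in parallel. A fragment illustrating this (the bucket and path are placeholders):

```java
// Each matched file becomes its own element in the PCollection,
// so separate files can be processed by separate workers in parallel.
PCollection<FileIO.ReadableFile> files = pipeline
        .apply(FileIO.match().filepattern("gs://my-bucket/input/part-*.xlsx")) // hypothetical path
        .apply(FileIO.readMatches());
```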