Tags: csv, apache-beam, apache-commons-csv

How to read CSVRecord in apache beam?


I have a Java Iterable object, Iterable<CSVRecord> records, and I want to pass it to a Beam pipeline. I tried:

PCollection<CSVRecord> csvRecordPC = p.apply("Create collection", Create.of(records));

It failed with this error:

An exception occured while executing the Java class. Can not determine a default Coder for a 'Create' PTransform that has no elements. Either add elements, call Create.empty(Coder), Create.empty(TypeDescriptor), or call 'withCoder(Coder)' or 'withType(TypeDescriptor)' on the PTransform.

Which Coder should I use? Or how can I write a custom coder?
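For the Create.of attempt, the error message already names one fix: pass the coder explicitly with withCoder. CSVRecord implements java.io.Serializable, so Beam's SerializableCoder can encode it. The sketch below assumes a DirectRunner on the classpath; the class name CreateCsvRecords, the parse helper and the inline CSV string are hypothetical stand-ins for wherever records really comes from.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class CreateCsvRecords {

    // Hypothetical helper: reads every data row into a List so the records are
    // fully materialized before the parser is closed.
    static List<CSVRecord> parse(Reader reader) throws IOException {
        try (CSVParser parser = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader)) {
            return parser.getRecords();
        }
    }

    public static void main(String[] args) throws IOException {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Hypothetical inline data standing in for the real source of `records`.
        List<CSVRecord> records = parse(new StringReader("name,age\nalice,30\nbob,25\n"));

        // The coder is supplied explicitly, so Create does not have to infer it
        // from the elements; this also works when `records` is empty.
        PCollection<CSVRecord> csvRecordPC = p.apply("Create collection",
            Create.of(records).withCoder(SerializableCoder.of(CSVRecord.class)));

        p.run().waitUntilFinish();
    }
}
```

Since the error says the Create had no elements, records was empty when Create.of saw it. If it is a CSVParser that had already been iterated once, for example, it yields nothing more; materializing it with getRecords() first avoids that.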


Solution

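  • I found a solution using FileIO: instead of building the collection with Create, match the input file, read it, and parse it with Commons CSV inside a DoFn.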

    p.apply(FileIO.match().filepattern(options.getInputFile()))
     .apply(FileIO.readMatches())
     .apply(ParDo.of(new CsvParser()));
    

    The CsvParser DoFn is:

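    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.Reader;
    import java.nio.channels.Channels;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.io.FileIO.ReadableFile;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
        @ProcessElement
        public void processElement(@Element ReadableFile element, OutputReceiver<CSVRecord> receiver) throws IOException {
            // try-with-resources closes the file channel and parser; UTF-8 is an assumed encoding.
            try (Reader reader = new InputStreamReader(Channels.newInputStream(element.open()), StandardCharsets.UTF_8);
                 CSVParser records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader)) {
                // Emit one output element per CSV data row (the header row is consumed by the format).
                for (CSVRecord record : records) {
                    receiver.output(record);
                }
            }
        }
    }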
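The output of CsvParser is still a PCollection<CSVRecord>, so downstream steps need a coder for CSVRecord. Beam can usually fall back to SerializableCoder for Serializable types, but according to the Commons CSV Javadoc, from version 1.8 the header mapping is no longer part of a CSVRecord's serialized form, so name-based get("column") lookups may fail once a record has been encoded. One way around that, sketched here under those assumptions, is to convert each row to a Map<String, String> with CSVRecord.toMap() inside the DoFn and set a MapCoder. CsvToMapPipeline, CsvToMapFn and parse are hypothetical names, and the file pattern comes from the first command-line argument instead of options.getInputFile().

```java
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class CsvToMapPipeline {

    // Hypothetical helper: streams each data row to `sink` as a header -> value map.
    static void parse(Reader reader, Consumer<Map<String, String>> sink) throws IOException {
        try (CSVParser parser = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader)) {
            for (CSVRecord record : parser) {
                sink.accept(record.toMap());
            }
        }
    }

    // Variant of CsvParser that emits plain maps instead of CSVRecord objects.
    static class CsvToMapFn extends DoFn<FileIO.ReadableFile, Map<String, String>> {
        @ProcessElement
        public void processElement(@Element FileIO.ReadableFile file,
                                   OutputReceiver<Map<String, String>> out) throws IOException {
            // UTF-8 is an assumed encoding for the input files.
            try (Reader reader = new InputStreamReader(
                    Channels.newInputStream(file.open()), StandardCharsets.UTF_8)) {
                parse(reader, out::output);
            }
        }
    }

    public static void main(String[] args) {
        String filepattern = args[0]; // hypothetical: input glob as the first argument
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        PCollection<Map<String, String>> rows = p
            .apply(FileIO.match().filepattern(filepattern))
            .apply(FileIO.readMatches())
            .apply(ParDo.of(new CsvToMapFn()))
            .setCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        p.run().waitUntilFinish();
    }
}
```

The explicit setCoder is not strictly required, since Beam can infer a MapCoder from the DoFn's declared output type, but it makes the encoding choice visible and keeps CSVRecord out of any step boundary.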