How to read CSVRecord in Apache Beam?

I have a Java Iterable object, Iterable<CSVRecord> records, and I want to pass it to a Beam pipeline. I tried:

PCollection<CSVRecord> csvRecordPC = p.apply("Create collection", Create.of(records));

It failed with this error:

An exception occured while executing the Java class. Can not determine a default Coder for a 'Create' PTransform that has no elements. Either add elements, call Create.empty(Coder), Create.empty(TypeDescriptor), or call 'withCoder(Coder)' or 'withType(TypeDescriptor)' on the PTransform.

Which Coder should I use? Or how can I write my custom coder?
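
As a rough illustration of the coder question: the error text says the Create transform had no elements, so records was apparently empty when Create.of ran, and Beam had nothing to infer a coder from. One common direction, assuming the element type implements java.io.Serializable (worth checking for the CSVRecord version in use), is a coder built on Java serialization, which in Beam is SerializableCoder, supplied through the withCoder call the error message mentions. The sketch below uses only the JDK to show the encode/decode round trip such a coder relies on. The Row class and the encode/decode helpers are hypothetical stand-ins, not Beam or Commons CSV APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializableRoundTrip {

    // Hypothetical stand-in for a parsed CSV row; not the Commons CSV class.
    static final class Row implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> values;

        Row(List<String> values) {
            this.values = new ArrayList<>(values);
        }
    }

    // Turn a serializable value into bytes, as a serialization-based coder would.
    static byte[] encode(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Rebuild the value from bytes on the receiving side.
    static Object decode(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Row original = new Row(Arrays.asList("id", "name"));
        Row copy = (Row) decode(encode(original));
        System.out.println(copy.values);
    }
}
```

If the element type is not serializable, this approach does not apply, and mapping each record to a plain serializable type (for example a list of strings) before creating the PCollection is one alternative.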


  • I found a solution using FileIO.

     .apply(ParDo.of(new CsvParser())) 

    The CsvParser class is:

    public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
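        @ProcessElement
        public void processElement(@Element ReadableFile element, DoFn.OutputReceiver<CSVRecord> receiver) throws IOException {
            // Open the matched file as a channel and wrap it in a stream for the CSV parser.
            InputStream is = Channels.newInputStream(element.open());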
            Reader reader = new InputStreamReader(is);
            Iterable<CSVRecord> records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader);
            for (CSVRecord record : records) {