I have a Java Iterable<CSVRecord> records, and I want to pass it into a Beam pipeline. I tried:
PCollection<CSVRecord> csvRecordPC = p.apply("Create collection", Create.of(records));
It fails with this error:
An exception occured while executing the Java class. Can not determine a default Coder for a 'Create' PTransform that has no elements. Either add elements, call Create.empty(Coder), Create.empty(TypeDescriptor), or call 'withCoder(Coder)' or 'withType(TypeDescriptor)' on the PTransform.
Which Coder should I use, or how can I write a custom coder?
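The error says the Create transform saw no elements, so records was empty when Create.of inspected it. One possible cause, which is an assumption and not confirmed above, is that records is a single-pass Iterable, such as a Commons CSV CSVParser that had already been iterated once. The minimal JDK-only sketch below copies a one-shot Iterable into a List before anything else consumes it. OneShotIterable, materialize and count are hypothetical helpers written for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MaterializeRecords {

    // Hypothetical stand-in for a single-pass source such as a CSVParser:
    // every call to iterator() returns the SAME underlying iterator.
    static final class OneShotIterable<T> implements Iterable<T> {
        private final Iterator<T> it;

        OneShotIterable(List<T> items) {
            this.it = items.iterator();
        }

        @Override
        public Iterator<T> iterator() {
            return it;
        }
    }

    // Copies every element into a List, which can be iterated any number of times.
    static <T> List<T> materialize(Iterable<T> source) {
        List<T> copy = new ArrayList<>();
        for (T item : source) {
            copy.add(item);
        }
        return copy;
    }

    // Counts elements by iterating the source once.
    static int count(Iterable<?> source) {
        int n = 0;
        for (Object ignored : source) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        Iterable<String> records = new OneShotIterable<>(Arrays.asList("a,1", "b,2"));

        // Materialize BEFORE anything else consumes the one-shot iterator.
        List<String> list = materialize(records);

        System.out.println(count(list));    // 2
        System.out.println(count(list));    // 2 (a List can be re-iterated)
        System.out.println(count(records)); // 0 (the one-shot source is now exhausted)
    }
}
```

In the pipeline, the idea would be to pass the materialized List to Create.of instead of the original Iterable. If the list can legitimately be empty, the error message itself points to withCoder(Coder). Assuming your Commons CSV version's CSVRecord implements Serializable, Beam's SerializableCoder.of(CSVRecord.class) is one candidate to try there.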
I found a solution using FileIO instead:
PCollection<CSVRecord> csvRecords = p
    .apply(FileIO.match().filepattern(options.getInputFile()))
    .apply(FileIO.readMatches())
    .apply(ParDo.of(new CsvParser()));
where CsvParser is:
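import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;

// Emits one CSVRecord per data row of each matched file (the first row is treated as the header).
public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
  @ProcessElement
  public void processElement(@Element ReadableFile element, OutputReceiver<CSVRecord> receiver) throws IOException {
    // try-with-resources closes the reader (and the underlying channel) when parsing ends.
    try (Reader reader = new InputStreamReader(Channels.newInputStream(element.open()), StandardCharsets.UTF_8)) {
      for (CSVRecord record : CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader)) {
        receiver.output(record);
      }
    }
  }
}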