I am trying to write a Dataflow job in Beam/Java that processes a series of events coming from Pub/Sub and writes them to Parquet. The events in Pub/Sub are in JSON format, and every event can generate one or more rows. I was able to write a very simple working example with a ParDo transformation that returns just one record. The ParDo looks like this:
static class GenerateRecords extends DoFn<String, GenericRecord> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        String msg = context.element();
        // GenerateParquetRecord builds one record from the JSON message;
        // the generated Avro class implements GenericRecord, so it can be emitted directly
        com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);
        context.output(pRecord);
    }
}
and here is the write part of the pipeline:
.apply("Write to file",
FileIO.<GenericRecord>
write()
.via(
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.to(options.getOutputDirectory())
.withNumShards(options.getNumShards())
.withSuffix("pfile")
);
My question is: how do I generalize this ParDo transformation so that it can return a list of records? I tried returning a List<GenericRecord>, but that does not work: ParquetIO.sink(schema) barks with "cannot resolve method via".
You can invoke context.output() in your DoFn as many times as you need. So, if you know under which circumstances your business logic needs to emit several records, then you just have to call context.output(record) for every output record. It should be simpler than having a PCollection of containers.
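
For example, here is a minimal sketch of such a DoFn, assuming a hypothetical helper GenerateParquetRecords(String) that parses one JSON message into a List<GenericRecord>:

static class GenerateRecords extends DoFn<String, GenericRecord> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        String msg = context.element();
        // GenerateParquetRecords is a hypothetical helper that turns one
        // JSON message into as many Avro records as it contains rows
        for (GenericRecord record : GenerateParquetRecords(msg)) {
            // emit each record individually; the output stays a flat
            // PCollection<GenericRecord>, which ParquetIO.sink can consume
            context.output(record);
        }
    }
}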
PS: Btw, I have a simple example of how to write GenericRecords with ParquetIO and AvroCoder that perhaps could be helpful.
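
As a rough illustration of that wiring (with messages standing in for the PCollection<String> read from Pub/Sub, and schema for your Avro schema), the output PCollection needs an AvroCoder so Beam can serialize the GenericRecords between steps:

PCollection<GenericRecord> records = messages
    .apply("Generate records", ParDo.of(new GenerateRecords()))
    // GenericRecord has no default coder, so set one explicitly from the schema
    .setCoder(AvroCoder.of(schema));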