java google-cloud-dataflow apache-beam dataflow

Is there a way to create a list of SpecificRecord in a ParDo transformation in Beam for writing Parquet files?


I am trying to write a Dataflow job in Beam/Java that processes a series of events coming from Pub/Sub and writes them to Parquet. The events in Pub/Sub are in JSON format, and every event can generate one or more rows. I was able to write a very simple example with a ParDo transformation that returns just one record per event. The ParDo looks like this:

    static class GenerateRecords extends DoFn<String, GenericRecord> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            String msg = context.element();

            // mschema is the Avro-generated SpecificRecord class; generated classes
            // also implement GenericRecord, so the record can be output as-is.
            com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);

            // Exactly one output record per input message.
            context.output(pRecord);
        }
    }

And the write part of the pipeline:

    .apply("Write to file",
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY))
            .to(options.getOutputDirectory())
            .withNumShards(options.getNumShards())
            .withSuffix("pfile"));

My question is: how do I generalize this ParDo transformation to return a list of records? I tried using a List as the output type, but that does not work; the compiler rejects ParquetIO.sink(schema) with "cannot resolve method via".
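A likely reason for that error: ParquetIO.sink(schema) is a FileIO sink for GenericRecord, so FileIO.<GenericRecord>write().via(...) expects a PCollection<GenericRecord>. A DoFn whose output type is List<GenericRecord> yields a PCollection<List<GenericRecord>> instead, and the types no longer line up. If you do end up with such a collection, Beam's Flatten.iterables() can unwrap it back into individual records. A minimal sketch (the class and method names are hypothetical):

```java
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;

class FlattenRecordLists {
  // Hypothetical helper: turns PCollection<List<GenericRecord>> into
  // PCollection<GenericRecord>, the element type FileIO.<GenericRecord>write() expects.
  static PCollection<GenericRecord> flatten(PCollection<List<GenericRecord>> lists) {
    // Each list is expanded so that every record becomes its own element.
    return lists.apply("Flatten record lists", Flatten.<GenericRecord>iterables());
  }
}
```

Note that the intermediate list-valued PCollection still needs a coder Beam can build for List<GenericRecord>, so emitting the records one by one from the DoFn is usually less hassle.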


Solution

  • You can invoke context.output() in your DoFn as many times as you need. So if your business logic says an event should produce several records, just call context.output(record) once for every output record. This is simpler than working with a PCollection of containers.

    PS: I have a simple example of how to write GenericRecords with ParquetIO and AvroCoder that might be helpful.
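A minimal sketch of the "call output() once per record" idea, under some assumptions: the one-field schema (SCHEMA_JSON), the ';'-separated message format and the helper toRecords are hypothetical stand-ins for your real schema and JSON parsing. The schema is kept as a JSON string and parsed in @Setup so the DoFn itself serializes cleanly.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;

public class GenerateRecords extends DoFn<String, GenericRecord> {

  // Hypothetical one-field schema used only for this sketch.
  static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"Row\",\"fields\":"
          + "[{\"name\":\"value\",\"type\":\"string\"}]}";

  // Parsed once per DoFn instance; transient so only the JSON string is serialized.
  private transient Schema schema;

  @Setup
  public void setup() {
    schema = new Schema.Parser().parse(SCHEMA_JSON);
  }

  // Hypothetical business logic: one row per ';'-separated token.
  // Replace this with the parsing of your JSON events.
  static List<GenericRecord> toRecords(String msg, Schema rowSchema) {
    List<GenericRecord> rows = new ArrayList<>();
    for (String token : msg.split(";")) {
      GenericData.Record row = new GenericData.Record(rowSchema);
      row.put("value", token);
      rows.add(row);
    }
    return rows;
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    // Emit every row as a separate element, so the output stays a
    // PCollection<GenericRecord> that ParquetIO.sink(schema) accepts.
    for (GenericRecord row : toRecords(context.element(), schema)) {
      context.output(row);
    }
  }
}
```

In the pipeline you would apply it with ParDo.of(new GenerateRecords()) and, since Beam often cannot infer a coder for GenericRecord, set one explicitly before the FileIO write, e.g. .setCoder(AvroCoder.of(schema)). AvroCoder lives in org.apache.beam.sdk.coders in older Beam releases and in org.apache.beam.sdk.extensions.avro.coders in newer ones, so check which one your version provides.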