Search code examples
pythongoogle-cloud-dataflowapache-beamparquetpyarrow

Writing files to dynamic destinations in Parquet using Apache Beam Python SDK


I am trying to write Parquet files using dynamic destinations via the WriteToFiles class.

I even found some further developed example like this one, where they build a custom Avro file sink.

I am currently trying to use the pyarrow library to write a Parquet sink that could manage the write operation in a distributed way, similarly to how it is done by the WriteToParquet PTransform.

class ParquetFileSink(fileio.FileSink):
    def __init__(self, schema, codec='deflate'):
        self._schema = schema
        self._codec = codec
        self.writer = None

    def open(self, fh):
        # This is called on every new bundle.
        self.writer = pq.ParquetWriter(
            fh,
            self._schema,
            compression=self._codec,
            use_deprecated_int96_timestamps=False
        )

    def write(self, record):
        # This is called on every element.
        row = pa.Table.from_pandas(
            pd.DataFrame(record), schema=self._schema, preserve_index=False
        )
        self.writer.write_table(row)

    def flush(self):
        pass

The main issue here is that it is not possible, as far as I know, to write unbounded PCollections as Parquet files, so if I try to use the following class to write by record either I get an error for writing on closed file handlers, or some files are simply not created. I also tried to write batches using a GroupByKey PTransform, however as it is not possible to close the pyarrow.parquet.ParquetWriter object, files end up written only partially and being corrupted. Moreover this strategy is not safe as batches could be very large and to write them as a single file is not a good idea.

I can see that this problem is being faced in the class apache_beam.io.parquetio._ParquetSink, but I don't think this can be directly applied to the WriteToFiles class as I can't see how to fully manage file handlers with it.


Solution

  • I faced a similar problem and I ended up writing a ParquetSink that could be used with WriteToFiles. So it batches the records in memory given your configuration. I've used this to create dynamic files in a batch process dependent on a field in the record, but I assume it would also work with a streaming pipeline, although I haven't tested it.

    You can find the code in this gist

    class ParquetSink(fileio.FileSink):
        def __init__(self,
                    file_path_prefix,
                    schema,
                    row_group_buffer_size=64 * 1024 * 1024,
                    record_batch_size=1000,
                    codec='none',
                    use_deprecated_int96_timestamps=False,
                    file_name_suffix='',
                    num_shards=0,
                    shard_name_template=None,
                    mime_type='application/x-parquet'):
            self._inner_sink = beam.io.parquetio._create_parquet_sink(
                file_path_prefix,
                schema,
                codec,
                row_group_buffer_size,
                record_batch_size,
                use_deprecated_int96_timestamps,
                file_name_suffix,
                num_shards,
                shard_name_template,
                mime_type
            )
            self._codec = codec
            self._schema = schema
            self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
    
        def open(self, fh):
            self._pw = pyarrow.parquet.ParquetWriter(
                fh,
                self._schema,
                compression=self._codec,
                use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
    
        def write(self, record):
            self._inner_sink.write_record(self._pw, record)
    
        def flush(self):
            if len(self._inner_sink._buffer[0]) > 0:
                self._inner_sink._flush_buffer()
            if self._inner_sink._record_batches_byte_size > 0:
                self._inner_sink._write_batches(self._pw)
    
            self._pw.close()
    
    
    
       def parquet_compatible_filenaming(suffix=None):
        def _inner(window, pane, shard_index, total_shards, compression, destination):
            return fileio.destination_prefix_naming(suffix )(
                window, pane, shard_index, total_shards, compression, destination).replace(":", ".")
    
        return _inner
    
    
    def get_parquet_pipeline(pipeline_options, input, output):
        with beam.Pipeline(options=pipeline_options) as p:
            lines = (p 
                        | 'Read' >> beam.io.ReadFromParquet(file_pattern=input)
                        | 'Transform' >> beam.Map(lambda x: { 'some_key': x['some_key'], 'raw': x})
                        | 'Write to Parquet' >> fileio.WriteToFiles(
                                                    path=str(output),
                                                    destination=lambda x: x["some_key"],
                                                    sink=lambda x: ParquetSink(
                                                                        file_path_prefix=output,
                                                                        file_name_suffix=".parquet",
                                                                        codec="snappy",
                                                                        schema=pyarrow.schema([
                                                                            pyarrow.field("some_key", pyarrow.string()),
                                                                            pyarrow.field("raw", pyarrow.string())
                                                                        ])),
                                                        file_naming=parquet_compatible_filenaming(suffix=".parquet")
                                                    )
            )