How to split the result of GroupByKey() transform based on key and write values into GCS bucket using Apache Beam python?

I am new to Apache Beam, Dataflow and Python, and any help would be appreciated. I have a requirement to generate reports by fetching records from a BigQuery table and writing the results to a GCS bucket using Apache Beam in Python. I wrote the pipeline as below -

#Here I am converting the BigQuery output to a 2-element tuple whose elements are dictionaries, for example:
({'institution_id' :'100'},{'customer_id' : '1000','customer_name': 'ABC','customer_email' : '[email protected]','phone_number': '00012345'})

class convtotupleofdict(beam.DoFn):
    def process(self, element):
        # Key: institution dict; value: the customer fields of the same row
        return [({'institiution_id': element['institiution_id']},
                 {'customer_id': element['customer_id'],
                  'customer_name': element['customer_name'],
                  'customer_email': element['customer_email'],
                  'phone_number': element['phone_number']})]

with beam.Pipeline(options=pipeline_options) as p:
    csv_ip = (p
              | 'ReadfromBQ' >> beam.io.ReadFromBigQuery(
                    query="SELECT institiution_id, customer_id, customer_name, customer_email, phone_number "
                          "FROM <table name> WHERE customer_status = 'Active' "
                          "ORDER BY institiution_id, customer_id",
                    use_standard_sql=True)
              | 'ConvttoTupleofDict' >> beam.ParDo(convtotupleofdict())
              | 'Groupbyinstitution_id' >> beam.GroupByKey())
    # The write step is unfinished; this is where I am stuck:
    op_gcs = (csv_ip
              | 'WritetoGCS' >> fileio.WriteToFiles(
                    sink=lambda dest: ...))

I am using the GroupByKey() transform to group the data by institution_id, so that I can split the data and create a separate file for each institution_id in the GCS bucket. The GroupByKey() output is as follows-

({'institution_id' :'100'},{'customer_id' : '1000','customer_name': 'ABC','customer_email' : '[email protected]','phone_number': '00012345'},{'customer_id' : '2000','customer_name': 'XYZ','customer_email' : '[email protected]','phone_number': '12378'})
({'institution_id' :'200'},{'customer_id' : '3000','customer_name': 'MNO','customer_email' : '[email protected]','phone_number': '789102'},{'customer_id' : '4000','customer_name': 'PQR','customer_email' : '[email protected]','phone_number': '123789'})

Now, I am struggling to convert the GroupByKey() output to a CSV file to upload to the GCS bucket. I got to know about WriteToFiles from apache_beam.io.fileio, as this can be used for writing files to dynamic destinations. In order to split the data by institution_id, how should I provide the following parameters of WriteToFiles: path, destination, sink and file_naming? I understand destination and sink are callables, but I am not able to build them. I am stuck at this point and unable to proceed. I am confused between the destination and sink parameters: how should I write them to split the data based on institution_id and generate a CSV file? For now, I am testing my code with DirectRunner.


  • I hope this helps.

    Here is a complete solution for your need.

    Beam file :

    import logging

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    from your_root_folder.file_io_customer_by_institution_transform import \
        FileIOCustomerByInstitutionTransform


    def run():
        pipeline_options = PipelineOptions()

        with beam.Pipeline(options=pipeline_options) as p:
            input = [
                {
                    'institution_id': '100',
                    'customer_id': '1000',
                    'customer_name': 'ABC',
                    'customer_email': '[email protected]',
                    'phone_number': '00012345'
                },
                {
                    'institution_id': '100',
                    'customer_id': '1001',
                    'customer_name': 'ABCD',
                    'customer_email': '[email protected]',
                    'phone_number': '00012346'
                },
                {
                    'institution_id': '101',
                    'customer_id': '1001',
                    'customer_name': 'ABCD',
                    'customer_email': '[email protected]',
                    'phone_number': '00012346'
                }
            ]

            (
                    p
                    | beam.Create(input)
                    | "Group customers by institution" >> beam.GroupBy(lambda customers: customers['institution_id'])
                    | "Write file to GCS" >> FileIOCustomerByInstitutionTransform('gs://mazlum_dev/dynamicfiles')
            )


    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.INFO)
        run()

    job_config file :

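    class JobConfig:
        FILE_ENCODING = 'utf-8'
        CSV_SEPARATOR = ','
        # Assumed value, inferred from the example file names (e.g. 20221011160828)
        FILE_NAME_TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'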

    file_io_customer_by_institution_transform file :

    from datetime import datetime
    from typing import List, Dict, Tuple, Iterable

    import apache_beam as beam
    from apache_beam.io.fileio import TextSink, WriteToFiles
    from pytz import timezone

    from integration_ocd.pythonjobs.common.module.job_config import JobConfig


    class InstitutionDestinationParamError(Exception):
        pass

    class FileIOCustomerByInstitutionTransform(beam.PTransform):

        def __init__(self, output_path: str):
            super().__init__()
            self._output_path = output_path

        def expand(self, pcoll):
            return (
                    pcoll
                    | "Write files to GCS" >> WriteToFiles(
                        path=self._output_path,
                        destination=to_institution_destination,
                        file_naming=self.build_file_name,
                        sink=lambda institution_dest: CustomCsvSink())
            )

        def build_file_name(self, *args) -> str:
            """
            Build the file name dynamically from the parameters given by Beam in the 'WriteToFiles' PTransform.
            A destination is built with the institution as value (key of the group in the PCollection), then the
            file name is built from this institution (args[5], the sixth positional argument).
            """
            file_name_timestamp = datetime.now(timezone('Europe/Paris')).strftime(JobConfig.FILE_NAME_TIMESTAMP_FORMAT)

            try:
                institution_destination: str = args[5]
                return f'CUSTOMER_INSTITUTION_{institution_destination}_{file_name_timestamp}.csv'
            except Exception as err:
                raise InstitutionDestinationParamError('The institution destination param must be passed', err) from err

    class CustomCsvSink(TextSink):

        def __init__(self):
            super().__init__()

        def write(self, customers_with_institution):
            customers: Iterable[Dict[str, str]] = customers_with_institution[1]

            # self._fh is assumed to be the file handle stored by TextSink.open()
            for index, customer in enumerate(customers, start=1):
                # Write the header once, before the first record
                if index == 1:
                    header_field_names: bytes = self.build_csv_header_file(customer)
                    self._fh.write(header_field_names)
                    self._fh.write(self.get_csv_line_break())

                customer_csv_entry = self.convert_dict_to_csv_record(customer)
                self._fh.write(customer_csv_entry)
                self._fh.write(self.get_csv_line_break())

        def get_csv_line_break(self) -> bytes:
            return '\n'.encode(JobConfig.FILE_ENCODING)

        def build_csv_header_file(self, customer_dict: Dict[str, str]) -> bytes:
            header_field_names: str = JobConfig.CSV_SEPARATOR.join(customer_dict.keys())
            return header_field_names.encode(JobConfig.FILE_ENCODING)

        def convert_dict_to_csv_record(self, customer_dict: Dict[str, str]) -> bytes:
            """
            Turns dictionary values into a comma-separated value formatted string.
            The separator is defined in the configuration file.
            """
            customer_csv_record: str = JobConfig.CSV_SEPARATOR.join(map(str, customer_dict.values()))
            return customer_csv_record.encode(JobConfig.FILE_ENCODING)

    def to_institution_destination(customers_with_institution: Tuple[str, List[Dict[str, str]]]) -> str:
        """
        Map the given tuple to the institution as destination in the 'WriteToFiles' PTransform.
        This destination can then be used in the file-naming part.
        """
        return customers_with_institution[0]

    Some explanations :

    • The input data is mocked in my example with customer data.
    • The first operation in the main file is a Beam GroupBy on the institution_id field.
    • For the next step, I created a separate file, file_io_customer_by_institution_transform, with a PTransform containing all the logic to write the dynamic files:

    • This file uses a small configuration through the JobConfig object.
    • A CustomCsvSink is created to generate the CSV lines from the elements of the PCollection.
    • The file name is built from the current institution ID and the current timestamp.
    • One file is generated per group of elements sharing an institution ID.
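To make the roles of destination and file_naming more concrete, here is a minimal sketch of the two callables as plain Python functions, without Beam. It assumes that WriteToFiles calls the file-naming callable with six positional arguments (window, pane, shard_index, total_shards, compression, destination), which is why the answer reads args[5]. The names to_destination and name_file are hypothetical, and the UTC timestamp stands in for the Europe/Paris one used above.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Tuple


def to_destination(element: Tuple[str, Iterable[Dict[str, str]]]) -> str:
    """Destination callable: maps one grouped element to its institution ID."""
    return element[0]


def name_file(window: Any, pane: Any, shard_index: Any, total_shards: Any,
              compression: Any, destination: str) -> str:
    """File-naming callable: only the destination (sixth argument) is used here."""
    # Assumption: a UTC timestamp in the same format as the example file names.
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')
    return f'CUSTOMER_INSTITUTION_{destination}_{timestamp}.csv'


# One grouped element, shaped like the output of the group-by step: (key, values)
grouped = ('100', [{'customer_id': '1000'}, {'customer_id': '1001'}])

destination = to_destination(grouped)
file_name = name_file(None, None, 0, 1, None, destination)
print(file_name)
```

Naming the parameters instead of using *args makes it clearer which value is the destination. The sink callable, in turn, receives the same destination and returns the sink object that formats the elements for that file.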

    In my example :

    • file 1 with institution 100 : CUSTOMER_INSTITUTION_100_20221011160828.csv => contains 2 CSV lines

    • file 2 with institution 101 : CUSTOMER_INSTITUTION_101_20221011160828.csv => contains 1 CSV line
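To check the expected file contents without running a pipeline, the grouping and CSV formatting can be imitated in plain Python. This is only a sketch of the same logic, not Beam code: the helper names are hypothetical and the e-mail addresses are placeholders. Each file gets one header line in addition to its records.

```python
from collections import defaultdict
from typing import Dict, List

CSV_SEPARATOR = ','

# Same mocked customers as in the example; e-mail values are placeholders.
customers = [
    {'institution_id': '100', 'customer_id': '1000', 'customer_name': 'ABC',
     'customer_email': 'abc@example.com', 'phone_number': '00012345'},
    {'institution_id': '100', 'customer_id': '1001', 'customer_name': 'ABCD',
     'customer_email': 'abcd@example.com', 'phone_number': '00012346'},
    {'institution_id': '101', 'customer_id': '1001', 'customer_name': 'ABCD',
     'customer_email': 'abcd@example.com', 'phone_number': '00012346'},
]


def group_by_institution(rows: List[Dict[str, str]]) -> Dict[str, List[Dict[str, str]]]:
    """Group rows by institution_id, keeping the input order inside each group."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row['institution_id']].append(row)
    return dict(grouped)


def to_csv_text(rows: List[Dict[str, str]]) -> str:
    """Build the header from the first row's keys, then one line per row."""
    header = CSV_SEPARATOR.join(rows[0].keys())
    records = [CSV_SEPARATOR.join(map(str, row.values())) for row in rows]
    return '\n'.join([header] + records) + '\n'


files = {institution: to_csv_text(rows)
         for institution, rows in group_by_institution(customers).items()}

for institution, text in files.items():
    print(f'--- institution {institution} ---')
    print(text, end='')
```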

    To be honest, the Beam Python documentation is incomplete for this kind of use case and for the use of WriteToFiles.
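One caveat on the CSV sink above: joining the values with the separator does not quote a value that itself contains a comma. If that can happen in your data, a possible variant of convert_dict_to_csv_record uses the standard csv module. This is a sketch under that assumption, with the two constants copied from JobConfig so that it runs on its own.

```python
import csv
import io
from typing import Dict

FILE_ENCODING = 'utf-8'
CSV_SEPARATOR = ','


def convert_dict_to_csv_record(customer: Dict[str, str]) -> bytes:
    """Format the dictionary values as one CSV record, quoting where needed."""
    buffer = io.StringIO()
    # lineterminator='' because the sink writes the line break itself
    writer = csv.writer(buffer, delimiter=CSV_SEPARATOR, lineterminator='')
    writer.writerow(customer.values())
    return buffer.getvalue().encode(FILE_ENCODING)


record = convert_dict_to_csv_record({'customer_id': '1000', 'customer_name': 'ABC, Inc.'})
print(record)
```

With the default minimal quoting, only the fields that contain the separator, a quote or a line break are wrapped in quotes, so the other values are written unchanged.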