python, google-cloud-dataflow, apache-beam

Transformation of csv to json in Dataflow


Below is my transformation from CSV to JSON. The CSV has three columns.

class ConvertCsvToJson(beam.DoFn):
    def process(self, element):
        # Assuming the CSV has three columns: col1, col2, col3
        col1, col2, col3 = element.split(',')
        
        # Create a JSON object
        json_obj = {
            'col1': col1.strip(),
            'col2': col2.strip(),
            'col3': col3.strip()
        }
        
        yield json_obj

and the pipeline code as

(pipeline
        | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)  # Skip the header row
        | 'Convert to JSON' >> beam.ParDo(ConvertCsvToJson())
)

The problem is that the JSON output comes out as below, because I don't have a loop, and I can't figure out the fix:

{'col1': '100001', 'col2': 'Claude Potier (Earth-616)', 'col3': 'Secret'}
{'col1': '100002', 'col2': 'Elektra Natchios (Earth-616)', 'col3': 'Secret'}

It should come out as:

[
  {
    "col1": "100001",
    "col2": "Claude Potier (Earth-616)",
    "col3": "Secret"
  },
  {
    "col1": "100002",
    "col2": "Elektra Natchios (Earth-616)",
    "col3": "Secret"
  }
]

Also, is there a way to generate the col1, col2 keys from the headers of the CSV itself? And the pipeline takes almost 5 minutes to run; is there a way to speed it up?


Solution

    1. Beam is built for distributed computing, so no single worker sees the entire dataset, and it cannot dump everything as one big JSON object.
    2. You can grab the headers by reading just the first line of the file.
    3. A better way to handle the JSON format may be the Beam DataFrame API:
    import apache_beam as beam
    from apache_beam.dataframe.io import read_csv, to_json  # dataframe submodule needs an explicit import

    pipeline = beam.Pipeline()

    beam_df = pipeline | 'Read CSV' >> read_csv('solar_events.csv')
    to_json(beam_df, 'solar_events.jsonl')

    pipeline.run()
    

    solar_events.csv is

    timestamp,event
    2021-03-20 09:37:00,March Equinox
    2021-06-21 03:32:00,June Solstice
    2021-09-22 19:21:00,September Equinox
    2021-12-21 15:59:00,December Solstice
    

    but the output json is

    {"timestamp":{"0":"2021-03-20 09:37:00","1":"2021-06-21 03:32:00","2":"2021-09-22 19:21:00","3":"2021-12-21 15:59:00"},"event":{"0":"March Equinox","1":"June Solstice","2":"September Equinox","3":"December Solstice"}}
    

    This allows the data to be stored in a distributed fashion.

    4. Dataflow has overhead to set up the computing environment. It can take 5 minutes or even more if your Python code has many dependencies.
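Regarding point 1: if you truly need a single JSON array and the dataset is small enough to fit in one worker's memory, a minimal sketch (not the DataFrame approach above) is to gather every row into one list with `beam.combiners.ToList` before serializing. Note this deliberately defeats the parallelism Beam is built for; the sample rows are taken from the question.

```python
import json

import apache_beam as beam

# Sketch: collect all dicts into a single list element, then dump one JSON
# array. Only viable when the whole dataset fits on one worker.
with beam.Pipeline() as pipeline:
    (pipeline
     | 'Create rows' >> beam.Create([
         {'col1': '100001', 'col2': 'Claude Potier (Earth-616)', 'col3': 'Secret'},
         {'col1': '100002', 'col2': 'Elektra Natchios (Earth-616)', 'col3': 'Secret'},
     ])
     | 'To one list' >> beam.combiners.ToList()   # PCollection with one element
     | 'Dump array' >> beam.Map(lambda rows: json.dumps(rows, indent=2))
     | 'Print' >> beam.Map(print))
```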
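Regarding point 2: a sketch of deriving the JSON keys from the CSV header instead of hard-coding `col1`/`col2`/`col3`. The helper names `read_header` and `row_to_dict` are hypothetical; `csv.reader` is used instead of `element.split(',')` so quoted fields containing commas don't break the split.

```python
import csv
import io


def read_header(path):
    # Read only the first line of the file to get the column names.
    with open(path, newline='') as f:
        return next(csv.reader(f))


def row_to_dict(line, header):
    # csv.reader copes with quoted fields, unlike a plain split(',').
    values = next(csv.reader(io.StringIO(line)))
    return {key: value.strip() for key, value in zip(header, values)}
```

In the pipeline, the header can be read once at construction time and passed as an extra argument, e.g. `beam.Map(row_to_dict, header=read_header(input_file))` in place of the `ParDo`.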