Below is my transformation step from CSV to JSON. The CSV has three columns.
class ConvertCsvToJson(beam.DoFn):
    def process(self, element):
        # Assuming the CSV has three columns: col1, col2, col3
        col1, col2, col3 = element.split(',')
        # Create a JSON object
        json_obj = {
            'col1': col1.strip(),
            'col2': col2.strip(),
            'col3': col3.strip()
        }
        yield json_obj
and the pipeline code is:
(pipeline
 | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)  # Skip the header line
 | 'Convert to JSON' >> beam.ParDo(ConvertCsvToJson()))
The problem is that the JSON output comes out as below, because I don't have a loop collecting the records. I can't seem to figure out the fix:
{'col1': '100001', 'col2': 'Claude Potier (Earth-616)', 'col3': 'Secret'}
{'col1': '100002', 'col2': 'Elektra Natchios (Earth-616)', 'col3': 'Secret'}
It should come out as:
[
{
"col1": "100001",
"col2": "Claude Potier (Earth-616)",
"col3": "Secret"
},
{
"col1": "100002",
"col2": "Elektra Natchios (Earth-616)",
"col3": "Secret"
}
]
Also, is there a way I can generate the col1, col2 keys from the headers of the CSV itself? And the pipeline takes almost 5 minutes to run; is there a way to speed it up?
import apache_beam as beam
pipeline = beam.Pipeline()
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')
beam.dataframe.io.to_json(beam_df, 'solar_events.jsonl')
pipeline.run()
solar_events.csv is:
timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice
but the output JSON is:
{"timestamp":{"0":"2021-03-20 09:37:00","1":"2021-06-21 03:32:00","2":"2021-09-22 19:21:00","3":"2021-12-21 15:59:00"},"event":{"0":"March Equinox","1":"June Solstice","2":"September Equinox","3":"December Solstice"}}
This allows the data to be stored in a distributed way. 4. Dataflow has overhead to set up the computing environment; it can take 5 minutes or even more if your Python code has many dependencies.