
How to convert CSV into a dictionary in Apache Beam Dataflow

I would like to read a CSV file and write it to BigQuery using Apache Beam on Dataflow. To do this, I need to present the data to BigQuery in the form of a dictionary. How can I transform the data using Apache Beam to do this?

My input CSV file has two columns, and I want to create a corresponding two-column table in BigQuery. I know how to create data in BigQuery, that's straightforward; what I don't know is how to transform the CSV into a dictionary. The code below is not correct, but it should give an idea of what I'm trying to do.

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
| 'read solar data' >> beam.Read('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   schema='month:INTEGER, tornado_count:INTEGER',,


  • Edit: as of version 2.12.0, Beam comes with new fileio transforms that allow you to read from CSV without having to reimplement a source. You can do this like so:

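    import csv
    import io
    import sys
    import apache_beam as beam
    from apache_beam.io import fileio

    def get_csv_reader(readable_file):
      # You can return whichever kind of reader you want here
      # a DictReader, or a normal csv.reader.
      if sys.version_info >= (3, 0):
        # Python 3: wrap the binary handle so csv.reader receives text.
        return csv.reader(io.TextIOWrapper(readable_file.open()))
      return csv.reader(readable_file.open())

    with beam.Pipeline(...) as p:
      content_pc = (p
                    | fileio.MatchFiles('./sensor1_121116.csv')  # file pattern; path from the question
                    | fileio.ReadMatches()
                    | beam.Reshuffle()  # Useful if you expect many matches
                    | beam.FlatMap(get_csv_reader))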

    I recently wrote a test for this for Apache Beam. You can take a look at it in the GitHub repository.
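The comment in get_csv_reader mentions that a DictReader can be returned instead of a plain csv.reader. As a minimal sketch of that option, the hypothetical helper below yields one dictionary per row. It assumes the file has no header line and that the two columns are luminosity and datetime, in that order (names taken from the question's lambda). An in-memory io.BytesIO stands in for the binary handle that readable_file.open() would provide inside a pipeline.

```python
import csv
import io


def read_rows_as_dicts(binary_file, fieldnames=("luminosity", "datetime")):
    """Yield one dict per CSV row from a binary file-like object.

    The default fieldnames are an assumption based on the question.
    """
    text_file = io.TextIOWrapper(binary_file, encoding="utf-8", newline="")
    # Passing fieldnames explicitly means the first line is treated as data,
    # not as a header row.
    for row in csv.DictReader(text_file, fieldnames=list(fieldnames)):
        yield dict(row)


# Stand-in for the handle that readable_file.open() would return.
sample = io.BytesIO(b"12.5,2016-11-12 00:00:00\n13.1,2016-11-12 00:05:00\n")
records = list(read_rows_as_dicts(sample))
print(records[0])
```

All values come back as strings, so numeric columns would still need converting before they match a numeric BigQuery column type. In a pipeline, a function of this shape could be used with beam.FlatMap in place of get_csv_reader, calling it on readable_file.open().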

    The old answer, kept below, relied on reimplementing a source. This is no longer the main recommended way of doing this :)

    The idea is to have a source that returns parsed CSV rows. You can do this by subclassing the FileBasedSource class to include CSV parsing. In particular, the read_records method would look something like this:

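    import csv
    import io
    from apache_beam.io.filebasedsource import FileBasedSource

    class MyCsvFileSource(FileBasedSource):
      def read_records(self, file_name, range_tracker):
        # open_file returns a binary handle; wrap it so csv.reader
        # receives text on Python 3.
        self._file = self.open_file(file_name)
        reader = csv.reader(io.TextIOWrapper(self._file))
        for rec in reader:
          yield rec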
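Each record produced this way is a list of strings, so one more step is needed to get the dictionaries that BigQuery expects. The sketch below is one way to do that step. The name row_to_dict is hypothetical, and it assumes the first column is a numeric luminosity value and the second a timestamp string, following the keys used in the question. It also avoids the `lambda (k, v):` form from the question, since tuple-unpacking parameters are Python 2 only.

```python
def row_to_dict(row):
    """Turn a parsed two-column CSV row into a dict keyed by column name.

    Column names and types are assumptions based on the question.
    """
    # Explicit unpacking replaces the Python 2 only `lambda (k, v): ...`.
    luminosity, timestamp = row
    return {"luminosity": float(luminosity), "datetime": timestamp}


# Rows shaped like the lists that csv.reader yields.
parsed_rows = [
    ["12.5", "2016-11-12 00:00:00"],
    ["13.1", "2016-11-12 00:05:00"],
]
dicts = [row_to_dict(r) for r in parsed_rows]
print(dicts[0])
```

In a pipeline this would slot in as `beam.Map(row_to_dict)` after the step that yields the parsed rows. The schema passed to the BigQuery write should then use the same field names as the dictionary keys, for example something like 'luminosity:FLOAT,datetime:STRING' under the assumptions above.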