Tags: python, google-cloud-dataflow, apache-beam, google-cloud-pubsub, dataflow

Process file from a Pub/Sub message in Dataflow streaming


I want to deploy a streaming Dataflow job that listens to a Pub/Sub topic.

The Pub/Sub message content looks like this:

{
   "file_path": "gs://my_bucket_name/my_file.csv",
   "transformations": [
      {
         "column_name": "NAME",
         "transformation": "to_upper"
      },
      {
         "column_name": "SURNAME",
         "transformation": "to_lower"
      }
   ]
}
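For testing, a message like this could be published with the google-cloud-pubsub client roughly as follows (the project and topic names are placeholders, and the ts attribute matches the timestamp_attribute='ts' used when reading):

import json
from datetime import datetime, timezone

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Placeholder project and topic names.
topic_path = publisher.topic_path('my-project', 'my-topic')

message = {
    "file_path": "gs://my_bucket_name/my_file.csv",
    "transformations": [
        {"column_name": "NAME", "transformation": "to_upper"},
        {"column_name": "SURNAME", "transformation": "to_lower"},
    ],
}

# ReadFromPubSub(timestamp_attribute='ts') takes the event time from this
# attribute; an RFC 3339 timestamp is one of the accepted formats.
publisher.publish(
    topic_path,
    data=json.dumps(message).encode('utf-8'),
    ts=datetime.now(timezone.utc).isoformat(),
)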

My problem is that I would like to process the file specified in the message (file_path) and apply the given transformations to the corresponding columns of the CSV file.

I have tried several ways to achieve this, but none of them worked, and I am wondering whether this is not possible at all or I am just missing something.

  1. First try:
class ProcessMessage(beam.DoFn):

    def process(self, message):
        from apache_beam.pvalue import TaggedOutput
        try:
            file_path = message.get('file_path')
            yield TaggedOutput('file_path', file_path)
        except Exception as e:
            raise Exception(e)

with beam.Pipeline(options=pipeline_options) as p:
    file_path = (
        p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=input_topic,timestamp_attribute='ts')
          | "Parse JSON" >> beam.Map(json.loads)
          | "Process Message" >> beam.ParDo(ProcessMessage).with_outputs('file_path')
    )
    file_content = (
        p
        | "Read file" >> beam.io.ReadFromText(file_path)
    )

This fails with: file_pattern must be of type string or ValueProvider; got <DoOutputsTuple main_tag=None tags=('file_path',) transform=<ParDo(PTransform) label=[ParDo(ProcessMessage)]> at 0x1441f9550> instead

  2. Second try: read the file with a custom CSV reader, then return its content:
class ReadFile(beam.DoFn):

    def process(self, element):
        import csv
        import io as io_file

        from apache_beam import io

        file_path = element.get('file_path')

        reader = csv.DictReader(io_file.TextIOWrapper(
            io.filesystems.FileSystems.open(file_path),
            encoding='utf-8'),
            delimiter=';')

        for row in reader:
            yield row

with beam.Pipeline(options=pipeline_options) as p:

    message = (
        p | "Read from Pubsub" >> beam.io.ReadFromPubSub(
            topic=pipeline_config.get('input_topic'),
            timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
    )

    file_content = (
        message
        | beam.ParDo(ReadFile())
        | beam.Map(print)
    )

This does not produce any error, but it also does not print the file lines.

I know this post is a bit on the long side, but I hope someone can help me.

Thanks!


Solution

  • Your first attempt does not work because ReadFromText takes a string argument, for example a bucket path like "gs://bucket/file". In your example you pass it a PCollection (the result of the previous PTransform), which will not work. Instead, you should use ReadAllFromText, which takes a PCollection as input, i.e. the result of the previous PTransform.

    Also your code would need to be modified a bit:

    If a DoFn class returns only one type of output, there is no reason to use TaggedOutput, so just yield the values directly.

    class ProcessMessage(beam.DoFn):

        def process(self, message):
            # Only one output type, so a plain yield is enough.
            yield message.get('file_path')
    

    Next, ReadAllFromText should be connected to the previous step of the pipeline, not directly to p.

    file_content = (
        p
        | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=input_topic, timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process Message" >> beam.ParDo(ProcessMessage())
        | "Read file" >> beam.io.ReadAllFromText()
    )
    

    Be aware that the file_content variable will be a PCollection of elements, where each element is a single row of your CSV file as a string. This makes it harder to apply the transformations per column: the first element will be the header row with the column names, and every following element will be a bare data row with no column names attached.
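    For illustration, if the schema were known up front, one could drop the header line and reattach the column names by hand; a minimal sketch (the hard-coded FIELDNAMES list and the ';' delimiter are assumptions here):

    import csv

    # Assumed, hypothetical schema: it has to be known ahead of time,
    # which is exactly what makes this approach awkward here.
    FIELDNAMES = ['NAME', 'SURNAME']

    def parse_line(line):
        # Parse one CSV line and attach the known column names.
        values = next(csv.reader([line], delimiter=';'))
        return dict(zip(FIELDNAMES, values))

    rows = (
        file_content
        | "Drop header" >> beam.Filter(lambda line: line != ';'.join(FIELDNAMES))
        | "To dicts" >> beam.Map(parse_line)
    )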

    Your second try seems to be better for this:

    class ApplyTransforms(beam.DoFn):

        def process(self, element):
            import csv
            import io

            file_path = element.get('file_path')
            transformations = element.get('transformations')

            with beam.io.gcsio.GcsIO().open(file_path) as file:
                reader = csv.DictReader(io.TextIOWrapper(file, encoding="utf-8"), delimiter=';')
                for row in reader:
                    for transform in transformations:
                        col_name = transform.get("column_name")
                        transformation = transform.get("transformation")
                        # apply your transformation to row[col_name] here
                    yield row
    
    

    Something like this could work, but it would probably be a better idea to split it into two classes: one for reading, another for applying the transformations :)
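    A minimal sketch of that split might look like this (the class names and the TRANSFORMS mapping are my own additions; the mapping covers just the to_upper/to_lower transformations from the example message):

    class ReadCsvRows(beam.DoFn):

        def process(self, element):
            import csv
            import io
            from apache_beam.io.filesystems import FileSystems

            transformations = element.get('transformations')
            with FileSystems.open(element.get('file_path')) as file:
                reader = csv.DictReader(io.TextIOWrapper(file, encoding='utf-8'), delimiter=';')
                for row in reader:
                    # Pass the transformations along with each row, so the
                    # next DoFn does not need the original message.
                    yield row, transformations


    class ApplyTransforms(beam.DoFn):

        # Assumed mapping from transformation names to callables.
        TRANSFORMS = {'to_upper': str.upper, 'to_lower': str.lower}

        def process(self, element):
            row, transformations = element
            for transform in transformations:
                func = self.TRANSFORMS.get(transform.get('transformation'))
                col_name = transform.get('column_name')
                if func and col_name in row:
                    row[col_name] = func(row[col_name])
            yield row

    These plug in right after parsing the message:

    rows = (
        p
        | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=input_topic, timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Read rows" >> beam.ParDo(ReadCsvRows())
        | "Apply transforms" >> beam.ParDo(ApplyTransforms())
    )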