
Apache Beam Find Top N elements Python SDK


My requirement is to read from a BigQuery (BQ) table, do some processing, select the top N rows by the "score" column, write them to a BQ table, and also publish the rows as Pub/Sub messages.

I made a sample below to create a PCollection and select top 5 rows from this PCollection based on the value of "score".

import apache_beam as beam

with beam.Pipeline() as p:
    elements = (
        p
        | beam.Create([{"name": "Bob", "score": 5},
                       {"name": "Adam", "score": 5},
                       {"name": "John", "score": 2},
                       {"name": "Jose", "score": 1},
                       {"name": "Tim", "score": 1},
                       {"name": "Bryan", "score": 1},
                       {"name": "Kim", "score": 1}])
        | "Filter Top 5 scores" >> beam.combiners.Top.Of(5, key=lambda t: t['score'])
        | "Print" >> beam.Map(print)
    )
    # Write to BQ
    # Publish to PubSub

This returns a single list rather than a PCollection of individual rows, so I can neither write it back to the BQ table nor publish it to Pub/Sub in this format.

What is the best way to select the top N elements but keep them as a PCollection?

In my real use case I might have around 2 million rows, and I need to select 800k records from them based on a column. Also, in one of the Apache Beam Summit videos I remember hearing that the Top function keeps the results on one worker node instead of distributing them across workers, so that's why I assume it is a list. What is the maximum number of rows it can handle before it breaks?


Solution

  • To solve your issue, you can add a FlatMap transformation after the Top operator:

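    import unittest

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline


    class TopNTest(unittest.TestCase):  # hypothetical class name wrapping the test method

        def test_pipeline(self):
            with TestPipeline() as p:
                elements = (
                    p
                    | beam.Create([{"name": "Bob", "score": 5},
                                   {"name": "Adam", "score": 5},
                                   {"name": "John", "score": 2},
                                   {"name": "Jose", "score": 1},
                                   {"name": "Tim", "score": 1},
                                   {"name": "Bryan", "score": 1},
                                   {"name": "Kim", "score": 1}])
                    | "Filter Top 5 scores" >> beam.combiners.Top.Of(5, key=lambda t: t['score'])
                    # Top.Of emits one list; FlatMap unnests it into individual dicts
                    | "Flatten" >> beam.FlatMap(lambda e: e)
                    | "Print" >> beam.Map(print)
                )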
    

    In this case the result is a PCollection of Dict instead of a PCollection of List[Dict].

    You have to be careful with high volumes, because beam.combiners.Top.Of builds its final result as a single list on one worker, so all N selected elements must fit in that worker's memory.

    If needed, you can also give your Dataflow job more resources with Dataflow Prime (Right Fitting, Vertical Autoscaling inside a worker...).