My requirement is to read from a BQ table, do some processing, select the top N rows on the "score" column, write them back to a BQ table, and also publish the rows as Pub/Sub messages.
I made the sample below to create a PCollection and select the top 5 rows from this PCollection based on the value of "score".
import apache_beam as beam

with beam.Pipeline() as p:
    elements = (
        p
        | beam.Create([{"name": "Bob", "score": 5},
                       {"name": "Adam", "score": 5},
                       {"name": "John", "score": 2},
                       {"name": "Jose", "score": 1},
                       {"name": "Tim", "score": 1},
                       {"name": "Bryan", "score": 1},
                       {"name": "Kim", "score": 1}])
        | "Filter Top 5 scores" >> beam.combiners.Top.Of(5, key=lambda t: t['score'])
        | "Print" >> beam.Map(print)
    )
    # Write to BQ
    # Publish to PubSub
This returns a PCollection containing a single list of rows rather than a PCollection of individual rows. Hence I'm unable to write it back to a BQ table or publish it to Pub/Sub in this format.
What is the best way to select the top N elements but keep them as a PCollection?
In my real use case I might have around 2 million rows, and I need to select 800k records from them based on a column. Also, in one of the Apache Beam Summit videos I remember hearing that the Top function keeps the results on one worker node instead of distributing them across workers, which is why I assume it is a list. What would be the maximum number of rows before it could break?
To solve your issue, you can add a FlatMap transformation after the Top operator:
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

def test_pipeline(self):
    with TestPipeline() as p:
        elements = (
            p
            | beam.Create([{"name": "Bob", "score": 5},
                           {"name": "Adam", "score": 5},
                           {"name": "John", "score": 2},
                           {"name": "Jose", "score": 1},
                           {"name": "Tim", "score": 1},
                           {"name": "Bryan", "score": 1},
                           {"name": "Kim", "score": 1}])
            | "Filter Top 5 scores" >> beam.combiners.Top.Of(5, key=lambda t: t['score'])
            | "Flatten" >> beam.FlatMap(lambda e: e)
            | "Print" >> beam.Map(print)
        )
In this case the result is a PCollection of Dict instead of a PCollection of List[Dict].
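From there, the flattened PCollection can feed your sinks directly. Here is a minimal sketch, reusing p and the Top/Flatten steps from above, with hypothetical table and topic names; note that beam.io.WriteToPubSub expects bytes (so rows are JSON-encoded first) and generally requires a streaming pipeline on Dataflow:

import json
import apache_beam as beam

top_rows = (
    p
    | "Filter Top 5 scores" >> beam.combiners.Top.Of(5, key=lambda t: t['score'])
    | "Flatten" >> beam.FlatMap(lambda e: e)
)

# Write the rows to BigQuery (hypothetical table and schema).
top_rows | "Write to BQ" >> beam.io.WriteToBigQuery(
    "my-project:my_dataset.top_scores",
    schema="name:STRING,score:INTEGER",
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)

# Publish each row as a Pub/Sub message (hypothetical topic).
(top_rows
 | "Encode" >> beam.Map(lambda row: json.dumps(row).encode("utf-8"))
 | "Publish" >> beam.io.WriteToPubSub("projects/my-project/topics/top-scores"))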
You have to be careful with high volume, because the final merge step of beam.combiners.Top.Of is done on a single worker, which has to hold the whole top-N list in memory.
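For the 800k-out-of-2M case in the question, holding the whole top-N list on one worker may not be practical. One alternative technique (a sketch, not part of the Top-based approach above) is to estimate the score cutoff with beam.ApproximateQuantiles and then filter in parallel; the cutoff is approximate, so the result may contain slightly more or fewer than N rows:

import apache_beam as beam

with beam.Pipeline() as p:
    # Toy stand-in for the real BQ read.
    rows = p | "Read" >> beam.Create(
        [{"name": f"user{i}", "score": i % 100} for i in range(2000)])

    # Estimate percentile boundaries of the score distribution:
    # 101 quantiles give boundaries at the 0th..100th percentiles.
    threshold = (
        rows
        | "Scores" >> beam.Map(lambda r: r["score"])
        | "Quantiles" >> beam.ApproximateQuantiles.Globally(101)
        # Keeping the top ~40% (800k of 2M) means cutting at the 60th percentile.
        | "Pick cutoff" >> beam.Map(lambda qs: qs[60]))

    # Filter in parallel against the estimated cutoff (side input).
    top = rows | "Keep top" >> beam.Filter(
        lambda r, cutoff: r["score"] >= cutoff,
        cutoff=beam.pvalue.AsSingleton(threshold))

    top | "Print" >> beam.Map(print)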
If needed, you can also make your Dataflow job more powerful with Dataflow Prime (right fitting, vertical autoscaling inside a worker, etc.).
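If you go that route, Dataflow Prime is enabled through the enable_prime service option on the pipeline. A minimal sketch, with hypothetical project, region, and bucket values:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",                 # hypothetical project
    region="us-central1",                 # hypothetical region
    temp_location="gs://my-bucket/tmp",   # hypothetical bucket
    dataflow_service_options=["enable_prime"],
)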