python  google-bigquery  apache-beam

Apache Beam Python: looking up a value in another collection


I am trying to look up a value in a separate BigQuery table. I need to look up the link_url in table 1 using the url in table 2. From table 1 I need just the link_id.

table 1

table 2

desired output:

link_id  link_url           app
id1      https://link1.com  App1
id2      https://link2.com  App2
id3      https://link3.com  App3
null     https://link4.com  App4

What I have:

import logging
import apache_beam as beam

def use_side_input(main, side_input):
    return side_input[main]

def run():
    with beam.Pipeline() as p:
        links = p | 'ReadLinks' >> beam.io.ReadFromBigQuery(table="table1", gcs_location="gs://bucket/tmp/")

        linkviews = p | 'LinkViews' >> beam.io.ReadFromBigQuery(table="table2", gcs_location="gs://bucket/tmp/")

        linkviews | "use side input" >> beam.Map(use_side_input, side_input=beam.pvalue.AsDict(links))
       

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I know I should be using side inputs, but I have no clue how to continue.


Solution

  • I will leave the solution here for anyone else who needs it.

    
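    def add_link(main, side_input):
        # Copy the row so the input element is not mutated in place.
        row = dict(main)

        # side_input maps link_url -> [link_id]; .get returns None for unknown URLs.
        result = side_input.get(row["url"])
        row["link_id"] = result[0] if result else None

        return row


    def link_key(event):
        # Key each table 1 row by its URL so AsDict can build the lookup.
        return event["link_url"], [event["link_id"]]


    link_keys = links | 'AddLinkKey' >> beam.Map(link_key)

    # `out` is assumed to be the PCollection of table 2 rows (`linkviews` in the question).
    final = out | "AddLink" >> beam.Map(add_link, side_input=beam.pvalue.AsDict(link_keys))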
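
Because `beam.pvalue.AsDict` hands the side input to the mapped function as a dictionary, the two helper functions can be tried out without a pipeline or BigQuery access. What follows is only a minimal sketch. It assumes table 2 rows carry `url` and `app` fields, as the desired output suggests. The sample rows and the `lookup` and `joined` names are made up for illustration, and `add_link` is written here to return a copy of the row rather than modify it.

```python
# Plain-Python check of the lookup logic; no Beam runner is involved.

def link_key(event):
    """Turn a table 1 row into a (link_url, [link_id]) pair."""
    return event["link_url"], [event["link_id"]]


def add_link(main, side_input):
    """Return a copy of a table 2 row with link_id attached (None if no match)."""
    row = dict(main)
    result = side_input.get(row["url"])
    row["link_id"] = result[0] if result else None
    return row


# Hypothetical rows standing in for the two BigQuery tables.
links = [
    {"link_id": "id1", "link_url": "https://link1.com"},
    {"link_id": "id2", "link_url": "https://link2.com"},
]
linkviews = [
    {"url": "https://link1.com", "app": "App1"},
    {"url": "https://link4.com", "app": "App4"},
]

# An ordinary dict built from the key/value pairs stands in for the
# dictionary that AsDict would supply inside the pipeline.
lookup = dict(link_key(row) for row in links)

joined = [add_link(row, lookup) for row in linkviews]
for row in joined:
    print(row)
```

Rows whose `url` has no entry in the lookup keep all their fields and get `link_id` set to `None`, which matches the `null` row in the desired output.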