I'm trying to look up a value in a separate BigQuery table. I need to look up link_url in table 1 using url from table 2, and from table 1 I only need link_id.
Table 1:

link_id | link_url |
---|---|
id1 | https://link1.com |
id2 | https://link2.com |
id3 | https://link3.com |
Table 2:

url | app |
---|---|
https://link1.com | App1 |
https://link2.com | App2 |
https://link3.com | App3 |
https://link4.com | App4 |
Desired output:

link_id | link_url | app |
---|---|---|
id1 | https://link1.com | App1 |
id2 | https://link2.com | App2 |
id3 | https://link3.com | App3 |
null | https://link4.com | App4 |
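In other words, the desired output is a left join of table 2 onto table 1 on url = link_url: every table 2 row is kept, and link_id is null where the url has no match in table 1. A minimal plain-Python sketch of that lookup on the sample rows above (no Beam involved; left_join is a hypothetical helper name):

```python
# Sample rows copied from Table 1 and Table 2
TABLE1 = [
    {"link_id": "id1", "link_url": "https://link1.com"},
    {"link_id": "id2", "link_url": "https://link2.com"},
    {"link_id": "id3", "link_url": "https://link3.com"},
]
TABLE2 = [
    {"url": "https://link1.com", "app": "App1"},
    {"url": "https://link2.com", "app": "App2"},
    {"url": "https://link3.com", "app": "App3"},
    {"url": "https://link4.com", "app": "App4"},
]


def left_join(links, views):
    # Build a link_url -> link_id lookup from table 1
    lookup = {row["link_url"]: row["link_id"] for row in links}
    # Keep every table 2 row; .get() returns None for urls missing from table 1
    return [
        {"link_id": lookup.get(v["url"]), "link_url": v["url"], "app": v["app"]}
        for v in views
    ]


for row in left_join(TABLE1, TABLE2):
    print(row)
```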
What I have so far:
```python
import logging

import apache_beam as beam


def use_side_input(main, side_input):
    return side_input[main]


def run():
    with beam.Pipeline() as p:
        # Each element is one row as a dict, e.g. {"link_id": ..., "link_url": ...}
        links = p | 'ReadLinks' >> beam.io.ReadFromBigQuery(
            table="table1", gcs_location="gs://bucket/tmp/")
        # Each element is one row as a dict, e.g. {"url": ..., "app": ...}
        linkviews = p | 'LinkViews' >> beam.io.ReadFromBigQuery(
            table="table2", gcs_location="gs://bucket/tmp/")
        # This is where I'm stuck: AsDict expects (key, value) pairs, not row dicts
        linkviews | "use side input" >> beam.Map(
            use_side_input, side_input=beam.pvalue.AsDict(links))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
```
I know I should be using side inputs, but I don't know how to continue from here.
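To see why AsDict(links) does not work as written, here is a plain-Python analogy. It assumes that ReadFromBigQuery yields one dict per row and that AsDict builds its view roughly like dict() over the PCollection's elements, so each element must be a (key, value) pair. (Separately, side_input[main] in the attempt indexes with a whole row dict, which would raise TypeError because dicts are unhashable.)

```python
# Rows shaped like ReadFromBigQuery output: one dict per row (sample data from Table 1)
rows = [
    {"link_id": "id1", "link_url": "https://link1.com"},
    {"link_id": "id2", "link_url": "https://link2.com"},
    {"link_id": "id3", "link_url": "https://link3.com"},
]

# Raw rows: iterating a two-key dict yields its two keys,
# so every row collapses into the same meaningless entry
wrong = dict(rows)
print(wrong)  # {'link_id': 'link_url'}

# Keying each row first gives the link_url -> link_id lookup the Map step needs
right = dict((row["link_url"], row["link_id"]) for row in rows)
print(right.get("https://link1.com"))  # id1
print(right.get("https://link4.com"))  # None
```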
Here is the solution I ended up with, for anyone else who needs it:
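```python
def add_link(main, side_input):
    # Copy the row: Beam expects input elements not to be modified
    row = dict(main)
    # side_input maps link_url -> [link_id]; .get() returns None when there is no match
    result = side_input.get(row["url"])
    row["link_id"] = result[0] if result else None
    return row


def link_key(event):
    # Key each table 1 row by its link_url so AsDict can build the lookup
    return event["link_url"], [event["link_id"]]


# Inside run(), after links (table 1) and linkviews (table 2) have been read:
link_keys = links | 'AddLinkKey' >> beam.Map(link_key)

# Every table 2 row is kept; link_id is None when its url is not in table 1
final = (linkviews
         | "AddLink" >> beam.Map(add_link, side_input=beam.pvalue.AsDict(link_keys)))
```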
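To try the solution above end to end without BigQuery, here is a sketch that swaps ReadFromBigQuery for beam.Create over the sample rows and runs on the default local runner (DirectRunner). TABLE1, TABLE2 and run_local are hypothetical names for illustration; Beam is imported inside run_local only so the two helper functions can be exercised without Beam installed.

```python
# Sample rows from Table 1 and Table 2, standing in for the BigQuery reads
TABLE1 = [
    {"link_id": "id1", "link_url": "https://link1.com"},
    {"link_id": "id2", "link_url": "https://link2.com"},
    {"link_id": "id3", "link_url": "https://link3.com"},
]
TABLE2 = [
    {"url": "https://link1.com", "app": "App1"},
    {"url": "https://link2.com", "app": "App2"},
    {"url": "https://link3.com", "app": "App3"},
    {"url": "https://link4.com", "app": "App4"},
]


def link_key(event):
    # (link_url, [link_id]) pairs, the shape AsDict needs
    return event["link_url"], [event["link_id"]]


def add_link(main, side_input):
    # Copy the row instead of mutating the input element
    row = dict(main)
    result = side_input.get(row["url"])
    row["link_id"] = result[0] if result else None
    return row


def run_local():
    # Imported here so the helpers above work without Beam installed
    import apache_beam as beam

    with beam.Pipeline() as p:  # no options: runs locally on the DirectRunner
        links = p | "Links" >> beam.Create(TABLE1)
        linkviews = p | "LinkViews" >> beam.Create(TABLE2)
        link_keys = links | "AddLinkKey" >> beam.Map(link_key)
        (linkviews
         | "AddLink" >> beam.Map(add_link, side_input=beam.pvalue.AsDict(link_keys))
         | "Print" >> beam.Map(print))
```

Calling run_local() should print the four table 2 rows with link_id added, with None for https://link4.com; the order of printed elements is not guaranteed.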