I am coding a Beam pipeline which reads Pub/Sub messages with an attribute called 'uid', the unique id of the current message. I would then like to use this 'uid' to query BigQuery for additional information to enrich the message.
It may not be appropriate to use the whole table as a side input since the BigQuery table is very large; the ideal approach would be something like:
mainInput = (p | beam.io.ReadFromPubSub(topic=known_args.inputTopic, with_attributes=True))
sideInput = (p | beam.io.ReadFromBigQuery(query='select * from tablex where uid="id_from_message"'))

def fn(left, right):
    for x in right:
        yield (left, x)

process = (mainInput | beam.FlatMap(fn, right=beam.pvalue.AsDict(sideInput)))
However, I don't know how to get the 'id_from_message' because it resides in the pipeline's mainInput. Can you please advise on how to do that in Python?
My current code is as below
import apache_beam as beam
from google.cloud import bigquery


class EnrichByBQClient(beam.DoFn):
    def process(self, element, *args, **kwargs):
        try:
            print("Enrich from BQ Start")
            attribute = element[1]
            query = 'select uid,status from `XXXXXX` where uid="{}" limit 1' \
                .format(attribute.get('uid'))
            # A new client and a new query for every element.
            client = bigquery.Client()
            query_job = client.query(query)
            result = query_job.result()
            len_result = 0
            for row in result:
                status = row.status
                len_result += 1
            if len_result == 0:
                status = OUTPUT_TAG_NEW
            elif status != 'complete':
                status = OUTPUT_TAG_INCOMPLETE
            else:
                status = OUTPUT_TAG_COMPLETE
            yield (element[0], element[1], status)
        # (except clause / failure handling not shown in this snippet)
message = (p | beam.io.ReadFromPubSub(topic=known_args.inputTopic, with_attributes=True))
......
enrichData, failure_enrich = (
    message
    | 'enrich by bigquery client' >> beam.ParDo(EnrichByBQClient())
)
I added the BigQuery query logic in a ParDo function, which might not be the best practice. Am I right?
You are right, it's not good practice to execute a BigQuery query per element in a ParDo. It issues too many queries, which is not performant on the Beam side and not cost effective on the BigQuery side.
You can think about another approach and design to solve your issue: you have a streaming pipeline and you need state. If it's possible for you, it could be interesting to use an in-memory store alongside your BigQuery table.
In Google Cloud, there is Memorystore, a managed Redis cache. With this approach, you can use a Python Redis client in a ParDo and look the element up in the cache. This solution is performant in response time and cost effective with Memorystore.
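For example, here is a minimal sketch of such a ParDo, assuming the cache stores uid -> status, reusing the OUTPUT_TAG_* constants and the (data, attributes) tuple shape from your DoFn; REDIS_HOST and the port are placeholders for your Memorystore instance:

import apache_beam as beam
import redis

class EnrichFromRedis(beam.DoFn):
    def setup(self):
        # One connection per worker, reused across bundles.
        self.client = redis.Redis(host='REDIS_HOST', port=6379)

    def process(self, element):
        body, attributes = element[0], element[1]
        # Cache lookup instead of a per-element BigQuery query.
        status = self.client.get(attributes.get('uid'))
        if status is None:
            yield (body, attributes, OUTPUT_TAG_NEW)
        elif status.decode('utf-8') != 'complete':
            yield (body, attributes, OUTPUT_TAG_INCOMPLETE)
        else:
            yield (body, attributes, OUTPUT_TAG_COMPLETE)

enriched = message | 'enrich from redis' >> beam.ParDo(EnrichFromRedis())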
However, there is a downside to this solution: you have to manage your cache and keep it synchronized with your BigQuery table. You also have to plan a way to rebuild the cache if you lose it for any reason.
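For instance, a rough sketch of a job that (re)builds the cache from the table, again with a placeholder Redis host and an arbitrary 24-hour expiry:

from google.cloud import bigquery
import redis

def rebuild_cache():
    bq = bigquery.Client()
    cache = redis.Redis(host='REDIS_HOST', port=6379)
    # Reload every uid/status pair; the expiry lets stale entries age out.
    for row in bq.query('select uid, status from `XXXXXX`').result():
        cache.set(row.uid, row.status, ex=24 * 3600)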
I hope this answer will help.