google-cloud-dataflow apache-beam

Dynamic side input from Bigquery in python


I am writing a Beam pipeline that reads Pub/Sub messages with an attribute called 'uid', a unique id for the current message. I would then like to use this 'uid' to query BigQuery for additional information to enrich the message.

Since the BigQuery table is very large, it may not be appropriate to use the whole table as a side input; the ideal approach would be something like:

mainInput = (p | beam.io.ReadFromPubSub(topic=known_args.inputTopic, with_attributes=True))
sideInput = (p | beam.io.ReadFromBigQuery(query='select * from tablex where uid="id_from_message"'))

def fn(left, right):
    for x in right:
        yield (left, x)

process = (mainInput | beam.FlatMap(fn, right=beam.pvalue.AsDict(sideInput)))

However, I don't know how to get the 'id_from_message', because it resides in the main input PCollection. Can you please advise on how to do that in Python?

My current code is as below

class enrichByBQClient(beam.DoFn):

    def process(self, element, *args, **kwargs):
        print("Enrich from BQ Start")
        attribute = element[1]
        query = 'select uid, status from `XXXXXX` where uid="{}" limit 1' \
            .format(attribute.get('uid'))
        client = bigquery.Client()
        result = client.query(query).result()

        status = None
        len_result = 0
        for row in result:
            status = row.status
            len_result += 1

        if len_result == 0:
            status = OUTPUT_TAG_NEW
        elif status != 'complete':
            status = OUTPUT_TAG_INCOMPLETE
        else:
            status = OUTPUT_TAG_COMPLETE

        yield (element[0], element[1], status)

......
message = (p | beam.io.ReadFromPubSub(topic=known_args.inputTopic, with_attributes=True))

enrichData, failure_enrich = (
    message
    | 'enrich by bigquery client' >> beam.ParDo(enrichByBQClient())
)

I put the BigQuery query logic in a ParDo, which might not be best practice. Am I right?


Solution

  • You are right, it's not good practice to execute a query against BigQuery in a ParDo. It issues too many queries, which is not performant in Beam and not cost effective on the BigQuery side.

    Consider another approach and design to solve your issue.

    You have a streaming pipeline and need to keep state. If it's possible for you, it could be interesting to use an in-memory store alongside your BigQuery table.

    In Google Cloud, there is Memorystore, a managed cache with Redis.

    With this approach, you can use a Python Redis client in a ParDo and look up the element in the cache.

    This solution is performant in response time and cost effective with Memorystore.
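    As a hedged sketch of what the per-message lookup could look like, here is the classification logic from the question rewritten around a cache lookup. A plain dict stands in for the Redis client (in the real pipeline you would create a `redis.Redis` client once in the DoFn's `setup` method and call `cache.get(uid)`); the `OUTPUT_TAG_*` names and their values are assumptions based on the question's code.

    ```python
    # Sketch of the per-message enrichment logic. A plain dict stands in
    # for the Redis/Memorystore client; in the pipeline you would create
    # redis.Redis(...) once in DoFn.setup() and call cache.get(uid).
    OUTPUT_TAG_NEW = 'new'                 # assumed values; the question
    OUTPUT_TAG_INCOMPLETE = 'incomplete'   # only shows the constant names
    OUTPUT_TAG_COMPLETE = 'complete'

    def classify(cached_status):
        """Map the cached status (or None on a cache miss) to an output tag."""
        if cached_status is None:
            return OUTPUT_TAG_NEW          # uid never seen before
        if cached_status != 'complete':
            return OUTPUT_TAG_INCOMPLETE
        return OUTPUT_TAG_COMPLETE

    def enrich(element, cache):
        """element is a (data, attributes) pair from ReadFromPubSub."""
        data, attributes = element
        status = classify(cache.get(attributes.get('uid')))
        return (data, attributes, status)

    cache = {'u1': 'complete', 'u2': 'pending'}   # stand-in for the cache
    enriched = enrich((b'msg', {'uid': 'u1'}), cache)
    ```

    The same `classify`/`enrich` body can be dropped into the `process` method of the question's DoFn, replacing the per-element BigQuery query with a cache read.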

    However, there is a downside: you have to manage your cache and keep it synchronized with your BigQuery table.

    You also have to plan a way to rebuild your cache if you lose it for any reason.
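    A rebuild can be as simple as scanning the BigQuery table once and writing each uid/status pair into the cache. A minimal sketch, assuming rows expose 'uid' and 'status' fields (a dict stands in for the Redis client; with redis-py you would batch the writes through a pipeline for bulk loading):

    ```python
    def rebuild_cache(rows, cache):
        """Repopulate the cache from a full scan of the BigQuery table.

        `rows` is any iterable of dicts with 'uid' and 'status' keys
        (e.g. rows from client.query(...).result() mapped to dicts);
        `cache` is anything with __setitem__ (dict here, Redis in production).
        Returns the number of entries written.
        """
        count = 0
        for row in rows:
            cache[row['uid']] = row['status']
            count += 1
        return count

    # Stand-in for `client.query('select uid, status from ...').result()`
    rows = [{'uid': 'u1', 'status': 'complete'},
            {'uid': 'u2', 'status': 'pending'}]
    cache = {}
    written = rebuild_cache(rows, cache)   # cache now mirrors the table
    ```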

    I hope this answer will help.