
How to use a "where" condition in a MongoDB to BigQuery Dataflow pipeline?


I have written Python code for a MongoDB-to-BigQuery data pipeline using Apache Beam (Dataflow runner).

The MongoDB collection is simple, like a MySQL table with two columns (id and name) and no nested structure. My code is below.

        #########################################
        import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.io.mongodbio import ReadFromMongoDB

        options = PipelineOptions()

        ################################
        def parse_json(doc):
            # ReadFromMongoDB emits each document as a dict, so read the fields
            # directly. Splitting str(doc) on commas breaks on names containing
            # commas and can leave the closing "}" on the last value.
            return {
                "_id": str(doc["_id"]),  # ObjectId -> its hex string
                "name": doc["name"],
            }
        #################################

        p = beam.Pipeline(options=options)

        (p
         | 'ReadFromMongoDB' >> ReadFromMongoDB(uri='mongodb://mongo_ip:mongo_port',
                                                db='db_name',
                                                coll='collection_name')
         | 'ParseJson' >> beam.Map(parse_json)
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
               'bq_project_id.bq_dataset_id.bq_table_name',
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

        p.run()
        ###############################################

This code works fine: it reads all documents from the MongoDB collection and inserts them into BigQuery.

However, I want to use a where condition so that only a few documents with specific IDs are processed.

How can I specify a where condition in ReadFromMongoDB()?


Solution

  • You can use the filter parameter of ReadFromMongoDB. It takes a MongoDB query document (a dict), and only documents that match it are read.

    https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.mongodbio.html#apache_beam.io.mongodbio.ReadFromMongoDB
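A minimal sketch of the filter approach, assuming the `_id` values are stored as MongoDB `ObjectId`s (which the `ObjectId(...)` stripping in the question's `parse_json` suggests). The filter is an ordinary MongoDB query document, so `{"_id": {"$in": [...]}}` plays the role of SQL's `WHERE _id IN (...)`. The helper names `build_id_filter`, `to_row` and `run_pipeline` are hypothetical; the connection and table placeholders are carried over from the question. The Beam and `bson` imports sit inside the functions only so the filter-building helper can be exercised without those packages installed.

```python
def build_id_filter(ids, to_object_id=True):
    """Build a MongoDB query document equivalent to SQL: WHERE _id IN (ids).

    If _id holds ObjectId values (the MongoDB default), the hex strings must
    be converted to ObjectId, otherwise no document will match.
    """
    if to_object_id:
        from bson import ObjectId  # ships with pymongo, which ReadFromMongoDB needs
        values = [ObjectId(i) for i in ids]
    else:
        values = list(ids)
    return {"_id": {"$in": values}}


def to_row(doc):
    # Each element emitted by ReadFromMongoDB is a dict, so read fields directly.
    return {"_id": str(doc["_id"]), "name": doc["name"]}


def run_pipeline(ids):
    import apache_beam as beam
    from apache_beam.io.mongodbio import ReadFromMongoDB
    from apache_beam.options.pipeline_options import PipelineOptions

    # Leaving the "with" block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=PipelineOptions()) as p:
        (
            p
            | "ReadFromMongoDB" >> ReadFromMongoDB(
                uri="mongodb://mongo_ip:mongo_port",
                db="db_name",
                coll="collection_name",
                filter=build_id_filter(ids),  # only matching documents are read
            )
            | "ToRow" >> beam.Map(to_row)
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                "bq_project_id.bq_dataset_id.bq_table_name",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

Usage would be `run_pipeline(ids)`, where `ids` is a list of the hex strings that end up in the BigQuery `_id` column. If `_id` is stored as plain strings or integers instead, pass `to_object_id=False` when building the filter. Because the filter is passed to MongoDB, non-matching documents are never sent to the pipeline, unlike a `beam.Filter` step applied after reading everything. Conditions on other fields follow the same pattern, e.g. `{"name": {"$in": [...]}}`.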