I have written Python code for a MongoDB to BigQuery data pipeline using Apache Beam (Dataflow Runner).
The MongoDB collection is a simple MySQL-like table with 2 columns (id and name) and no complex structure. My code is as below.
#########################################
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.mongodbio import ReadFromMongoDB
import json
options = PipelineOptions()
################################
def parse_json(line):
    # Convert the MongoDB document to a string and split it on commas
    new_line = str(line)
    record = new_line.split(',')
    # Split each "key: value" pair once, on the first colon
    key0, value0 = record[0].strip().split(":", 1)
    key1, value1 = record[1].strip().split(":", 1)
    # Strip the ObjectId(...) wrapper and quotes to get plain string values
    json_data = {"_id": value0.replace('"', '').replace('ObjectId(', '').replace(')', '').replace("'", "").strip(),
                 "name": value1.replace('"', '').replace("'", "").strip()}
    return json_data
#################################
p = beam.Pipeline(options=options)
(p | ReadFromMongoDB(uri='mongodb://mongo_ip:mongo_port', db="db_name", coll="collection_name")
   | beam.Map(parse_json)
   | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         'bq_project_id.bq_dataset_id.bq_table_name',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
p.run()
###############################################
This code works fine. It brings all documents from the MongoDB collection and inserts them into BigQuery.
But I want to use a where condition to process only a few rows with specific ids.
How can I specify a where condition in ReadFromMongoDB()?
You could use the filter parameter in ReadFromMongoDB().
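Below is a minimal sketch, assuming your Beam version exposes the filter argument of ReadFromMongoDB; it accepts a standard MongoDB query document (the same syntax you would pass to pymongo's find()). The ObjectId values and the id_filter name are placeholders, and options and parse_json are reused from your pipeline above.
#########################################
import apache_beam as beam
from apache_beam.io.mongodbio import ReadFromMongoDB
from bson.objectid import ObjectId  # bson is bundled with pymongo, which mongodbio already requires

# Equivalent of SQL "WHERE id IN (...)": only documents whose _id is in this
# list are read. Replace the ObjectId strings with your own ids.
id_filter = {"_id": {"$in": [ObjectId("5f1a2b3c4d5e6f7a8b9c0d1e"),
                             ObjectId("5f1a2b3c4d5e6f7a8b9c0d1f")]}}

p = beam.Pipeline(options=options)
(p | ReadFromMongoDB(uri='mongodb://mongo_ip:mongo_port',
                     db="db_name",
                     coll="collection_name",
                     filter=id_filter)
   | beam.Map(parse_json)
   | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         'bq_project_id.bq_dataset_id.bq_table_name',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
p.run()
###############################################
If your _id values are plain strings rather than ObjectIds, drop the ObjectId() wrapping and put the strings directly in the $in list.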