Tags: aggregation-framework, azure-cosmosdb, databricks, azure-cosmosdb-mongoapi

Ingest Cosmos Mongo DB data using Databricks by applying filters


I need to add a filter condition while ingesting data from an Azure Cosmos DB for MongoDB collection using Databricks.

I am using the query below to ingest data from a Cosmos collection:

df = spark.read \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .option('uri', sourceCosmosConnectionString) \
    .option('database', sourceCosmosDocument) \
    .option('collection', sourceCosmosCollection) \
    .load()

How can I add a filter here to pick only selected data? For example, I only want to ingest documents where

{"type" : "student"}

I would really appreciate it if anyone could help with this.

I tried the query below, but I am getting the following error:

import json

query = {"type" : "student"}
df = spark.read \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .option('uri', sourceCosmosConnectionString) \
    .option('database', sourceCosmosDocument) \
    .option('collection', sourceCosmosCollection) \
    .option('pipeline', json.dumps(query)) \
    .load()

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 34) (10.139.64.5 executor 0): com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: type' on server xxxxxxx-xxxxx.mongo.cosmos.azure.com:10255. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: type", "code": 40324, "codeName": "40324"}


Solution

  • The error message means that a stage name in your aggregation pipeline wasn't recognized. Every top-level key in a pipeline stage must be a valid stage name such as $match, so the server reads {"type" : "student"} as a stage named type and rejects it. The fix is to wrap your filter in a $match stage.

    The Azure Cosmos DB for MongoDB troubleshooting documentation describes this and other common errors, along with their solutions.
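
    To see why the original attempt fails, compare the JSON string the connector receives in each case. This is a plain-Python sketch (no Spark needed); it only illustrates the two pipeline shapes:

    ```python
    import json

    # The failing attempt: the top-level key "type" is read as a stage name
    bad_pipeline = json.dumps({"type": "student"})

    # The working form: the predicate is wrapped in a $match stage
    good_pipeline = json.dumps([{"$match": {"type": "student"}}])

    print(bad_pipeline)   # {"type": "student"}
    print(good_pipeline)  # [{"$match": {"type": "student"}}]
    ```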

    Give this a try, and please let me know if it works:

    import json

    query = [{'$match': {'type': 'student'}}]

    df = spark.read \
        .format('com.mongodb.spark.sql.DefaultSource') \
        .option('uri', sourceCosmosConnectionString) \
        .option('database', sourceCosmosDocument) \
        .option('collection', sourceCosmosCollection) \
        .option('pipeline', json.dumps(query)) \
        .load()

    Note that the pipeline option takes a JSON string, so the stage is serialized with json.dumps rather than passed as a Python dict.
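
    If you build pipelines dynamically, a quick local sanity check can surface this class of error before the job is submitted. validate_pipeline below is a hypothetical helper (not part of the connector), shown only as a sketch:

    ```python
    import json

    def validate_pipeline(stages):
        # Hypothetical helper, not part of the MongoDB Spark connector:
        # every top-level key of each stage must be an aggregation stage
        # name beginning with '$', otherwise the server rejects the
        # pipeline with error 40324 ("Unrecognized pipeline stage name").
        for stage in stages:
            for key in stage:
                if not key.startswith('$'):
                    raise ValueError(f"Unrecognized pipeline stage name: {key!r}")
        return json.dumps(stages)

    print(validate_pipeline([{'$match': {'type': 'student'}}]))
    ```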