Tags: mongodb, azure, azure-databricks

Streaming Data from Azure Databricks to MongoDB Atlas


I am attempting to create a streaming job to write data to MongoDB, but I am encountering an error indicating that MongoDB does not support streaming jobs.

My compute version is 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)

and my spark connector version is org.mongodb.spark:mongo-spark-connector_2.12:3.0.1

My code to write the data is below:

json_df.writeStream \
    .format("mongo") \
    .option("uri", mongo_uri) \
    .option("database", "test_db") \
    .option("collection", "test") \
    .outputMode("append") \
    .trigger(processingTime="1 minute") \
    .start()

The error that I get is:

Data source mongo does not support streamed writing.

Is there a driver or library available for writing streaming data to MongoDB using Databricks?

Note: The standard read and write operations from Databricks to MongoDB are functioning correctly.


Solution

  • As you mentioned, you have already used org.mongodb.spark:mongo-spark-connector_2.12:3.0.1. That 3.x connector does not support Structured Streaming writes, which is exactly what the error is telling you; streaming support was added in the 10.x connector, so that is the library you are looking for.

    To know more, see the Azure Databricks documentation for the MongoDB Connector for Spark.

    Version 10.x utilizes the new namespace com.mongodb.spark.sql.connector.MongoTableProvider. This change enables the use of older connector versions (3.x and earlier) concurrently with version 10.x.
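The short name you pass to `format()` also differs between the two connector lines, which matters when both are installed side by side. As a small reference table (the short names are from the connector documentation; the dict itself is just illustrative):

```python
# Data source short names registered by each connector line.
# 3.x registers "mongo" (batch only); 10.x registers "mongodb",
# backed by com.mongodb.spark.sql.connector.MongoTableProvider.
FORMAT_BY_VERSION = {
    "3.x": "mongo",
    "10.x": "mongodb",
}

# The question's code used format("mongo"), i.e. the 3.x data source,
# which is why the streamed write was rejected.
```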

    Using the connector, you can use all Spark libraries with MongoDB datasets, including:

    • Dataset: For SQL analysis with automatic schema inference.
    • Streaming: For real-time data processing.
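With the 10.x connector, the streaming write is configured through `spark.mongodb.*` options (option keys per the 10.x connector documentation). A minimal sketch of how the question's write could be assembled, where the helper function and placeholder values are illustrative, not part of the connector API:

```python
def mongo_write_options(uri: str, database: str, collection: str) -> dict:
    """Build the option map for a Structured Streaming write with connector 10.x."""
    return {
        "spark.mongodb.connection.uri": uri,
        "spark.mongodb.database": database,
        "spark.mongodb.collection": collection,
        # Streaming writes require a checkpoint location; path is a placeholder.
        "checkpointLocation": "/tmp/pyspark/",
    }

opts = mongo_write_options("mongodb+srv://<your-cluster>", "test_db", "test")
# In Spark this would be used as:
# json_df.writeStream.format("mongodb").options(**opts).outputMode("append").start()
```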

    You can try installing the below (note the _2.12 suffix, matching your runtime's Scala version):

    org.mongodb.spark:mongo-spark-connector_2.12:10.3.0
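One thing to double-check: the Scala suffix in the Maven coordinate must match the cluster's Scala version, and Databricks Runtime 14.3 LTS ships Scala 2.12, so the `_2.12` artifact is the one to pick. A quick sanity-check sketch (the helper is hypothetical, just parsing the coordinate format `group:artifact_scalaVersion:version`):

```python
def scala_suffix(coordinate: str) -> str:
    """Extract the Scala binary version from a Maven coordinate like
    group:artifact_scalaVersion:version."""
    artifact = coordinate.split(":")[1]
    return artifact.rsplit("_", 1)[1]

# DBR 14.3 LTS runs Scala 2.12, so the connector artifact must match:
coord = "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0"
print(scala_suffix(coord))  # → 2.12
```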
    


    And for writeStream:

    dsw = (
        slidingWindows
            .writeStream
            .format("mongodb")  # 10.x connector short name (3.x used "mongo")
            .queryName("7DaySlidingWindow")
            .option("checkpointLocation", "/tmp/pyspark/")
            .option("forceDeleteTempCheckpointLocation", "true")
            .option("spark.mongodb.connection.uri", "MONGODB CONNECTION HERE")
            .option("spark.mongodb.database", "Pricing")
            .option("spark.mongodb.collection", "NaturalGas")
            .outputMode("complete")
    )
    query = dsw.start()
    query.processAllAvailable()  # block until all available input is processed
    query.stop()
    

    References:

    • MongoDB Connector for Spark
    • How to sink streaming data from Spark to MongoDB?