azure pyspark apache-spark-sql azure-cosmosdb azure-databricks

Saving data to Cosmos DB from Azure Databricks is very slow


I am a beginner with Azure and Databricks, and I'm having difficulty saving a lot of data (e.g., 5 GB) from Azure Databricks to Cosmos DB with the Spark connector.

Preface: I want to populate a Cosmos DB container with a lot of rows (e.g., 500 million+) for further testing. Currently I have one container (call it container #1) into which I stream telemetry data using Stream Analytics. What I want to do is take the data from container #1, multiply it many times, and save it to a new container #2.

I wanted to solve this with the Spark connector and Databricks, but saving is very slow, e.g., 1 million rows per 40 minutes, which is not acceptable when I want to save gigabytes of data. Reading from a container, on the other hand, is blazingly fast, e.g., 10 seconds for 1.3 million rows. I disabled indexing on container #2, as I read that this should help, but it doesn't make a noticeable difference. Container #2 is set to 2000 RU/s and container #1 to 1000 RU/s, both with autoscale throughput.
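
Putting the rates quoted above side by side shows the size of the gap. This is only back-of-the-envelope arithmetic on the numbers in this post (1 million rows written per 40 minutes, 1.3 million rows read in 10 seconds, a 500 million row target), and it assumes the write rate stays constant.

```python
def rows_per_second(rows: float, seconds: float) -> float:
    """Average throughput in rows per second."""
    return rows / seconds


# Figures quoted in the question
write_rate = rows_per_second(1_000_000, 40 * 60)  # ~416.7 rows/s
read_rate = rows_per_second(1_300_000, 10)        # 130,000 rows/s

target_rows = 500_000_000
hours_to_write = target_rows / write_rate / 3600  # ~333 hours, about 14 days

print(f"write: {write_rate:.1f} rows/s, read: {read_rate:.0f} rows/s")
print(f"reads are ~{read_rate / write_rate:.0f}x faster than writes")
print(f"writing {target_rows:,} rows would take ~{hours_to_write:.0f} hours")
```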

Unfortunately, I don't know what I am doing wrong, so any help would be appreciated, including recommendations for other ways of solving this (e.g., Synapse Analytics).

P.S. I read somewhere that I should first save the data to ADLS Gen2 storage, then read the DataFrame back from it and write it to Cosmos DB. That did provide an improvement, but the speed is still nowhere near acceptable.

I apologize in advance for my English; it is not my first language.

Code

Config

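# Endpoint, Masterkey, Database, Container and Container2 are assumed to be defined earlier
connectionConfigRead = {
  "spark.cosmos.accountEndpoint" : Endpoint,
  "spark.cosmos.accountKey" : Masterkey,
  "spark.cosmos.database" : Database,
  "spark.cosmos.container": Container,
  # Use the explicit schema below instead of inferring one from sampled documents
  "spark.cosmos.read.inferSchema.enabled" : "false",
  # Only relevant to the change feed source ("cosmos.oltp.changeFeed"), not this query-based read
  "spark.cosmos.changeFeed.startFrom" : "Now"
}

connectionConfigWrite = {
  "spark.cosmos.accountEndpoint" : Endpoint,
  "spark.cosmos.accountKey" : Masterkey,
  "spark.cosmos.database" : Database,
  "spark.cosmos.container": Container2,
  # Change feed setting; it has no effect on writes
  "spark.cosmos.changeFeed.startFrom" : "Now"
}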
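
A minimal sketch of the same two configurations built from a shared base. It drops `spark.cosmos.changeFeed.startFrom`, which only the change feed source (`cosmos.oltp.changeFeed`) uses, and spells out two write options of the Cosmos DB Spark 3 OLTP connector, `spark.cosmos.write.strategy` and `spark.cosmos.write.bulk.enabled`. The values shown are, as far as I know, the connector defaults, so this mainly makes the write behaviour explicit; check the connector's configuration reference for your version. The endpoint, key, and names are hypothetical placeholders.

```python
# Hypothetical placeholders; in the notebook these come from your own settings or secrets
Endpoint = "https://<your-account>.documents.azure.com:443/"
Masterkey = "<your-account-key>"
Database = "<database>"
Container = "<container-1>"
Container2 = "<container-2>"

# Settings shared by the read and write configurations
baseConfig = {
    "spark.cosmos.accountEndpoint": Endpoint,
    "spark.cosmos.accountKey": Masterkey,
    "spark.cosmos.database": Database,
}

connectionConfigRead = {
    **baseConfig,
    "spark.cosmos.container": Container,
    "spark.cosmos.read.inferSchema.enabled": "false",
}

connectionConfigWrite = {
    **baseConfig,
    "spark.cosmos.container": Container2,
    # Upsert semantics: an item with an existing id in the same partition is replaced
    "spark.cosmos.write.strategy": "ItemOverwrite",
    # Bulk mode groups many item writes into batched requests
    "spark.cosmos.write.bulk.enabled": "true",
}
```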

Reading from a container

from pyspark.sql.types import (
    DoubleType, IntegerType, StringType, StructField, StructType
)

# Explicit schema for the telemetry documents, so the connector does not need to infer one
customSchema = StructType([
    StructField("iothub-connection-module-id", StringType()),
    StructField("value_type", StringType()),
    StructField("timestamp", DoubleType()),
    StructField("data", StringType()),
    StructField("IoTHub", StringType()),
    StructField("id", StringType()),
    StructField("EventEnqueuedUtcTime", StringType()),
    StructField("sensorId", StringType()),
    StructField("EventProcessedUtcTime", StringType()),
    StructField("PartitionId", IntegerType()),
    StructField("value", DoubleType())
])

readDF = (
    spark.read.schema(customSchema)
    .format("cosmos.oltp")
    .options(**connectionConfigRead)
    .load()
)

Writing to a container

readDF.write.mode("append").format("cosmos.oltp").options(**connectionConfigWrite).save()

Solution

  • Container #2 does not have enough throughput. Assuming 10 RUs for each insert, 2000 RU/s can do at most 200 inserts/second. That's 12k/minute or 720k/hour. Since you're only dealing with 5 GB of data and millions of rows, I would scale up to 10000 RU/s autoscale max throughput, which will do 1000 inserts/second or 3.6M/hour. For more information on the Spark connector, see the Spark OLTP connector resources. Also, make sure Databricks and Cosmos DB are running in the same region; running them in different regions increases latency.

    If you are going to query container #2, you need to index it. Find out which properties you use in your filter predicates, ORDER BY clauses, etc., and create the necessary range and composite indexes.
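
The throughput arithmetic above as a small calculation. The 10 RUs per insert is the same assumption as in the answer; the real charge depends on document size and indexing policy, so it is worth measuring the request charge of a sample insert. The last lines estimate the per-write cost implied by the question's observed rate (1 million rows per 40 minutes), under the further assumption that this rate was saturating the 2000 RU/s maximum.

```python
def max_inserts_per_second(ru_per_second: float, ru_per_insert: float) -> float:
    """Upper bound on inserts/s when every insert costs `ru_per_insert` RUs."""
    return ru_per_second / ru_per_insert


RU_PER_INSERT = 10  # assumption from the answer; measure the real request charge

for ru_per_second in (2_000, 10_000):
    per_second = max_inserts_per_second(ru_per_second, RU_PER_INSERT)
    print(f"{ru_per_second:>6} RU/s -> {per_second:,.0f}/s, "
          f"{per_second * 60:,.0f}/min, {per_second * 3600:,.0f}/hour")

# Hours to insert 500 million rows at 10,000 RU/s under the same assumption
hours_500m = 500_000_000 / max_inserts_per_second(10_000, RU_PER_INSERT) / 3600
print(f"500M rows at 10,000 RU/s: ~{hours_500m:.0f} hours")

# RU cost per write implied by 1M rows / 40 min, if 2000 RU/s was fully used
observed_per_second = 1_000_000 / (40 * 60)
implied_ru_per_write = 2_000 / observed_per_second
print(f"implied cost: ~{implied_ru_per_write:.1f} RU per write")
```

If the observed rate did saturate 2000 RU/s, each write cost about 4.8 RU, so the 10 RU figure is conservative and the real rates at a given RU/s would be roughly twice these bounds.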