Tags: apache-spark, pyspark, azure-cosmosdb, spark-streaming, azure-eventhub

Unable to write multiple streaming records to Cosmos DB


I am able to write the data sent to Event Hub by the TelcoGenerator app into Cosmos DB, but only a single record appears in Cosmos at any one time (it changes after each refresh). Here is the code snippet:

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()


from pyspark.sql.functions import current_timestamp

# Batch writer invoked by foreachBatch for every micro-batch
def write2Cosmos(df2, batchId):
    df2.withColumn("batchId", current_timestamp()) \
        .write.format("cosmos.oltp") \
        .option("spark.synapse.linkedService", "<LinkedServiceName>") \
        .option("spark.cosmos.container", "<DatabaseName>") \
        .mode("append") \
        .save()


df = df.withColumn("body", df["body"].cast("string"))

from pyspark.sql.functions import col, json_tuple
df = df.select(json_tuple(col("body"), "RecordType", "SwitchNum")) \
    .toDF("id", "SwitchNum")

streamQuery = df\
    .writeStream\
    .foreachBatch(write2Cosmos) \
    .option("checkpointLocation", "/mnt/temp")\
    .outputMode("append")\
    .start()
streamQuery.awaitTermination()

Any pointers on how to load all of the data (for as long as the stream runs)?

[Screengrab: input data]

[Screengrab: Cosmos DB data showing only one record]


Solution

  • Change the column mapped to id to one with many distinct values. Cosmos DB uses id as the document identifier, and the write effectively upserts on it, so records that share an id keep overwriting the same document; since RecordType has only a few distinct values, that is why a single record survives. Map the SwitchNum column to id instead, as below:

      from pyspark.sql.functions import col, json_tuple
      df = df.select(json_tuple(col("body"), "RecordType", "SwitchNum")) \
          .toDF("RecordType", "id")
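  • For reference, here is a minimal sketch of the full corrected pipeline, assuming the same ehConf, <LinkedServiceName>, and <DatabaseName> placeholders from the question and a Synapse/Databricks notebook that provides the spark session:

      from pyspark.sql.functions import col, json_tuple, current_timestamp

      # Read the raw event stream from Event Hubs
      df = spark \
          .readStream \
          .format("eventhubs") \
          .options(**ehConf) \
          .load()

      # The Event Hubs body arrives as binary; cast it to a JSON string
      df = df.withColumn("body", col("body").cast("string"))

      # Map SwitchNum (many distinct values) to "id" so each event becomes
      # its own Cosmos DB document instead of upserting over the last one
      df = df.select(json_tuple(col("body"), "RecordType", "SwitchNum")) \
          .toDF("RecordType", "id")

      def write2Cosmos(df2, batchId):
          df2.withColumn("batchId", current_timestamp()) \
              .write.format("cosmos.oltp") \
              .option("spark.synapse.linkedService", "<LinkedServiceName>") \
              .option("spark.cosmos.container", "<DatabaseName>") \
              .mode("append") \
              .save()

      streamQuery = df \
          .writeStream \
          .foreachBatch(write2Cosmos) \
          .option("checkpointLocation", "/mnt/temp") \
          .outputMode("append") \
          .start()
      streamQuery.awaitTermination()

    Note that if SwitchNum values can also repeat, documents will still overwrite one another; in that case a composite identifier (for example, concatenating SwitchNum with an event timestamp) would guarantee one document per record.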