I am able to write the data sent to Event Hub by the TelcoGenerator app into Cosmos DB, but only one record (after each refresh) shows up in Cosmos DB at a time. Here is the code snippet:
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
from pyspark.sql.functions import current_timestamp

def write2Cosmos(df2, batchId):
    # Stamp each micro-batch with the write time and append it to Cosmos DB
    df2.withColumn("batchId", current_timestamp()) \
        .write.format("cosmos.oltp") \
        .option("spark.synapse.linkedService", "<LinkedServiceName>") \
        .option("spark.cosmos.container", "<ContainerName>") \
        .mode("append") \
        .save()
df = df.withColumn("body", df["body"].cast("string"))
from pyspark.sql.functions import col, json_tuple
df = df.select(json_tuple(col("body"), "RecordType", "SwitchNum")) \
    .toDF("id", "SwitchNum")
streamQuery = df\
.writeStream\
.foreachBatch(write2Cosmos) \
.option("checkpointLocation", "/mnt/temp")\
.outputMode("append")\
.start()
streamQuery.awaitTermination()
Any pointers on how to load all of the data (for as long as the stream runs)?
The column you map to id becomes the Cosmos DB document id, and writes behave as upserts on it, so every record carrying the same id value overwrites the previous document. Since RecordType appears to hold the same value for every record, each micro-batch keeps replacing a single document, which is why you only ever see one record. Try mapping a column with multiple distinct values, such as SwitchNum, to id instead:
from pyspark.sql.functions import col, json_tuple
df = df.select(json_tuple(col("body"), "RecordType", "SwitchNum")) \
    .toDF("RecordType", "id")