Tags: mongodb, pyspark, apache-kafka-streams, spark-structured-streaming

Issue with Writing Aggregated Data to MongoDB from PySpark Structured Streaming


I'm encountering an issue while trying to write aggregated data to MongoDB from a PySpark Structured Streaming job. Here's my setup:

  • I have a Kafka topic from which I'm consuming JSON messages.
  • I'm using PySpark to process these messages in a structured streaming job.
  • I've defined a watermark and a 1-minute window on the timestamp field to aggregate the data.
  • My aggregation calculates the approximate count of distinct clients per gameId within each window.
  • I'm attempting to write this aggregated data to MongoDB.

Here's the relevant portion of my code:


from pyspark.sql.functions import approx_count_distinct, col, from_json, window
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# `spark`, `mongodb_uri`, and `checkpoint_dir` are defined elsewhere in the job.

# Schema of the JSON messages on the topic
schema = StructType([
    StructField("gameType", StringType(), True),
    StructField("clientId", StringType(), True),
    StructField("gameId", StringType(), True),
    StructField("timestamp", TimestampType(), True),
])

# Read the raw stream from Kafka
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test_topic") \
    .load()

# Parse the JSON value into typed columns
json_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Tolerate events arriving up to 1 minute late
json_df_with_watermark = json_df.withWatermark("timestamp", "1 minute")

window_duration = "1 minute"

# Approximate distinct clients per gameId within each window
clients_per_gameId = json_df_with_watermark \
    .groupBy(window("timestamp", window_duration), "gameId") \
    .agg(approx_count_distinct("clientId").alias("client_count"))

def write_to_mongo(df, epoch_id):
    # The window column is a struct, so I drop it before writing
    df = df.drop("window")
    df.write \
        .format("mongodb") \
        .mode("append") \
        .option("spark.mongodb.connection.uri", mongodb_uri) \
        .option("spark.mongodb.database", "pyspark_test") \
        .option("spark.mongodb.collection", "clients_per_gameId") \
        .save()

query = clients_per_gameId.writeStream \
    .foreachBatch(write_to_mongo) \
    .option("checkpointLocation", checkpoint_dir) \
    .start()

query.awaitTermination()
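
For context, a message matching this schema might look like the one below. This is a minimal producer sketch (assuming the kafka-python package; the field values are made up), not my actual producer:

import json
from datetime import datetime, timezone
from kafka import KafkaProducer  # assumption: kafka-python is installed

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Hypothetical example payload; the ISO-8601 timestamp parses into TimestampType
producer.send("test_topic", {
    "gameType": "ranked",
    "clientId": "client-123",
    "gameId": "game-42",
    "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
})
producer.flush()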


The issue I'm facing is that while the raw JSON data (json_df_with_watermark) is written to its MongoDB collection successfully, the aggregated data (clients_per_gameId) never shows up. I've verified that the aggregation logic is correct, and no errors are reported while the streaming job runs.
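
As a sanity check, the same aggregation can be pointed at the console sink; if rows show up there, the aggregation itself is fine and the problem lies in how the results reach MongoDB. This is a debugging sketch, not part of my original job:

# "complete" mode re-emits the whole result table every micro-batch,
# so rows appear even before any window is finalized by the watermark.
debug_query = clients_per_gameId.writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("truncate", "false") \
    .start()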

Any insights or suggestions on why the aggregated data isn't being written to MongoDB would be greatly appreciated.

Thank you!


Solution

  • I resolved the issue by explicitly setting the output mode to "update" (or "append") in the writeStream query, like this:

    query = clients_per_gameId.writeStream \
        .foreachBatch(write_to_mongo) \
        .option("checkpointLocation", checkpoint_dir) \
        .outputMode("update") \
        .start()
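
    Why this helps: without an explicit setting, the query runs in the default append output mode. For a windowed aggregation with a watermark, append only emits a window's row once the watermark has passed the end of that window, and the watermark only advances when newer events arrive; until then (or if the stream goes quiet), nothing reaches foreachBatch. In update mode, every micro-batch emits the rows that changed, so results appear immediately.

    One caveat: in update mode the same window can be emitted several times as its count grows, and a plain insert in foreachBatch then leaves duplicate documents in MongoDB. Below is a sketch of an idempotent variant. It assumes the MongoDB Spark connector 10.x, whose write options include operationType and idFieldList (check your connector version's docs), and it keeps the window start as a key instead of dropping the whole window struct; write_to_mongo_upsert is a hypothetical replacement for write_to_mongo:

    def write_to_mongo_upsert(df, epoch_id):
        # Keep the window start as a flat column so (window_start, gameId)
        # uniquely identifies a row; dropping "window" entirely would let
        # counts from different windows overwrite each other.
        out = df.withColumn("window_start", col("window.start")).drop("window")
        out.write \
            .format("mongodb") \
            .mode("append") \
            .option("spark.mongodb.connection.uri", mongodb_uri) \
            .option("spark.mongodb.database", "pyspark_test") \
            .option("spark.mongodb.collection", "clients_per_gameId") \
            .option("operationType", "update") \
            .option("idFieldList", "window_start,gameId") \
            .save()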