Tags: apache-spark, pyspark, apache-spark-sql, spark-structured-streaming

Why an exception while writing an aggregated dataframe to a file sink?


I'm performing an aggregation on a streaming DataFrame and trying to write the result to an output directory, but I'm getting an exception:

pyspark.sql.utils.AnalysisException: 'Data source json does not support Update output mode;

I get a similar error with the "complete" output mode.

This is my code:

from pyspark.sql.functions import col, count

grouped_df = logs_df.groupBy('host', 'timestamp').agg(count('host').alias('total_count'))
 
result_host = grouped_df.filter(col('total_count') > threshold)
 
writer_query = result_host.writeStream \
    .format("json") \
    .queryName("JSON Writer") \
    .outputMode("update") \
    .option("path", "output") \
    .option("checkpointLocation", "chk-point-dir") \
    .trigger(processingTime="1 minute") \
    .start()

writer_query.awaitTermination()

Solution

  • File sinks support only the "append" output mode. See the "Supported Output Modes" column of the Output Sinks table in the Structured Streaming Programming Guide: the File sink row lists Append as its only supported mode, which is why both "update" and "complete" raise an AnalysisException.
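One way to make the query work with a file sink is to switch to "append" output mode. Because the query contains a streaming aggregation, append mode also requires a watermark on the event-time column, so Spark knows when a group is final and can be written out. A minimal sketch, assuming logs_df has a timestamp column of TimestampType and that threshold is defined elsewhere (the 10-minute watermark is an illustrative choice, not from the original question):

```python
from pyspark.sql.functions import col, count

# Append mode with a streaming aggregation requires a watermark,
# so Spark can finalize groups and emit their rows to the file sink.
grouped_df = logs_df \
    .withWatermark('timestamp', '10 minutes') \
    .groupBy('host', 'timestamp') \
    .agg(count('host').alias('total_count'))

result_host = grouped_df.filter(col('total_count') > threshold)

writer_query = result_host.writeStream \
    .format("json") \
    .queryName("JSON Writer") \
    .outputMode("append") \
    .option("path", "output") \
    .option("checkpointLocation", "chk-point-dir") \
    .trigger(processingTime="1 minute") \
    .start()

writer_query.awaitTermination()
```

Note that in append mode a group's row is only written after the watermark passes its timestamp, so output lags by roughly the watermark interval. If you need update-style semantics while still writing files, foreachBatch is another option: it hands each micro-batch to you as a regular DataFrame that you can write however you like.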