
Pivot a streaming dataframe pyspark


I have a streaming dataframe from kafka and I need to pivot two columns. This is the code I'm currently using:

streaming_df = streaming_df.groupBy('Id','Date')\
            .pivot('Var')\
            .agg(first('Val'))

query = streaming_df.limit(5) \
            .writeStream \
            .outputMode("append") \
            .format("memory") \
            .queryName("stream") \
            .start()

time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()


I receive the following error: pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

pyspark version: 3.1.1

Any ideas on how to implement a pivot with a streaming dataframe?


Solution

  • The pivot transformation is not supported by Spark Structured Streaming, so it cannot be applied directly to a streaming DataFrame.

    What you can do is use foreachBatch with a user-defined function: Spark passes each micro-batch to the function as a static DataFrame, on which pivot is allowed:

    from pyspark.sql.functions import first

    def apply_pivot(stream_df, batch_id):
        # stream_df is a static DataFrame here, so pivot is allowed.
        # Append each pivoted micro-batch to a table we can query later.
        stream_df \
            .groupBy('Id', 'Date') \
            .pivot('Var') \
            .agg(first('Val')) \
            .write \
            .mode('append') \
            .saveAsTable('stream')
    
    # Note: limit() is also unsupported on streaming DataFrames, so it is dropped here.
    query = streaming_df \
        .writeStream \
        .foreachBatch(apply_pivot) \
        .start()
    
    time.sleep(50)
    spark.sql("select * from stream").show(20, False)
    query.stop()
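    If the distinct values of `Var` are known up front, you can also avoid pivot (and foreachBatch) entirely: express the pivot as one conditional aggregate per value, which streaming aggregations do support. A minimal sketch, using a static DataFrame as a stand-in for the stream (the sample rows and `known_vars` values here are made up):

    ```python
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.master('local[1]').appName('pivot-sketch').getOrCreate()

    # Static stand-in for streaming_df; the same expression works on a
    # streaming DataFrame because it is a plain groupBy/agg, not a pivot.
    df = spark.createDataFrame(
        [(1, '2021-01-01', 'a', 10.0),
         (1, '2021-01-01', 'b', 20.0),
         (2, '2021-01-01', 'a', 30.0)],
        ['Id', 'Date', 'Var', 'Val'])

    known_vars = ['a', 'b']  # assumed to be known ahead of time

    # One column per known value of Var, filled via a conditional first():
    pivoted = df.groupBy('Id', 'Date').agg(
        *[F.first(F.when(F.col('Var') == v, F.col('Val')), ignorenulls=True).alias(v)
          for v in known_vars])

    pivoted.show()
    ```

    On the real stream you would still need an output mode the aggregation supports (update or complete, or append with a watermark).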
    

    Let me know if it helped you!