
stream difference between column A and B aggregated by column C and D


How can I stream into a table the following:

difference between Column A and B aggregated by column C and D.

+--------+--------+--------+--------+
|Column_A|Column_B|Column_C|Column_D|
+--------+--------+--------+--------+
|      52|      67|boy     |car     |
|      44|      25|girl    |bike    |
|      98|      85|boy     |car     |
|      52|      41|girl    |car     |
+--------+--------+--------+--------+

This is my attempt, but it is not working:

difference = (streamingDataF
    .withColumn("Difference", expr("Column_A - Column_B"))
    .drop("Column_A", "Column_B")
    .groupBy("Column_C"))
differenceStream = difference.writeStream\
  .queryName("diff_aggr")\
  .format("memory").outputMode("append")\
  .start()

I am getting this error: 'GroupedData' object has no attribute 'writeStream'


Solution

  • Depending on how you want to aggregate the grouped data, you can do e.g.:

    Prerequisites (in case you haven't set them already):

    from pyspark.sql import functions as F
    from pyspark.sql.functions import expr


    For sum:

    difference = (streamingDataF
        .withColumn("Difference", expr("Column_A - Column_B"))
        .drop("Column_A", "Column_B")
        .groupBy("Column_C")
        .agg(F.sum("Difference").alias("Difference")))
    

    For max:

    difference = (streamingDataF
        .withColumn("Difference", expr("Column_A - Column_B"))
        .drop("Column_A", "Column_B")
        .groupBy("Column_C")
        .agg(F.max("Difference").alias("Difference")))
    

    And then:

    differenceStream = difference.writeStream\
      .queryName("diff_aggr")\
      .format("memory").outputMode("complete")\
      .start()

    Note that a streaming aggregation without a watermark does not support the "append" output mode; use "complete" (or "update") instead, otherwise the query fails with an AnalysisException.
    

    The point is: if you use groupBy, you also need to reduce the grouped data by aggregating. If you wanted to sort your values instead, try df.sort(...).