How can I stream the following into a table: the difference between Column_A and Column_B, aggregated by Column_C and Column_D?
+--------+--------+--------+--------+
|Column_A|Column_B|Column_C|Column_D|
+--------+--------+--------+--------+
|52      |67      |boy     |car     |
|44      |25      |girl    |bike    |
|98      |85      |boy     |car     |
|52      |41      |girl    |car     |
+--------+--------+--------+--------+
This is my attempt, but it is not working:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("append")\
.start()
I am getting this error: 'GroupedData' object has no attribute 'writeStream'
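It depends on how you want to aggregate the grouped data. groupBy returns a GroupedData object, which has no writeStream; you only get a streaming DataFrame back after applying an aggregation such as agg(...). For example: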
Imports (in case you haven't added them already):
from pyspark.sql import functions as F
from pyspark.sql.functions import expr
For a sum, grouped by both Column_C and Column_D:
difference = streamingDataF\
.withColumn("Difference", expr("Column_A - Column_B"))\
.drop("Column_A", "Column_B")\
.groupBy("Column_C", "Column_D")\
.agg(F.sum(F.col("Difference")).alias("Difference"))
For a max:
difference = streamingDataF\
.withColumn("Difference", expr("Column_A - Column_B"))\
.drop("Column_A", "Column_B")\
.groupBy("Column_C", "Column_D")\
.agg(F.max(F.col("Difference")).alias("Difference"))
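And then write the stream. A streaming aggregation without a watermark cannot use the "append" output mode, so use "complete" instead (the memory sink supports it):
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("complete")\
.start()
The memory sink keeps the result as an in-memory table named after the query, so you can inspect it with spark.sql("SELECT * FROM diff_aggr").show().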
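To see why "append" is rejected here, the following plain-Python simulation (no Spark involved; the split of the sample rows into two micro-batches is a hypothetical assumption) mimics a running sum per (Column_C, Column_D) and emits the full result table after every trigger, which is what "complete" mode does conceptually. It is a sketch of the semantics, not Spark's actual implementation.

```python
from collections import defaultdict

# Hypothetical micro-batches: the question's rows split across two triggers.
# Each row is (Column_A, Column_B, Column_C, Column_D).
batches = [
    [(52, 67, "boy", "car"), (44, 25, "girl", "bike")],
    [(98, 85, "boy", "car"), (52, 41, "girl", "car")],
]

# Running aggregation state: sum of Difference per (Column_C, Column_D).
state = defaultdict(int)
outputs = []

for batch in batches:
    for a, b, c, d in batch:
        state[(c, d)] += a - b  # add this row's Difference to its group
    # "complete"-style output: emit the whole, updated result table each trigger
    outputs.append(dict(state))

for i, table in enumerate(outputs):
    print(f"trigger {i}: {table}")
```

The ("boy", "car") group is -15 after the first trigger and -2 after the second. An aggregated row can keep changing as new data arrives, while "append" mode only ever emits a row once and never updates it, which is why Spark requires "complete" (or "update") for aggregations without a watermark.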
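The point is: after groupBy you also need to reduce each group to a single row by aggregating it. If you only wanted to order your rows instead, try df.sort(...); note, however, that on a streaming DataFrame sorting is supported only after an aggregation and in complete output mode.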
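As a sanity check on what the sum and max queries should produce for the sample table, here is a plain-Python sketch of the same steps (compute Column_A - Column_B, group by (Column_C, Column_D), reduce each group to one value). It does not use Spark; the helper name aggregate_difference is hypothetical and only illustrates the group-then-reduce idea.

```python
from collections import defaultdict

# Sample rows from the question: (Column_A, Column_B, Column_C, Column_D)
rows = [
    (52, 67, "boy", "car"),
    (44, 25, "girl", "bike"),
    (98, 85, "boy", "car"),
    (52, 41, "girl", "car"),
]

def aggregate_difference(rows, reduce_fn):
    """Hypothetical helper: group Column_A - Column_B by (Column_C, Column_D)
    and reduce each group to a single value, like .agg(...) does per group."""
    groups = defaultdict(list)
    for a, b, c, d in rows:
        groups[(c, d)].append(a - b)  # the "Difference" column
    return {key: reduce_fn(values) for key, values in groups.items()}

sums = aggregate_difference(rows, sum)
maxes = aggregate_difference(rows, max)
print(sums)   # {('boy', 'car'): -2, ('girl', 'bike'): 19, ('girl', 'car'): 11}
print(maxes)  # {('boy', 'car'): 13, ('girl', 'bike'): 19, ('girl', 'car'): 11}
```

Without the reduce step each group would still hold several values (for example [-15, 13] for ("boy", "car")), which is the situation GroupedData represents before agg(...) is applied.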