How to define UDAF over event-time windows in PySpark 2.1.0

Problem definition

I'm writing a Python application that slides a window over a sequence of timestamped values. I want to apply a function to the values in the sliding window in order to calculate a score from the N latest values, as shown in the figure. We have already implemented that function using a Python library so that it can make use of GPUs.
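
To make the goal concrete, here is a minimal plain-Python sketch of the computation, with no Spark involved. The names `sliding_scores` and `value_range` are made up for illustration, and `value_range` merely stands in for the real GPU-backed scoring function.

```python
from collections import deque


def value_range(values):
    """Placeholder score (an assumption): spread between max and min."""
    return max(values) - min(values)


def sliding_scores(records, n, score_fn):
    """Yield (timestamp, score) for each record once n values are buffered.

    records is an iterable of (timestamp, value) pairs in timestamp order.
    """
    latest = deque(maxlen=n)  # keeps only the n most recent values
    for ts, value in records:
        latest.append(value)
        if len(latest) == n:
            yield ts, score_fn(list(latest))


records = [(1, 1.0), (2, 2.0), (3, 4.0), (4, 8.0)]
result = list(sliding_scores(records, 3, value_range))
```

The question is how to get the same effect when the window is defined on event time and the data arrives as a stream.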

I found that Apache Spark 2.0 ships with Structured Streaming, which supports window operations on event time. If you want to read a finite sequence of records from a .csv file and count the records in such a sliding window, you can use the following code in PySpark:

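from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# Schema of the CSV records: an event timestamp and a value.
schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double')

# Stream CSV files from a directory. The 'csv' subdirectory of the
# working directory is an assumption; point this at your own input.
lines = spark \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(getcwd() + '/csv')

# Count the records in 30-minute windows that slide every 10 minutes.
windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).count()

query = windowedCount \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()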


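For reference, the sketch below illustrates in plain Python which windows a single event time falls into for a 30-minute window sliding every 10 minutes. It assumes window starts are aligned to multiples of the slide interval counted from the Unix epoch, which is my understanding of the default behaviour of `window`; `sliding_windows` is a hypothetical helper, not a Spark API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(ts, length, slide):
    """Return the [start, end) windows that contain ts, earliest first.

    Assumption: window starts fall on multiples of slide since the epoch.
    """
    # Latest window start that is at or before ts.
    start = ts - (ts - EPOCH) % slide
    windows = []
    # Walk backwards while the window still covers ts.
    while start + length > ts:
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)


event_time = datetime(2017, 1, 1, 12, 7, tzinfo=timezone.utc)
windows = sliding_windows(event_time,
                          timedelta(minutes=30),
                          timedelta(minutes=10))
```

Under that assumption a record stamped 12:07 is counted in three windows, starting at 11:40, 11:50 and 12:00.
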
However, I want to apply UDAFs other than the predefined aggregation functions over sliding windows. According to the PySpark API documentation, the only available aggregate functions are avg, max, min, sum, and count.

Is it not supported yet? If so, when will it be supported in PySpark? Another answer I found shows that one can define a UserDefinedAggregateFunction in Java or Scala and then invoke it from PySpark. That seems interesting, but I want to apply my own Python function over the values in sliding windows. I want a purely Pythonic way.

P.S. Please let me know of any Python frameworks other than PySpark that can solve this sort of problem (applying UDAFs over a window sliding over a stream).


  • In Spark <2.3, you cannot do this.

    For Spark >= 2.3, this is possible for grouped data using "PySpark UDAFs with Pandas", but not yet for windows.

    Currently, PySpark cannot run user-defined functions over windows.

    Here is a well-described SO question on this: Applying UDFs on GroupedData in PySpark (with functioning python example)

    Here is the JIRA ticket that added this feature -
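
As a rough sketch of the grouped-data route on Spark >= 2.3, a plain Python function can be wrapped in a grouped-map pandas UDF and applied per group. Assumptions: `mean_abs_deviation` and `score_per_group` are hypothetical names, the DataFrame has a string `key` column and a double `value` column, and pandas and pyarrow are installed. The Spark imports are deferred so that the scoring function can be tried without a cluster.

```python
def mean_abs_deviation(values):
    """Hypothetical Python aggregate: mean absolute deviation of the values."""
    values = list(values)
    if not values:
        return 0.0
    center = sum(values) / len(values)
    return sum(abs(v - center) for v in values) / len(values)


def score_per_group(df, key_col="key", value_col="value"):
    """Apply mean_abs_deviation to each group of a Spark DataFrame.

    Assumes Spark >= 2.3 with pandas and pyarrow available, a string
    key column and a double value column (both names are assumptions).
    """
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf(key_col + " string, score double", PandasUDFType.GROUPED_MAP)
    def score_group(pdf):
        # pdf is a pandas DataFrame holding every row of one group.
        key = pdf[key_col].iloc[0]
        score = mean_abs_deviation(pdf[value_col].tolist())
        # Column order must match the declared result schema.
        return pd.DataFrame([[key, score]], columns=[key_col, "score"])

    return df.groupby(key_col).apply(score_group)
```

Called as `score_per_group(df)`, this should return one row per key. In newer Spark releases `groupby(...).applyInPandas(...)` is the preferred spelling of the same idea.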