Tags: dataframe, apache-spark, pyspark, average

Efficient way to compute several thousand averages from time segments of a single time-series DataFrame


Suppose I have a PySpark DataFrame called "values" containing the values of a voltage over time (a time series). The DataFrame contains 1 billion measurement points (10^9 rows) and is originally stored in a Parquet file. The schema of the DataFrame is:

values
|-- time: float (nullable = true)
|-- voltage: float (nullable = true)

values: 1,000,000,000 rows
[time, voltage]
[0.000000000, 4.1870174]
[0.001000141, 4.199591]
[0.002001438, 4.2184515]
[0.003001813, 4.237312]
[0.004002469, 4.256172]
.....
[459586.004002469, 459586.256172]

Then I have another PySpark DataFrame called "timeperiods" containing the start and end of each time period, with 1,000,000 rows:

timeperiods
|-- start: float (nullable = true)
|-- end: float (nullable = true)

timeperiods: 1,000,000 rows
[start, end]
[0.000000000, 1.1870174]
[2.001000141, 4.199591]
[5.002001438, 5.2184515]
[6.003001813, 6.237312]
.....
[459585.004002469, 459586.256172]

I would like, for each time period in the DataFrame "timeperiods", to compute the average of the voltage values in the DataFrame "values" between "start" and "end". How would you solve this problem? In a traditional programming environment I would loop over each time period, compute the sum of the voltage values between start and end, divide that sum by the number of values between start and end, and finally save the result in a table. How can I do this with Spark?
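
For reference, the sequential approach described above would look roughly like this in plain Python (values and timeperiods are hypothetical in-memory lists of tuples here; this clearly does not scale to 10^9 rows):

# Naive sequential baseline (hypothetical in-memory lists, not Spark):
# for every (start, end) period, average all voltages whose time falls inside it.
averages = []
for start, end in timeperiods:           # one pass per period (10^6 iterations)
    total, count = 0.0, 0
    for time, voltage in values:         # scans all 10^9 measurements each time
        if start <= time <= end:
            total += voltage
            count += 1
    averages.append((start, end, total / count if count else None))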


Solution

  • As posted in the comments, you can solve this by:

    • using a range join on your timestamps
    • grouping by one of the times you're using to "bucket" your calculations (I'm using start in this case)
    • aggregating the result of the groupBy with F.mean

    I've added some extra values in df1 to make sure some values exist in buckets other than the first one.

    from pyspark.sql import functions as F
    
    df1 = spark.createDataFrame(
        [(0.000000000, 4.1870174),
         (0.001000141, 4.199591),
         (0.002001438, 4.2184515),
         (0.003001813, 4.237312),
         (0.004002469, 4.256172),
         (5.1, 10.0),
         (5.11, 14.0),
         (6.1, 20.0),
         (6.11, 0.0)],
        ["time", "voltage"]
    )
    
    df2 = spark.createDataFrame(
        [(0.000000000, 1.1870174),
        (2.001000141, 4.199591),
        (5.002001438, 5.2184515),
        (6.003001813, 6.237312)],
        ["start", "end"]
    )
    
    # Non-equi ("range") join: keep every (period, measurement) pair where the
    # measurement's time falls inside the period's [start, end] interval,
    # then average the voltages per period.
    df2.join(df1, on=[df1.time >= df2.start, df1.time <= df2.end]) \
       .groupBy("start") \
       .agg(F.mean("voltage")) \
       .show()
    
    +-----------+------------------+                                                
    |      start|      avg(voltage)|
    +-----------+------------------+
    |        0.0|4.2197087799999995|
    |5.002001438|              12.0|
    |6.003001813|              10.0|
    +-----------+------------------+
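
    Since you ultimately want to persist the averages, a natural follow-up is to keep both period bounds in the result, add a sample count, and write the aggregate out, e.g. to Parquet. A minimal sketch along those lines (the output path and column aliases are just placeholders):

    result = (
        df2.join(df1, on=[df1.time >= df2.start, df1.time <= df2.end])
           .groupBy("start", "end")                         # keep both period bounds
           .agg(F.mean("voltage").alias("avg_voltage"),
                F.count("voltage").alias("n_samples"))      # measurements per period
    )

    # hypothetical output location -- adjust to your storage
    result.write.mode("overwrite").parquet("/tmp/period_averages.parquet")

    Note that a pure non-equi join like this can get expensive at 10^9 x 10^6 scale, since Spark cannot use a regular hash join; if performance becomes an issue, adding an equality key to the join (for example a coarse time bucket computed on both sides) alongside the range condition is a common way to cut down the number of row pairs Spark has to compare.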