Suppose I have a PySpark DataFrame called "values" containing the values of a voltage over time (a time series). The DataFrame contains 1 billion measurement points (10^9 rows) and is originally stored in a Parquet file. The schema of the DataFrame is:
values
|-- time: float (nullable = true)
|-- voltage: float (nullable = true)
values: 1.000.000.000 values
[time, voltage]
[0.000000000, 4.1870174]
[0.001000141, 4.199591]
[0.002001438, 4.2184515]
[0.003001813, 4.237312]
[0.004002469, 4.256172]
.....
[459586.004002469, 459586.256172]
Then I have another PySpark DataFrame called "timeperiods" containing the starts and ends of time periods, with 1.000.000 rows:
timeperiods
|-- start: float (nullable = true)
|-- end: float (nullable = true)
timeperiods: 1.000.000 values
[start, end]
[0.000000000, 1.1870174]
[2.001000141, 4.199591]
[5.002001438, 5.2184515]
[6.003001813, 6.237312]
.....
[459585.004002469, 459586.256172]
For each time period in the DataFrame "timeperiods", I would like to compute the average of the voltage values in the DataFrame "values" whose time lies between "start" and "end". How would you solve this problem? In a traditional programming environment I would go through each time period, calculate the sum of the voltage values between start and end, divide the sum by the number of values between start and end, and finally save the result in a table. How can I do this with Spark?
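The loop described above can be sketched in plain Python for a small in-memory sample, which is also handy for cross-checking Spark results. This is a minimal single-machine sketch, not a Spark solution: it assumes the values are a list of (time, voltage) tuples already sorted by time, and the function name `period_means` is hypothetical. Instead of rescanning all values for every period, it swaps in binary search (`bisect`) on the sorted times plus a prefix-sum array, so each period costs O(log n) after O(n) preprocessing.

```python
from bisect import bisect_left, bisect_right
from itertools import accumulate


def period_means(values, periods):
    """Mean voltage per (start, end) period, bounds inclusive (sketch).

    Assumes `values` is a list of (time, voltage) tuples sorted by time.
    Returns (start, end, mean) tuples; mean is None for empty periods.
    """
    times = [t for t, _ in values]
    # prefix[i] is the sum of the first i voltages, so any slice sum is O(1)
    prefix = [0.0] + list(accumulate(v for _, v in values))
    result = []
    for start, end in periods:
        lo = bisect_left(times, start)  # first index with time >= start
        hi = bisect_right(times, end)   # one past the last index with time <= end
        count = hi - lo
        mean = (prefix[hi] - prefix[lo]) / count if count else None
        result.append((start, end, mean))
    return result


# Same sample data as the Spark answer (df1 and df2)
values = [(0.000000000, 4.1870174), (0.001000141, 4.199591),
          (0.002001438, 4.2184515), (0.003001813, 4.237312),
          (0.004002469, 4.256172), (5.1, 10.0), (5.11, 14.0),
          (6.1, 20.0), (6.11, 0.0)]
periods = [(0.000000000, 1.1870174), (2.001000141, 4.199591),
           (5.002001438, 5.2184515), (6.003001813, 6.237312)]

for row in period_means(values, periods):
    print(row)
```

Unlike the inner join used in the Spark answer, this keeps empty periods and reports `None` for them, e.g. for [2.001000141, 4.199591], which contains no sample values.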
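As posted in the comments, you can solve this by:
1. joining the two DataFrames on the range condition start <= time <= end,
2. grouping by a column that identifies each time period (start in this case), and
3. aggregating the voltage of each group with F.mean.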
I've added some extra values to df1 (which stands in for "values", while df2 stands in for "timeperiods") to make sure that some values fall into buckets other than the first one.
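from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Reuse an existing session if there is one (e.g. in the pyspark shell)
spark = SparkSession.builder.getOrCreate()

# df1: sample (time, voltage) measurements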
df1 = spark.createDataFrame(
[(0.000000000, 4.1870174),
(0.001000141, 4.199591),
(0.002001438, 4.2184515),
(0.003001813, 4.237312),
(0.004002469, 4.256172),
(5.1, 10.0),
(5.11, 14.0),
(6.1, 20.0),
(6.11, 0.0)],
["time", "voltage"]
)
# df2: sample (start, end) time periods
df2 = spark.createDataFrame(
[(0.000000000, 1.1870174),
(2.001000141, 4.199591),
(5.002001438, 5.2184515),
(6.003001813, 6.237312)],
["start", "end"]
)
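# Inner range join: each value is paired with every period that contains it;
# periods with no values in range (here [2.001000141, 4.199591]) are dropped
df2.join(df1, on=[df1.time >= df2.start, df1.time <= df2.end]) \
    .groupBy("start") \
    .agg(F.mean("voltage")) \
    .orderBy("start") \
    .show()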
+-----------+------------------+
| start| avg(voltage)|
+-----------+------------------+
| 0.0|4.2197087799999995|
|5.002001438| 12.0|
|6.003001813| 10.0|
+-----------+------------------+
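At the scale in the question (10^9 values, 10^6 periods), a join whose condition contains only inequalities gives Spark no equality key to hash or sort on, so it typically falls back to a broadcast nested-loop join or a Cartesian product. A common workaround, sketched here in plain Python so it runs without a cluster, is to add an equality key by cutting the time axis into fixed-width buckets: each value gets the key floor(time / width), each period is copied into every bucket it overlaps, and the original range condition is applied after matching on the key. The function name and the bucket width `width` are hypothetical; a width close to the typical period length keeps the number of copies small. In PySpark the same steps would roughly map to `F.floor` for the key, `F.sequence` plus `F.explode` on the periods side, an equi-join on the bucket column followed by a `where` on the range, and a `groupBy` on a per-period id (for example from `F.monotonically_increasing_id()`), which also avoids merging two periods that happen to share the same start.

```python
import math
from collections import defaultdict


def bucketed_period_means(values, periods, width=1.0):
    """Mean voltage per period via a bucketed equi-join (sketch).

    `width` is a hypothetical tuning knob: the bucket size in time units.
    Each value lands in exactly one bucket, so no (value, period) pair
    is counted twice. Returns {period_index: mean}.
    """
    # Side 1 of the equi-join: index values by bucket key floor(time / width)
    by_bucket = defaultdict(list)
    for t, v in values:
        by_bucket[math.floor(t / width)].append((t, v))

    sums = defaultdict(float)
    counts = defaultdict(int)
    for pid, (start, end) in enumerate(periods):
        # Side 2: "explode" the period into every bucket it overlaps
        for b in range(math.floor(start / width), math.floor(end / width) + 1):
            for t, v in by_bucket.get(b, ()):
                # Exact range filter, applied after the equality match
                if start <= t <= end:
                    sums[pid] += v
                    counts[pid] += 1
    # Inner-join semantics: periods without any value are left out
    return {pid: sums[pid] / counts[pid] for pid in counts}


values = [(0.000000000, 4.1870174), (0.001000141, 4.199591),
          (0.002001438, 4.2184515), (0.003001813, 4.237312),
          (0.004002469, 4.256172), (5.1, 10.0), (5.11, 14.0),
          (6.1, 20.0), (6.11, 0.0)]
periods = [(0.000000000, 1.1870174), (2.001000141, 4.199591),
           (5.002001438, 5.2184515), (6.003001813, 6.237312)]

print(bucketed_period_means(values, periods, width=1.0))
```

Because floor is monotonic, any value with start <= time <= end has a bucket between floor(start / width) and floor(end / width), so the bucketing never loses a match; it only shrinks the set of candidate pairs that the range filter has to check.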