Tags: python, pandas, pyspark, apache-spark-sql, pyspark-transformer

How to find the min and max timestamps when a value goes below a minimum threshold in PySpark?


I have a table like the one below:

time_is_seconds    value
1                  4.5
2                  4
3                  3
4                  5
5                  6
6                  7
7                  6
8                  5
9                  4.5
10                 4.2
11                 3
12                 3.5
I want to find the min time and max time for each stretch where the value goes below 5.

Expected output:

time_is_seconds    value    min_time    max_time
1                  4.5      1           3
2                  4        1           3
3                  3        1           3
4                  5        Null        Null
5                  6        Null        Null
6                  7        Null        Null
7                  6        Null        Null
8                  5        Null        Null
9                  4.5      9           12
10                 4.2      9           12
11                 3        9           12
12                 3.5      9           12

So far I have filtered out the values below 5 and found the overall min and max, which gave me 1 and 12 respectively. I am wondering if there is any way to group them to get the expected results.

Code used:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy()  # one global window: min/max over all filtered rows
df1 = df.filter(F.col('value') < 5)
df1 = (df1.withColumn('min_time', F.min('time_is_seconds').over(w))
          .withColumn('max_time', F.max('time_is_seconds').over(w)))
df = df.join(df1, ['time_is_seconds', 'value'], 'left')

Solution

  • In Pandas you can do this:

    import pandas as pd

    data = {
        "time_is_seconds": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
        "value": [4.5, 4, 3, 5, 6, 7, 6, 5, 4.5, 4.2, 3, 3.5],
    }

    df = pd.DataFrame(data)

    # mask of rows below the threshold
    m = df["value"].lt(5)
    # consecutive-run id: increments each time the mask flips
    # (here: 1,1,1,2,2,2,2,2,3,3,3,3)
    g = m.ne(m.shift(1)).cumsum()

    # per-run min/max, written back only where the mask is True
    df.loc[m, "min_time"] = df.groupby(g)["time_is_seconds"].transform("min")
    df.loc[m, "max_time"] = df.groupby(g)["time_is_seconds"].transform("max")
    
        time_is_seconds  value  min_time  max_time
    0                 1    4.5       1.0       3.0
    1                 2    4.0       1.0       3.0
    2                 3    3.0       1.0       3.0
    3                 4    5.0       NaN       NaN
    4                 5    6.0       NaN       NaN
    5                 6    7.0       NaN       NaN
    6                 7    6.0       NaN       NaN
    7                 8    5.0       NaN       NaN
    8                 9    4.5       9.0      12.0
    9                10    4.2       9.0      12.0
    10               11    3.0       9.0      12.0
    11               12    3.5       9.0      12.0
    

    I don't have experience with PySpark, but maybe this helps you if you follow the same logic.
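
    Something along these lines might work in PySpark, building the same run ids with lag and a running sum over an ordering window (a rough, untested sketch; column names follow the question's table, variable names here are just illustrative):

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame(
        [(1, 4.5), (2, 4.0), (3, 3.0), (4, 5.0), (5, 6.0), (6, 7.0),
         (7, 6.0), (8, 5.0), (9, 4.5), (10, 4.2), (11, 3.0), (12, 3.5)],
        ["time_is_seconds", "value"],
    )

    order_w = Window.orderBy("time_is_seconds")

    # flag rows below the threshold, then give each consecutive run its own id:
    # the running sum increments whenever the flag differs from the previous row
    df = df.withColumn("below", F.col("value") < 5)
    df = df.withColumn(
        "run_id",
        F.sum(
            (F.col("below") != F.lag("below", 1, False).over(order_w)).cast("int")
        ).over(order_w),
    )

    # min/max time per run, kept only for rows that are actually below the threshold
    run_w = Window.partitionBy("run_id")
    df = (
        df.withColumn("min_time", F.when(F.col("below"), F.min("time_is_seconds").over(run_w)))
          .withColumn("max_time", F.when(F.col("below"), F.max("time_is_seconds").over(run_w)))
          .drop("below", "run_id")
    )

    df.orderBy("time_is_seconds").show()

    Note that Window.orderBy without partitionBy pulls all rows into a single partition, which is fine for small data like this but worth keeping in mind on larger tables.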