Tags: pandas, dataframe, apache-spark, pyspark, apache-spark-sql

How do I transform the dataset for the problem posted?


I'd like to ask for some help with a problem I am facing right now. Given the dataset:

df = spark.createDataFrame([
        ('2024-01-01', 1, 23),
        ('2024-01-02', 1, 43),
        ('2024-01-03', 1, -1),
        ('2024-01-08', 2, 266),
        ('2024-01-09', 2, -1),
        ('2024-01-10', 2, 13),
        ('2024-01-11', 2, 10),
        ('2024-01-04', 3, 66),
        ('2024-01-05', 3, -1),
        ('2024-01-06', 3, 13),
        ('2024-01-07', 3, 11),
    ],
    ["dates", "id", "mount"]
)

I would like to know which transformation I should apply to get, per id, the number of consecutive days the amount is above 0.

For instance, id 1 is above 0 for 2 consecutive days (the first two). Id 2 has one streak of 1 day and then one of 2 days (the last two). Finally, id 3 has a streak of 1 day followed by one of 2 days. The final dataframe should look something like this:

some_id  id  days
          1     2
          2     1
          2     2
          3     1
          3     2

I added a third column named some_id because I am pretty sure we somehow need to identify each streak.

Really appreciate any help.


Solution

  • A good approach would be the following:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    spark = SparkSession.builder \
        .appName("Consecutive Days") \
        .getOrCreate()
    
    df = spark.createDataFrame([
        ('2024-01-01', 1, 23),
        ('2024-01-02', 1, 43),
        ('2024-01-03', 1, -1),
        ('2024-01-08', 2, 266),
        ('2024-01-09', 2, -1),
        ('2024-01-10', 2, 13),
        ('2024-01-11', 2, 10),
        ('2024-01-04', 3, 66),
        ('2024-01-05', 3, -1),
        ('2024-01-06', 3, 13),
        ('2024-01-07', 3, 11),
    ], ["dates", "id", "mount"])
    
    df = df.withColumn("dates", F.to_date("dates"))
    
    # order each id's rows chronologically
    windowSpec = Window.partitionBy("id").orderBy("dates")
    
    # flag positive days and mark the rows where the flag flips
    df = df.withColumn("isPositive", F.when(F.col("mount") > 0, 1).otherwise(0))
    df = df.withColumn("prevValue", F.lag("isPositive").over(windowSpec))
    df = df.withColumn("change", F.when(F.col("isPositive") != F.col("prevValue"), 1).otherwise(0))
    
    # a running sum of the flips labels each consecutive run with its own group id
    df = df.withColumn("group", F.sum("change").over(windowSpec))
    
    df_positive = df.filter(F.col("mount") > 0)
    
    df_result = df_positive.groupBy("id", "group").agg(F.count("dates").alias("days"))
    
    # number the streaks per id chronologically (the group label increases with the dates)
    windowSpec2 = Window.partitionBy("id").orderBy("group")
    df_final = df_result.withColumn("some_id", F.row_number().over(windowSpec2)).select("id", "days", "some_id")
    
    df_final.show()
    

    Which gives you what you wanted:

    +---+----+-------+
    | id|days|some_id|
    +---+----+-------+
    |  1|   2|      1|
    |  2|   1|      1|
    |  2|   2|      2|
    |  3|   1|      1|
    |  3|   2|      2|
    +---+----+-------+
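    If you want to sanity-check the streak logic without a Spark session, the same grouping can be sketched in plain Python with itertools.groupby (a minimal cross-check of the expected counts, not part of the Spark solution itself):

```python
from itertools import groupby

data = [
    ('2024-01-01', 1, 23), ('2024-01-02', 1, 43), ('2024-01-03', 1, -1),
    ('2024-01-08', 2, 266), ('2024-01-09', 2, -1), ('2024-01-10', 2, 13),
    ('2024-01-11', 2, 10), ('2024-01-04', 3, 66), ('2024-01-05', 3, -1),
    ('2024-01-06', 3, 13), ('2024-01-07', 3, 11),
]

rows = []
# sort by (id, date) so each id's days are consecutive, then group by id
for id_, recs in groupby(sorted(data, key=lambda r: (r[1], r[0])), key=lambda r: r[1]):
    some_id = 0
    # within one id, split the rows into runs of equal sign
    for positive, run in groupby(recs, key=lambda r: r[2] > 0):
        length = sum(1 for _ in run)
        if positive:
            some_id += 1
            rows.append((id_, length, some_id))

print(rows)  # [(1, 2, 1), (2, 1, 1), (2, 2, 2), (3, 1, 1), (3, 2, 2)]
```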

    Since you posted this as a pandas problem as well, here is how you would do this with pandas:

    import pandas as pd
    
    data = [
        ('2024-01-01', 1, 23),
        ('2024-01-02', 1, 43),
        ('2024-01-03', 1, -1),
        ('2024-01-08', 2, 266),
        ('2024-01-09', 2, -1),
        ('2024-01-10', 2, 13),
        ('2024-01-11', 2, 10),
        ('2024-01-04', 3, 66),
        ('2024-01-05', 3, -1),
        ('2024-01-06', 3, 13),
        ('2024-01-07', 3, 11),
    ]
    
    df = pd.DataFrame(data, columns=["dates", "id", "mount"])
    df['dates'] = pd.to_datetime(df['dates'])
    
    df.sort_values(by=['id', 'dates'], inplace=True)
    
    df['positive'] = df['mount'] > 0
    df['group'] = (df['id'] != df['id'].shift(1)) | (df['positive'] != df['positive'].shift(1))
    df['group'] = df['group'].cumsum()
    
    df_final = df[df['positive']].groupby(['id', 'group']).size().reset_index(name='days')
    df_final.drop(columns=['group'], inplace=True)
    df_final['some_id'] = df_final.groupby('id').cumcount() + 1
    
    print(df_final)
    
    

    Which gives the same result:

       id  days  some_id
    0   1     2        1
    1   2     1        1
    2   2     2        2
    3   3     1        1
    4   3     2        2
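
    For reference, the pandas version can also be condensed into a few chained expressions. It is the same idea as above (a new group starts when the id changes or the sign flips); `streak` is just an illustrative name for the run label:

```python
import pandas as pd

data = [
    ('2024-01-01', 1, 23), ('2024-01-02', 1, 43), ('2024-01-03', 1, -1),
    ('2024-01-08', 2, 266), ('2024-01-09', 2, -1), ('2024-01-10', 2, 13),
    ('2024-01-11', 2, 10), ('2024-01-04', 3, 66), ('2024-01-05', 3, -1),
    ('2024-01-06', 3, 13), ('2024-01-07', 3, 11),
]
df = pd.DataFrame(data, columns=["dates", "id", "mount"]).sort_values(["id", "dates"])

pos = df["mount"] > 0
# a new streak starts whenever the id changes or the sign of mount flips
streak = ((df["id"] != df["id"].shift()) | (pos != pos.shift())).cumsum().rename("streak")

out = (
    df[pos]
    .groupby(["id", streak[pos]])
    .size()
    .reset_index(name="days")
    .drop(columns="streak")
)
out["some_id"] = out.groupby("id").cumcount() + 1
print(out)
```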