I'd like to ask for some help with a problem I'm facing right now. Given the dataset:
df = spark.createDataFrame([
('2024-01-01', 1, 23),
('2024-01-02', 1, 43),
('2024-01-03', 1, -1),
('2024-01-08', 2, 266),
('2024-01-09', 2, -1),
('2024-01-10', 2, 13),
('2024-01-11', 2, 10),
('2024-01-04', 3, 66),
('2024-01-05', 3, -1),
('2024-01-06', 3, 13),
('2024-01-07', 3, 11),
],
["dates", "id", "mount"]
)
I would like to know which transformation I should apply to get, per id, the number of consecutive days the value is above 0.
For instance, id 1 has 2 consecutive days above 0 (the first two days). Id 2 has 1 day above 0 and then 2 days (the last two). Finally, id 3 has 1 day and then the last 2 days above 0. The final dataframe should look something like this:
id, days
1, 2
2, 1
2, 2
3, 1
3, 2
I also mentioned a third column named some_id because I am pretty sure we need some way to identify each run.
Really appreciate any help.
A good approach is the classic gaps-and-islands pattern: flag positive rows, detect where the flag changes with lag, and take a running sum of the change points so that every run gets its own group id:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName("Consecutive Days") \
.getOrCreate()
df = spark.createDataFrame([
('2024-01-01', 1, 23),
('2024-01-02', 1, 43),
('2024-01-03', 1, -1),
('2024-01-08', 2, 266),
('2024-01-09', 2, -1),
('2024-01-10', 2, 13),
('2024-01-11', 2, 10),
('2024-01-04', 3, 66),
('2024-01-05', 3, -1),
('2024-01-06', 3, 13),
('2024-01-07', 3, 11),
], ["dates", "id", "mount"])
df = df.withColumn("dates", F.to_date("dates"))
windowSpec = Window.partitionBy("id").orderBy("dates")
df = df.withColumn("isPositive", F.when(F.col("mount") > 0, 1).otherwise(0))
df = df.withColumn("prevValue", F.lag("isPositive").over(windowSpec))
df = df.withColumn("change", F.when(F.col("isPositive") != F.col("prevValue"), 1).otherwise(0))
df = df.withColumn("group", F.sum("change").over(windowSpec))
df_positive = df.filter(F.col("mount") > 0)
df_result = df_positive.groupBy("id", "group").agg(F.count("dates").alias("days"))
windowSpec2 = Window.partitionBy("id").orderBy("days")
df_final = df_result.withColumn("some_id", F.row_number().over(windowSpec2)).select("id", "days","some_id")
df_final.show()
Which gives you what you wanted:
+---+----+-------+
| id|days|some_id|
+---+----+-------+
|  1|   2|      1|
|  2|   1|      1|
|  2|   2|      2|
|  3|   1|      1|
|  3|   2|      2|
+---+----+-------+
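As an aside: because the dates in this sample are strictly consecutive per id, the runs can also be labelled with the date-minus-row-number trick, where subtracting a per-id row number from the date yields a key that is constant within each run of consecutive positive days. A minimal sketch, assuming Spark 3.x (where date_sub accepts a column) and starting again from the raw df with dates already cast; the names positives, run_key and runs_final are just illustrative:
positives = df.filter(F.col("mount") > 0)
w = Window.partitionBy("id").orderBy("dates")
runs = (positives
    .withColumn("rn", F.row_number().over(w))
    # date minus row number: constant within a run of consecutive positive days
    .withColumn("run_key", F.date_sub("dates", F.col("rn")))
    .groupBy("id", "run_key")
    .agg(F.count("*").alias("days"), F.min("dates").alias("run_start")))
runs_final = runs.withColumn(
    "some_id", F.row_number().over(Window.partitionBy("id").orderBy("run_start"))
).select("id", "days", "some_id")
runs_final.orderBy("id", "some_id").show()
The lag-based version above is more general, though: it only relies on row order within each id, not on the dates being gap-free.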
Since you posted this as a pandas problem as well, here is how you would do this with pandas:
import pandas as pd
data = [
('2024-01-01', 1, 23),
('2024-01-02', 1, 43),
('2024-01-03', 1, -1),
('2024-01-08', 2, 266),
('2024-01-09', 2, -1),
('2024-01-10', 2, 13),
('2024-01-11', 2, 10),
('2024-01-04', 3, 66),
('2024-01-05', 3, -1),
('2024-01-06', 3, 13),
('2024-01-07', 3, 11),
]
df = pd.DataFrame(data, columns=["dates", "id", "mount"])
df['dates'] = pd.to_datetime(df['dates'])
df.sort_values(by=['id', 'dates'], inplace=True)
# Flag positive rows
df['positive'] = df['mount'] > 0
# A new run starts whenever the id changes or the positive flag flips
df['group'] = (df['id'] != df['id'].shift(1)) | (df['positive'] != df['positive'].shift(1))
df['group'] = df['group'].cumsum()
# Keep only the positive runs and count their length
df_final = df[df['positive']].groupby(['id', 'group']).size().reset_index(name='days')
df_final.drop(columns=['group'], inplace=True)
# Number the runs chronologically per id
df_final['some_id'] = df_final.groupby('id').cumcount() + 1
print(df_final)
Which prints:
   id  days  some_id
0   1     2        1
1   2     1        1
2   2     2        2
3   3     1        1
4   3     2        2
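The same date-minus-counter idea also gives a more compact pandas variant with no helper columns. A minimal sketch, under the same assumption of one row per consecutive day and df already sorted by id and dates as above (runs, run_key and out are illustrative names):
runs = df[df['mount'] > 0].copy()
# cumcount() is a per-id row number among the positive rows; subtracting it
# as days yields a key that is constant within each run of consecutive days
runs['run_key'] = runs['dates'] - pd.to_timedelta(runs.groupby('id').cumcount(), unit='D')
out = runs.groupby(['id', 'run_key']).size().reset_index(name='days')
out['some_id'] = out.groupby('id').cumcount() + 1
print(out[['id', 'days', 'some_id']])
Like the Spark sketch, this only works when the dates are gap-free per id; the shift/cumsum version is the safer general-purpose choice.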