Tags: dataframe, pyspark

Maximum value of each column and the year it occurred in PySpark


Code:

from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

# Rename the columns after pivoting
new_column_names = ["Year"] + [col_name.replace(" ", "_") for col_name in pivot_table.columns[1:]]
pivot_table = pivot_table.toDF(*new_column_names)

# Task 4: Calculate percentage change
# Drop unwanted columns before collecting percentage_cols, otherwise the
# loop below would reference a column that no longer exists
pivot_table = pivot_table.drop("Other_purchases_and_operating_expenses")

percentage_cols = pivot_table.columns[1:]  # Exclude the "Year" column
window_spec = Window.orderBy("Year")
print(pivot_table.columns)

# Calculate percentage change using a loop
for col_name in percentage_cols:
    pivot_table = pivot_table.withColumn(f"{col_name}_lag", lag(col(col_name)).over(window_spec))
    pivot_table = pivot_table.withColumn(f"{col_name}_change", (col(col_name) - col(f"{col_name}_lag")) / col(f"{col_name}_lag") * 100)
    pivot_table = pivot_table.drop(f"{col_name}_lag")
    pivot_table = pivot_table.drop(col_name)

The output of this code is a DataFrame with a Year column plus one <variable>_change column per variable.

I want the maximum percentage change of each column, and the year in which it occurred, as a DataFrame.


Solution

  • Created a sample DataFrame matching the structure described in your question:

    from pyspark.sql import SparkSession, Row
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, lag
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("PercentageChangeCalculation").getOrCreate()
    
    # Sample data
    data = [
        ("2020", 100, 120, 80),
        ("2021", 130, 150, 110),
        ("2022", 260, 180, 140)
    ]
    
    columns = ["Year", "Sales", "Expenses", "Profit"]
    
    # Create DataFrame
    df = spark.createDataFrame(data, columns)
    
    # The sample data stands in for your already-pivoted DataFrame
    pivot_table = df
    
    # Rename columns
    new_column_names = ["Year"] + [col_name.replace(" ", "_") for col_name in pivot_table.columns[1:]]
    pivot_table = pivot_table.toDF(*new_column_names)
    
    # Task 4: Calculate percentage change
    # Drop unwanted columns first (a no-op on this sample data), so that
    # percentage_cols never includes a column the loop can't find
    pivot_table = pivot_table.drop("Other_purchases_and_operating_expenses")

    percentage_cols = pivot_table.columns[1:]  # Exclude the "Year" column
    window_spec = Window.orderBy("Year")
    
    # Calculate percentage change using a loop
    for col_name in percentage_cols:
        pivot_table = pivot_table.withColumn(f"{col_name}_lag", lag(col(col_name)).over(window_spec))
        pivot_table = pivot_table.withColumn(f"{col_name}_change", (col(col_name) - col(f"{col_name}_lag")) / col(f"{col_name}_lag") * 100)
        pivot_table = pivot_table.drop(f"{col_name}_lag")
        pivot_table = pivot_table.drop(col_name)
    
    pivot_table.show()

    Output:

    +----+------------+---------------+-----------------+
    |Year|Sales_change|Expenses_change|    Profit_change|
    +----+------------+---------------+-----------------+
    |2020|        null|           null|             null|
    |2021|        30.0|           25.0|             37.5|
    |2022|       100.0|           20.0|27.27272727272727|
    +----+------------+---------------+-----------------+
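
    As a side note, the chain of withColumn calls can be collapsed into a single select, which keeps the query plan shorter. A minimal sketch, assuming percentage_cols and window_spec are defined as above:

    # Build every change column in one select instead of a withColumn loop
    change_cols = [
        ((col(c) - lag(col(c)).over(window_spec))
         / lag(col(c)).over(window_spec) * 100).alias(f"{c}_change")
        for c in percentage_cols
    ]
    pivot_table = pivot_table.select("Year", *change_cols)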
    

    To get the maximum percentage change of each column, and the year in which it occurred, use this:

    percentage_cols = pivot_table.columns[1:]
    result_data = []

    for column_name in percentage_cols:
        # Sort the change column descending; the first row is the maximum
        # (desc ordering places nulls last, so the first year's null is skipped)
        max_value_row = pivot_table.select(column_name, "Year").orderBy(col(column_name).desc()).first()
        max_value = max_value_row[column_name]
        max_year = max_value_row["Year"]
        result_data.append(Row(column_name=column_name, max_value=max_value, occurred_in=max_year))

    result_df = spark.createDataFrame(result_data)
    result_df.show()
    

    Output

    +---------------+---------+-----------+
    |    column_name|max_value|occurred_in|
    +---------------+---------+-----------+
    |   Sales_change|    100.0|       2022|
    |Expenses_change|     25.0|       2021|
    |  Profit_change|     37.5|       2021|
    +---------------+---------+-----------+
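
    If you want to avoid running one orderBy job per change column, the same result can be computed in a single aggregation pass. This is a sketch of the max(struct(...)) "argmax" idiom, not part of the original answer: structs compare field by field, so the struct holding the largest change wins and carries its Year along (null changes, like the first year's, sort below any non-null value):

    from pyspark.sql import functions as F

    # One aggregation job instead of one sort per change column
    agg_exprs = [F.max(F.struct(F.col(c), F.col("Year"))).alias(c)
                 for c in percentage_cols]
    maxima = pivot_table.agg(*agg_exprs).first()

    # Unpack each (max_value, Year) struct into a result row
    result_df = spark.createDataFrame(
        [Row(column_name=c, max_value=maxima[c][0], occurred_in=maxima[c][1])
         for c in percentage_cols]
    )
    result_df.show()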