Code:
# Rename the columns after pivoting: spaces in column names break dotted
# column references, so replace them with underscores.
new_column_names = ["Year"] + [col_name.replace(" ", "_") for col_name in pivot_table.columns[1:]]
pivot_table = pivot_table.toDF(*new_column_names)

# Task 4: Calculate year-over-year percentage change for every value column.
# FIX: drop the unwanted column BEFORE collecting percentage_cols; the
# original collected the list first, so the loop below would reference a
# column that no longer exists and raise an AnalysisException.
pivot_table = pivot_table.drop("Other_purchases_and_operating_expenses")
percentage_cols = pivot_table.columns[1:]  # Exclude the "Year" column
print(pivot_table.columns)

# NOTE(review): Window.orderBy without partitionBy collapses all rows into a
# single partition -- fine for small yearly data, confirm for large inputs.
window_spec = Window.orderBy("Year")

# Calculate percentage change using a loop: change = (curr - prev) / prev * 100
for col_name in percentage_cols:
    pivot_table = pivot_table.withColumn(f"{col_name}_lag", lag(col(col_name)).over(window_spec))
    pivot_table = pivot_table.withColumn(
        f"{col_name}_change",
        (col(col_name) - col(f"{col_name}_lag")) / col(f"{col_name}_lag") * 100,
    )
    pivot_table = pivot_table.drop(f"{col_name}_lag")
    pivot_table = pivot_table.drop(f"{col_name}")
The output of this code is a DataFrame with a Year column plus one column per variable name. I want the maximum percentage change of each column, and the year in which it occurred, as a DataFrame.
I created a sample DataFrame matching the requirement described in your question:
from pyspark.sql import *
from pyspark.sql.window import *
from pyspark.sql.functions import *

# Initialize Spark session
spark = SparkSession.builder.appName("PercentageChangeCalculation").getOrCreate()

# Sample data: one row per year with three numeric measures.
data = [
    ("2020", 100, 120, 80),
    ("2021", 130, 150, 110),
    ("2022", 260, 180, 140),
]
columns = ["Year", "Sales", "Expenses", "Profit"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Pivot the DataFrame (assuming you've already pivoted the data)
pivot_table = df

# Rename columns: replace spaces with underscores so the names are safe to
# use in column expressions.
new_column_names = ["Year"] + [col_name.replace(" ", "_") for col_name in pivot_table.columns[1:]]
pivot_table = pivot_table.toDF(*new_column_names)

# Task 4: Calculate percentage change.
# FIX: drop the unwanted column BEFORE building percentage_cols so the loop
# never references a column that has already been removed. (DataFrame.drop
# silently ignores missing columns, so this is safe either way.)
pivot_table = pivot_table.drop("Other_purchases_and_operating_expenses")
percentage_cols = pivot_table.columns[1:]  # Exclude the "Year" column
window_spec = Window.orderBy("Year")

# Calculate percentage change using a loop; the first year has no previous
# row, so lag() yields null and its _change value is null.
for col_name in percentage_cols:
    pivot_table = pivot_table.withColumn(f"{col_name}_lag", lag(col(col_name)).over(window_spec))
    pivot_table = pivot_table.withColumn(
        f"{col_name}_change",
        (col(col_name) - col(f"{col_name}_lag")) / col(f"{col_name}_lag") * 100,
    )
    pivot_table = pivot_table.drop(f"{col_name}_lag")
    pivot_table = pivot_table.drop(f"{col_name}")

pivot_table.show()
OUTPUT:-
+----+------------+---------------+-----------------+
|Year|Sales_change|Expenses_change| Profit_change|
+----+------------+---------------+-----------------+
|2020| null| null| null|
|2021| 30.0| 25.0| 37.5|
|2022| 100.0| 20.0|27.27272727272727|
+----+------------+---------------+-----------------+
To get the maximum percentage change of each column, and the year in which it occurred, as a DataFrame, use this:
# Build one summary row per *_change column: the column's maximum value and
# the Year in which that maximum occurred. Nulls (the first year) sort last
# under desc(), so they never win.
summary_rows = []
for change_col in pivot_table.columns[1:]:
    # Sort descending so the first row holds the largest change.
    top = pivot_table.select(change_col, "Year").orderBy(col(change_col).desc()).first()
    summary_rows.append(
        Row(column_name=change_col, max_value=top[change_col], occored_in=top["Year"])
    )

result_df = spark.createDataFrame(summary_rows)
result_df.show()
Output
+---------------+---------+----------+
| column_name|max_value|occored_in|
+---------------+---------+----------+
| Sales_change| 100.0| 2022|
|Expenses_change| 25.0| 2021|
| Profit_change| 37.5| 2021|
+---------------+---------+----------+