I have two files, file1.csv and file2.csv. I have loaded the file1 data into a dataframe. When file2.csv arrives, I need to write code so that, for every row of file2 that matches a row of file1 on the year and month columns, the old row is deleted from the file1 dataframe (it is stale data) and the new file2 row is inserted in its place; file2 rows with no match should simply be appended.
File1.csv
year month Amount
2022 Aug 12
2022 Oct 10
2021 Jan 20
2020 March 30
File2.csv
year month Amount
2022 Jan 220
2022 Feb 130
2022 Oct 100
Final output
year month Amount
2022 Aug 12
2022 Oct 100
2021 Jan 20
2020 March 30
2022 Feb 130
2022 Jan 220
I have been trying an "if exists" style condition in PySpark, but it is not working.
Here are my 2 cents:
Create two dataframes from the two CSV files (in my case I'm just creating them from static data):
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window
data1 = [
(2022, 'Aug', 12),
(2022, 'Oct', 10),
(2021, 'Jan', 20),
(2020, 'March', 30)]
data2 = [
(2022, 'Jan', 220),
(2022, 'Feb', 130),
(2022, 'Oct', 100)]
df_main = spark.createDataFrame(data1, schema=['year', 'month', 'Amount'])
df_incremental = spark.createDataFrame(data2, schema=['year', 'month', 'Amount'])
Then union the two dataframes, tagging each row with a source priority so the incremental rows sort first, apply row_number() over a window partitioned by year and month and ordered by that priority, keep only the rows whose row_number is 1, and drop the helper columns. Note that ordering the window by year and month would be ordering by the partition keys themselves, which are constant within each partition, so which duplicate survives would be arbitrary; the priority column makes the incremental row win deterministically.

df_merge = df_incremental.withColumn('_priority', lit(1)) \
    .unionAll(df_main.withColumn('_priority', lit(2)))
windowSpec = Window.partitionBy('year', 'month').orderBy('_priority')
df_merge = df_merge.withColumn('_row_number', row_number().over(windowSpec))
df_merge = df_merge.where(df_merge._row_number == 1).drop('_row_number', '_priority')
df_merge.show()