Tags: azure, pyspark, spark-streaming, azure-databricks, databricks-autoloader

Autoloader filter duplicates


I have a streaming DataFrame and I'm wondering how I can eliminate duplicates and also keep only the row with the latest modifiedon value.

For example:

id modifiedon
1 03/08/2023
1 03/08/2023
2 02/08/2023
2 03/08/2023

Desired DataFrame:

id modifiedon
1 03/08/2023
2 03/08/2023

So, if there are 100% identical rows (like id 1), all but one must be dropped so that only one row per ID is kept. Also, if the same ID appears multiple times (i.e. it was updated multiple times), I need to keep the row with the maximum modifiedon value.
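To pin down the expected result, here is a minimal pure-Python sketch of the required logic applied to the sample rows, independent of Spark. It assumes modifiedon is a day-first date string (dd/MM/yyyy, the format the answer parses); the helper name `latest_per_id` is hypothetical. Parsing the dates matters: comparing the raw strings would order them day-first, so "31/07/2023" would wrongly beat "01/08/2023".

```python
from datetime import datetime

# Sample rows from the question: (id, modifiedon), assumed to be dd/MM/yyyy strings
rows = [
    (1, "03/08/2023"),
    (1, "03/08/2023"),
    (2, "02/08/2023"),
    (2, "03/08/2023"),
]

def latest_per_id(rows, fmt="%d/%m/%Y"):
    """Hypothetical helper: drop exact duplicates, then keep the latest modifiedon per id."""
    latest = {}
    for row_id, modified in set(rows):  # set() removes 100% identical rows
        ts = datetime.strptime(modified, fmt)  # compare real dates, not strings
        if row_id not in latest or ts > latest[row_id]:
            latest[row_id] = ts
    # Return one row per id, sorted by id, formatted back to the original string form
    return [(row_id, ts.strftime(fmt)) for row_id, ts in sorted(latest.items())]

print(latest_per_id(rows))
```

For the sample rows this prints `[(1, '03/08/2023'), (2, '03/08/2023')]`, which matches the desired DataFrame.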

df = (
    spark.readStream
    .option("delimiter", ",")
    .option("quote", '"')
    .option("mode", "permissive")
    .option("lineSep", "\r\n")
    .option("multiLine", "true")
    .format("cloudFiles")
    .option("cloudFiles.format", source_format)
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "false")
    .option("escape", '"')
    .schema(schema)
    .load(data_source)
)

Solution

  • You can use the approach below.

    Below is the data I used.

    (Image of the input data; not available.)

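    from pyspark.sql import functions as F
    # Parse dd/MM/yyyy strings into timestamps so max() compares dates, not strings
    df = df.withColumn("modifiedon", F.to_timestamp("modifiedon", "dd/MM/yyyy"))
    # Declare modifiedon as the event-time column with a 1-day watermark
    df = df.withWatermark("modifiedon", "1 day")
    # Drop exact duplicate rows, then keep the latest modifiedon per id
    df = df.dropDuplicates().groupBy("id").agg(F.max("modifiedon").alias("modifiedon"))
    display(df)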
    

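    Here, exact duplicate rows are dropped, modifiedon is converted to a timestamp and declared as the event-time column with a 1-day watermark, and the stream is grouped by id with max applied to modifiedon, so each id keeps only its latest modifiedon. Note that the watermark does not limit the aggregation state in this query, because the grouping key (id) does not include modifiedon or a window on it: the result is a running per-id maximum, not an aggregation over a 1-day period. For the same reason, the query has to run in update or complete output mode rather than append.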

    Output:

    (Image of the displayed output; not available.)
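Conceptually, a streaming groupBy("id") with max keeps one running value per id in its state and revises it as each micro-batch arrives. The pure-Python model below is a simplified illustration of that behavior, not Spark's implementation: it ignores watermarks, checkpointing and the real state store, and `run_microbatches` and the batch contents are hypothetical. Per trigger it emits the current value for every id seen in that batch, which roughly mirrors the update output mode; the final `state` corresponds to what complete mode would show.

```python
from datetime import date

def run_microbatches(batches):
    """Hypothetical model: running max(modifiedon) per id across micro-batches."""
    state = {}    # id -> latest modifiedon seen so far (stands in for the state store)
    outputs = []  # what each trigger would emit, update-mode style
    for batch in batches:
        touched = set()
        for row_id, modified in batch:
            if row_id not in state or modified > state[row_id]:
                state[row_id] = modified
            touched.add(row_id)
        # Emit the current aggregate for every id that appeared in this batch
        outputs.append({row_id: state[row_id] for row_id in sorted(touched)})
    return state, outputs

batches = [
    [(1, date(2023, 8, 3)), (1, date(2023, 8, 3)), (2, date(2023, 8, 2))],
    [(2, date(2023, 8, 3))],
    [(2, date(2023, 8, 1))],  # an older update: the running max for id 2 is unchanged
]
state, outputs = run_microbatches(batches)
print(state)
print(outputs)
```

Because only one value per id is retained, the exact duplicates for id 1 and the older update for id 2 never change the result; the trade-off is that this state grows with the number of distinct ids.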