Tags: amazon-web-services, etl, upsert, aws-glue, staging-table

Getting duplicates in the table when an ETL job is run twice. The ETL job fetches data from RDS to an S3 bucket.


When the ETL job is run it executes properly, but because the table has no timestamp column, the same data is duplicated when the job is run again. How can I perform staging and solve this with an upsert, or with any other approach? The only solutions I have found so far are to include a timestamp or to use a staging table; is there any other way?


Solution

  • To prevent duplicates on S3 you need to load the data from the destination and filter out existing records before saving:

    import com.amazonaws.services.glue.DynamicFrame
    import com.amazonaws.services.glue.util.JsonOptions
    import org.apache.spark.sql.functions.col

    // Keep only incoming records whose id is not already present in the destination
    val deltaDf = newDataDf.alias("new")
      .join(existingDf.alias("existing"), col("new.id") === col("existing.id"), "left_outer")
      .where(col("existing.id").isNull)
      .select("new.*")

    glueContext.getSinkWithFormat(
        connectionType = "s3",
        options = JsonOptions(Map(
          "path" -> path
        )),
        transformationContext = "save_to_s3",
        format = "avro"
      ).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))
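
    The snippet assumes existingDf holds what was already written to the destination. A minimal sketch of reading it back, assuming the target S3 path already contains avro files (on the very first run there is nothing to read yet, so the filter step would be skipped); the transformation context name is just illustrative:

    // Read the data already stored at the destination so it can be
    // anti-joined against the incoming batch
    val existingDf = glueContext.getSourceWithFormat(
        connectionType = "s3",
        options = JsonOptions(Map("paths" -> Seq(path))),
        transformationContext = "read_existing_from_s3",
        format = "avro"
      ).getDynamicFrame().toDF()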
    

    However, this method doesn't overwrite updated records.

    Another option is to also save updated records with an updated_at field, which downstream consumers can use to pick the latest value for each record.
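
    A minimal sketch of that pattern, assuming records carry an id column; stampedDf is what the job writes, and allVersionsDf is a hypothetical DataFrame holding every appended version, read back from S3 by a consumer:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Writer side: stamp each record with the time it was written
    val stampedDf = newDataDf.withColumn("updated_at", current_timestamp())

    // Reader side: keep only the most recent version of each id
    val latestDf = allVersionsDf
      .withColumn("rn", row_number().over(
        Window.partitionBy("id").orderBy(col("updated_at").desc)))
      .where(col("rn") === 1)
      .drop("rn")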

    You can also consider dumping the dataset into a separate folder each time you run the job (i.e. every day you get a full dump of the data in data/dataset_date=<year-month-day>):

    import org.apache.spark.sql.functions._

    // Stamp every record with the run date; it becomes the partition folder on S3
    val datedDf = sourceDf.withColumn("dataset_date", current_date())

    glueContext.getSinkWithFormat(
        connectionType = "s3",
        options = JsonOptions(Map(
          "path" -> path,
          "partitionKeys" -> Array("dataset_date")
        )),
        transformationContext = "save_to_s3",
        format = "avro"
      ).writeDynamicFrame(DynamicFrame(datedDf, glueContext))
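
    With the dated layout, a downstream consumer can read everything back and keep only the newest dump. A minimal sketch, assuming the same path and avro format as above and that a SparkSession is available via glueContext.getSparkSession:

    import org.apache.spark.sql.functions._

    // Spark discovers dataset_date from the folder names; keep only the newest dump
    val allDumpsDf = glueContext.getSparkSession.read.format("avro").load(path)
    val latestDate = allDumpsDf.agg(max("dataset_date")).head.get(0)
    val latestDumpDf = allDumpsDf.where(col("dataset_date") === latestDate)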