Tags: amazon-redshift, aws-glue

Glue creates duplicate records, how can we fix it?


Currently, we use Glue (Python scripts) to migrate data from a MySQL database into a Redshift database. Yesterday we found an issue: some records are duplicates; they share the same primary key that is used in the MySQL database. According to our requirements, all data in the Redshift database should be identical to the data in MySQL.

I tried to remove the Redshift table before the migration, but didn't find a method for that...

Could you help me fix this issue?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table from the Glue Data Catalog (crawled from MySQL)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "glue-db", table_name = "table", transformation_ctx = "datasource0")
# Map source columns to target columns (mappings elided)
applymapping0_1 = ApplyMapping.apply(frame = datasource0, mappings = [...], transformation_ctx = "applymapping0_1")
# Split ambiguous (choice) column types into separate columns
resolvechoice0_2 = ResolveChoice.apply(frame = applymapping0_1, choice = "make_cols", transformation_ctx = "resolvechoice0_2")
# Drop fields whose values are null across the dataset
dropnullfields0_3 = DropNullFields.apply(frame = resolvechoice0_2, transformation_ctx = "dropnullfields0_3")
# Write the result to Redshift over the catalog JDBC connection
datasink0_4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields0_3, catalog_connection = "redshift-cluster", connection_options = {"dbtable": "table", "database": "database"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink0_4")

My solution is to add a preactions statement that empties the target table before the load:

datasink0_4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields0_3, catalog_connection = "redshift-cluster", connection_options = {"dbtable": "mytable", "database": "mydatabase", "preactions": "delete from public.mytable;"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink0_4")
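For a full reload like this, TRUNCATE is usually a better fit than DELETE in Redshift: it empties the table without leaving deleted rows behind for a later VACUUM to reclaim (note that TRUNCATE commits immediately and cannot be rolled back). A minimal sketch of the same sink with a TRUNCATE preaction, assuming the same mytable/mydatabase names as above:

datasink0_4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = dropnullfields0_3,
    catalog_connection = "redshift-cluster",
    connection_options = {
        "dbtable": "mytable",
        "database": "mydatabase",
        # Empty the target table before the load instead of DELETE
        "preactions": "truncate table public.mytable;",
    },
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink0_4",
)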

Solution

  • If your goal is to avoid duplicates in the destination table, you can use the postactions option of the JDBC sink (see this answer for more details). Basically, it allows you to implement a Redshift merge using a staging table.

    For your case it should look like this (it replaces existing records):

    post_actions = (
        "DELETE FROM dest_table USING staging_table AS S WHERE dest_table.id = S.id;"
        "INSERT INTO dest_table (id,name) SELECT id,name FROM staging_table;"
        "DROP TABLE IF EXISTS staging_table;"
    )
    datasink0_4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields0_3, catalog_connection = "redshift-cluster", connection_options = {"dbtable": "staging_table", "database": "database", "overwrite": "true", "postactions": post_actions}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink0_4")
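    Redshift does not enforce primary key constraints, so this delete-then-insert merge through a staging table is the standard upsert pattern. If you prefer to create the staging table explicitly rather than relying on the overwrite option, a sketch combining preactions and postactions could look like this (it assumes the destination table is dest_table with columns id and name, as in the snippet above, and reuses dropnullfields0_3 and post_actions from earlier):

    pre_actions = (
        "DROP TABLE IF EXISTS staging_table;"
        "CREATE TABLE staging_table (LIKE dest_table);"
    )
    datasink0_4 = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = dropnullfields0_3,
        catalog_connection = "redshift-cluster",
        connection_options = {
            "dbtable": "staging_table",
            "database": "database",
            "preactions": pre_actions,      # create an empty copy of the destination table
            "postactions": post_actions,    # merge into dest_table, then drop the staging table
        },
        redshift_tmp_dir = args["TempDir"],
        transformation_ctx = "datasink0_4",
    )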