Tags: python-3.x, aws-glue, aws-glue-spark

Glue: map/process a source table's column data and write it to columns in a pre-existing Redshift table


I am very new to Glue and came across a scenario where we have a source table in the Glue catalog and need to write its data to specific columns in a pre-existing table in Redshift, e.g.:

source_table_name[source_table_column_name]       target_table_name[target_table_column_name]

employee[id]                                      resource[resource_id]
employee[name]                                    resource[resource_name]
employee[policy]                                  resource[policy_name]
employee[zip]                                     resource[zipcode]
...                                               ...

Could you please share which Glue function can be used to write a UDF in Python that iterates through a given subset of column names from the source table and maps/writes that data to the specified column names in the target table (as in the example above) in Redshift?

For example: write the id column data from the employee source table to the resource_id column in the resource target table in Redshift, and so on.

I've written the following logic to load data into source_dynf:

    def load_data(self):
        self.logger.info(f"Loading data from Glue Catalog source [{self.source_database}/{self.source_table}]")
        source_dynf = self.glue_context.create_dynamic_frame.from_catalog(
            database=self.source_database,
            table_name=self.source_table,
            transformation_ctx=f"load_{self.source_database}_{self.source_table}"
        )
        return source_dynf

    def process_data(self, source_dynf):
        ### how can I map the data as described above and return a processed_dynf
        ### that I can then write to the Redshift target table?

    def write_data(self):
        ### write to the Redshift target table

thanks in advance for suggestions/help!


Solution

  • If you are just renaming all columns, the typical pattern is:

    # in your imports
    from awsglue.transforms import ApplyMapping

    # just after your from_catalog call
    source_dynf = ApplyMapping.apply(
        frame=source_dynf,
        mappings=[
            # the pattern is (source column name, source column type,
            #                 target column name, target column type)
            ("id", "string", "resource_id", "string"),
            ("name", "string", "resource_name", "string"),
            # and so on, following the same pattern
        ],
        transformation_ctx="mapping",
    )
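
    Note that ApplyMapping keeps only the fields you list in mappings; any source column you leave out is dropped from the output frame, which is convenient when the target table needs only a subset of the source columns.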
    
    

    If you plan to use PySpark DataFrames instead, the syntax is simpler and doesn't require you to spell out the types:

    # in your imports
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame

    spark_context = SparkContext.getOrCreate()
    glue_context = GlueContext(spark_context)

    frame = source_dynf.toDF()
    # the arguments to toDF are the new column names; make sure the number of
    # string arguments equals the number of columns in the frame
    frame = frame.toDF("resource_id", "resource_name")  # and so on
    source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
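
    If you only need to rename a subset of the columns and leave the rest untouched, withColumnRenamed is an alternative that doesn't depend on column order. A short sketch, using the column names from your example:

    frame = source_dynf.toDF()
    # rename only the columns whose names differ; unlisted columns keep their names
    for src, dst in [("id", "resource_id"), ("name", "resource_name"),
                     ("policy", "policy_name"), ("zip", "zipcode")]:
        frame = frame.withColumnRenamed(src, dst)
    source_dynf = DynamicFrame.fromDF(frame, glue_context, "renamed")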
    

    Per the discussion below, you would like to pull the schema from your target database and push it onto the source data. Something like this should do the trick:

    # get the schema for the target frame
    # see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html
    # note: you may want to read from a small partition for performance, see:
    # https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html
    my_conn_options = {
        "url": "jdbc:redshift://host:port/redshift database name",
        "dbtable": "redshift table name",
        "user": "username",
        "password": "password",
        "redshiftTmpDir": args["TempDir"],
        "aws_iam_role": "arn:aws:iam::account id:role/role name"
    }

    target_frame = glue_context.create_dynamic_frame_from_options("redshift", my_conn_options)
    frame = source_dynf.toDF()
    # rename positionally to the target's column names; the number of columns must match!
    frame = frame.toDF(*[field.name for field in target_frame.toDF().schema.fields])
    source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
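
    Once the columns line up, writing the frame back is the remaining step. A minimal sketch, reusing my_conn_options from above (from_jdbc_conf with a catalog connection would also work):

    # write the renamed frame into the pre-existing Redshift table
    glue_context.write_dynamic_frame.from_options(
        frame=source_dynf,
        connection_type="redshift",
        connection_options=my_conn_options
    )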