I am very new to Glue and have come across a scenario where we have a source table in the Glue catalog and need to write its data into specific columns of a pre-existing table in Redshift, e.g.:
source_table_name[source_table_column_name]    target_table_name[target_table_column_name]
employee[id]                                   resource[resource_id]
employee[name]                                 resource[resource_name]
employee[policy]                               resource[policy_name]
employee[zip]                                  resource[zipcode]
...                                            ...
Could you please share which Glue function can be used to write a UDF in Python that iterates through a given subset of column names from the source table and maps/writes their data to the specified column names in the target table in Redshift (as in the example above)?
For example: write the id column data from the employee source table to the resource_id column of the resource target table in Redshift, and so on.
I've written the following logic to load data into source_dynf:
def load_data(self):
    self.logger.info(f"Loading data from Glue Catalog source [{self.source_database}/{self.source_table}]")
    source_dynf = self.glue_context.create_dynamic_frame.from_catalog(
        database=self.source_database,
        table_name=self.source_table,
        transformation_ctx=f"load_{self.source_database}_{self.source_table}"
    )
    return source_dynf

def process_data(self, source_dynf):
    # how can I map the data as described above and return a processed_dynf
    # that I can then write to the Redshift target table?
    pass

def write_data(self):
    # write to the Redshift target table
    pass
Thanks in advance for any suggestions/help!
If you are just renaming all columns, the typical pattern is:
# in your imports
from awsglue.transforms import ApplyMapping

# just after your from_catalog call
source_dynf = ApplyMapping.apply(frame=source_dynf, mappings=[
    # the pattern is: source column name, source column type, target column name, target column type
    ("id", "string", "resource_id", "string"),
    ("name", "string", "resource_name", "string")
    # and so on, following the same pattern
], transformation_ctx="mapping")
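Since your example only maps a given subset of the source columns, you can also build the mappings list by iterating over a source-to-target dictionary instead of writing each tuple by hand. A minimal sketch; the column_map dictionary and the "string" types are assumptions taken from your example, so adjust them to your real schema:

# hypothetical source -> target mapping based on the example in the question;
# the types are assumed to be strings here, adjust to the actual column types
column_map = {
    "id": ("string", "resource_id"),
    "name": ("string", "resource_name"),
    "policy": ("string", "policy_name"),
    "zip": ("string", "zipcode"),
}
mappings = [(src, typ, dst, typ) for src, (typ, dst) in column_map.items()]
source_dynf = ApplyMapping.apply(frame=source_dynf, mappings=mappings,
                                 transformation_ctx="mapping")

ApplyMapping only carries through the fields you list, so source columns left out of the dictionary are dropped, which takes care of the "subset" part.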
If you plan to use pyspark DataFrames instead, the syntax is simpler and you don't have to spell out the types:
# in your imports
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

frame = source_dynf.toDF()
# the arguments are the new column names; the number of string arguments must
# equal the number of columns in the frame
frame = frame.toDF("resource_id", "resource_name")  # and so on
source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
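The positional toDF rename above expects you to rename every column, in order. If you only want the subset of columns from your example, a select with aliases on the DataFrame is more explicit; this sketch reuses the same source-to-target dictionary idea, where column_map is an assumption taken from the question:

from pyspark.sql.functions import col

# hypothetical source -> target column names from the example in the question
column_map = {
    "id": "resource_id",
    "name": "resource_name",
    "policy": "policy_name",
    "zip": "zipcode",
}

frame = source_dynf.toDF().select(
    [col(src).alias(dst) for src, dst in column_map.items()]
)
source_dynf = DynamicFrame.fromDF(frame, glue_context, "mapped")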
Per the discussion below, you would like to pull the schema from your target table and push it onto the source data. Something like this should do the trick:
# get the schema for the target frame
# see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html
# note: you may want to read from a small partition for performance, see:
# https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html
my_conn_options = {
    "url": "jdbc:redshift://host:port/redshift database name",
    "dbtable": "redshift table name",
    "user": "username",
    "password": "password",
    "redshiftTmpDir": args["TempDir"],
    "aws_iam_role": "arn:aws:iam::account id:role/role name"
}
target_frame = glue_context.create_dynamic_frame_from_options("redshift", my_conn_options)

frame = source_dynf.toDF()
# rename the source columns to the target table's column names;
# note: the number of columns must match!
frame = frame.toDF(*[field.name for field in target_frame.schema().fields])
source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
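From there, writing the renamed frame into the pre-existing Redshift table is the same connection-options pattern in reverse, which would fill in your write_data step. A minimal sketch, assuming you reuse the read options above and that "resource" stands in for your actual target table name:

# write the renamed frame into the pre-existing Redshift table;
# "resource" is a placeholder for your actual target table name
write_conn_options = dict(my_conn_options, dbtable="resource")
glue_context.write_dynamic_frame.from_options(
    frame=source_dynf,
    connection_type="redshift",
    connection_options=write_conn_options
)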