Tags: python, pyspark, amazon-redshift, aws-glue, aws-glue-spark

AWS Glue: moving data from S3 to Redshift


I have around 70 tables in one S3 bucket and I would like to move them to Redshift using Glue. I was able to move only a few of them; the rest fail because Redshift does not accept some of their data types. I worked around this with a script that resolves the types and moves the tables one by one:

table1 = glueContext.create_dynamic_frame.from_catalog(
    database="db1_g", table_name="table1"
)
table1 = table1.resolveChoice(
    specs=[
        ("column1", "cast:char"),
        ("column2", "cast:varchar"),
        ("column3", "cast:varchar"),
    ]
)
table1 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=table1,
    catalog_connection="redshift",
    connection_options={"dbtable": "schema1.table1", "database": "db1"},
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="table1",
)

The same script works for every other table with a data type issue. But since I would like to automate this, I wrote a looping script that iterates over all the tables and writes them to Redshift. I have two issues with it:

  1. I cannot move the tables into their respective schemas in Redshift.
  2. I cannot add an if condition inside the loop for the tables that need a data type change.

import boto3

client = boto3.client("glue", region_name="us-east-1")

databaseName = "db1_g"
Tables = client.get_tables(DatabaseName=databaseName)
tableList = Tables["TableList"]

for table in tableList:
    tableName = table["Name"]
    datasource0 = glueContext.create_dynamic_frame.from_catalog(
        database="db1_g", table_name=tableName, transformation_ctx="datasource0"
    )

    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=datasource0,
        catalog_connection="redshift",
        connection_options={
            "dbtable": tableName,
            "database": "schema1.db1",
        },
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="datasink4",
    )
job.commit()

Specifying the Redshift schema name together with the table name, like schema1.tableName, throws an error saying that schema1 is not defined.

Can anybody help me change the data types for all the tables that need it, inside the looping script itself?


Solution

  • The first problem is fixed rather easily: the schema belongs in the dbtable attribute, not in database, like this:

    connection_options={
        "dbtable": f"schema1.{tableName}",
        "database": "db1",
    }
    

    Your second problem is that you want to call resolveChoice inside the for loop, correct? What kind of error occurs there? Why doesn't it work?
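
    If the intent is simply to apply the cast only to the tables that need it, one way to do that inside the loop is to keep the resolveChoice specs in a dict keyed by table name and look them up on each iteration. This is only a sketch built on the code from your question; the contents of cast_specs and the per-table transformation_ctx names are placeholders you would adapt:

        import boto3

        # Placeholder mapping: only the tables listed here get a cast applied.
        cast_specs = {
            "table1": [
                ("column1", "cast:char"),
                ("column2", "cast:varchar"),
                ("column3", "cast:varchar"),
            ],
            # add further tables that need casts here
        }

        client = boto3.client("glue", region_name="us-east-1")
        tableList = client.get_tables(DatabaseName="db1_g")["TableList"]

        for table in tableList:
            tableName = table["Name"]
            frame = glueContext.create_dynamic_frame.from_catalog(
                database="db1_g",
                table_name=tableName,
                transformation_ctx=f"src_{tableName}",
            )

            # Apply the cast only where a spec is defined for this table.
            if tableName in cast_specs:
                frame = frame.resolveChoice(specs=cast_specs[tableName])

            glueContext.write_dynamic_frame.from_jdbc_conf(
                frame=frame,
                catalog_connection="redshift",
                connection_options={
                    "dbtable": f"schema1.{tableName}",  # schema goes into dbtable
                    "database": "db1",
                },
                redshift_tmp_dir=args["TempDir"],
                transformation_ctx=f"sink_{tableName}",
            )

        job.commit()

    Here glueContext, args and job are assumed to come from the usual Glue job boilerplate, as in your own script.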