amazon-web-services, amazon-s3, pyspark, aws-glue, aws-glue-spark

How can I loop through multiple tables from a source to S3 using Glue (Python/PySpark) with a configuration file?


I am looking to ingest multiple tables from a relational database into S3 using Glue. The table details are kept in a configuration file, which is a JSON file. It would be helpful to have code that can loop through the table names and ingest each table into S3. The Glue script is written in Python (PySpark).

This is a sample of how the configuration file looks:

{"main_key":{
      "source_type": "rdbms", 
      "source_schema": "DATABASE", 
      "source_table": "DATABASE.Table_1", 
}}
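
Since the goal is to loop over several tables, the same file would hold one entry per table. The snippet below only illustrates that shape; the key and table names are made up:

{
    "table_1_key": {
        "source_type": "rdbms",
        "source_schema": "DATABASE",
        "source_table": "DATABASE.Table_1"
    },
    "table_2_key": {
        "source_type": "rdbms",
        "source_schema": "DATABASE",
        "source_table": "DATABASE.Table_2"
    }
}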

Solution

  • Assuming your Glue job can connect to the database and a Glue Connection has been added to it, here is a sample extracted from my script that does something similar. You would need to update the JDBC URL format so it works for your database (this one uses SQL Server) and fill in the implementation details for fetching the config file, looping through its items, etc.

    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    from datetime import datetime

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session

    # Connection details for the source database (SQL Server in this example).
    # Replace the placeholders with your own host, port, database and credentials,
    # or resolve them from Glue job arguments / AWS Secrets Manager.
    hostname = "your-db-host"
    port = 1433
    db_name = "DATABASE"
    jdbc_url = f"jdbc:sqlserver://{hostname}:{port};databaseName={db_name}"
    connection_details = {
        "user": "db_user",
        "password": "db_password",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

    # Load the JSON configuration file from S3 as a dict
    # (implementation up to you, e.g. via boto3 -- see the sketch below).
    tables_config = get_tables_config_from_s3_as_dict()

    # Build a year=/month=/day= partition suffix from today's date.
    date_partition = datetime.today().strftime('%Y%m%d')
    write_date_partition = f'year={date_partition[0:4]}/month={date_partition[4:6]}/day={date_partition[6:8]}'

    # Loop over every table entry in the config, read the table over JDBC
    # and write it to S3 as Parquet under the date partition.
    for key, value in tables_config.items():
        table = value['source_table']

        df = spark.read.jdbc(url=jdbc_url, table=table, properties=connection_details)
        write_path = f's3a://bucket-name/{table}/{write_date_partition}'
        df.write.parquet(write_path)
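
The get_tables_config_from_s3_as_dict() call above is left to you to implement. A minimal sketch using boto3 is shown below, assuming the configuration JSON sits in a bucket and key of your choosing (the bucket and key names here are placeholders):

    import json
    import boto3

    def get_tables_config_from_s3_as_dict():
        """Download the JSON configuration file from S3 and parse it into a dict."""
        s3 = boto3.client("s3")
        # Placeholder bucket/key -- point these at your actual configuration file,
        # or pass them in as Glue job arguments via getResolvedOptions.
        response = s3.get_object(Bucket="config-bucket-name", Key="config/tables_config.json")
        return json.loads(response["Body"].read().decode("utf-8"))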