Search code examples
python · amazon-web-services · aws-glue

AWS Glue - filter method causes "JDBC type for null" error


I'm applying a few transformations to my Glue DynamicFrame in Python and noticed that the filter() method, when used, causes the error below when loading to SQL Server RDS:

`Can't get JDBC type for null Failed`

Code:

def get_credentials(self, glue_connection):
    """Return the JDBC settings of a Glue connection as a flat dict.

    The connection is read through ``self.gc.extract_jdbc_conf`` and the
    host and port are split out of its JDBC URL.

    Args:
        glue_connection: Name of the Glue connection to look up.

    Returns:
        A dict with the keys ``url`` (passed through unchanged), ``host``,
        ``port``, ``user`` and ``password``. ``port`` is a string; it is
        empty when the URL does not carry a port.

    Raises:
        ValueError: If the connection configuration has no ``url``.
    """
    credentials = self.gc.extract_jdbc_conf(glue_connection)

    url = credentials.get("url")
    if not url:
        raise ValueError(
            f"Glue connection {glue_connection!r} did not provide a JDBC url"
        )

    # The authority is the text between "//" and the first path ("/"),
    # property (";") or query ("?") delimiter. Cutting it out first keeps
    # suffixes such as ";databaseName=x" or "/db" out of the port.
    authority = url.partition("//")[2]
    for delimiter in "/;?":
        authority = authority.partition(delimiter)[0]

    host, colon, port = authority.rpartition(":")
    if not colon:
        # No port in the URL: the whole authority is the host.
        host, port = authority, ""

    return {
        "url": url,
        "host": host,
        "port": port,
        "user": credentials.get("user"),
        "password": credentials.get("password"),
    }

def create_source_df(self, schema_table):
    """Read ``schema_table`` from SQL Server into a Glue DynamicFrame.

    Credentials come from ``self.get_credentials`` for the connection named
    by ``self.landing_connection_name``; ``self.database_name`` is appended
    to the JDBC URL as a ``;database=`` property.

    Args:
        schema_table: Value passed to the reader as ``dbtable``.

    Returns:
        Whatever ``self.gc.create_dynamic_frame.from_options`` returns.
    """
    creds = self.get_credentials(self.landing_connection_name)

    reader_options = {
        "url": creds["url"] + ";database=" + self.database_name,
        "user": creds["user"],
        "password": creds["password"],
        "dbtable": schema_table,
    }

    return self.gc.create_dynamic_frame.from_options(
        connection_type="sqlserver",
        connection_options=reader_options,
        transformation_ctx="glue_df",
    )

# Load the source table, then cast the created_at and updated_at columns to date.
glue_df = (
    glue_obj.create_source_df(schema_table)
    .resolveChoice(specs=[("created_at", "cast:date")])
    .resolveChoice(specs=[("updated_at", "cast:date")])
)

# Adding this filter is what triggers "Can't get JDBC type for null" on the
# subsequent load to SQL Server.
glue_df = glue_df.filter(f=lambda record: record["filed_to_filter"] != "DELETE")

Solution

  • Glue's DynamicFrame doesn't have a cast method, so it is required to convert it into a Spark DataFrame, do the cast there, and convert it back to a Glue DynamicFrame...

    # Spark type names of untyped columns, mapped to the concrete type each
    # one is cast to before the frame goes back to Glue.
    # NOTE(review): "void" is added because newer Spark releases are believed
    # to report NullType under that name instead of "null" - confirm against
    # the Spark version of your Glue job.
    DATA_TYPES = {"null": "int", "void": "int"}

    job = Context()
    rds = RDSClient(job)

    # DynamicFrame -> Spark DataFrame so that Column.cast is available.
    glue_df = rds.rds_df()
    spark_df = glue_df.toDF()

    for column, dtype in spark_df.dtypes:
        target_type = DATA_TYPES.get(dtype)
        if target_type is not None:
            spark_df = spark_df.withColumn(column, spark_df[column].cast(target_type))

    # Back to a DynamicFrame for the rest of the Glue job.
    glue_df = DynamicFrame.fromDF(spark_df, job.gc, "glue_df")