I'm applying a few transformations to my Glue DynamicFrame in Python and noticed that the filter()
method, when used, causes the following error
when loading the result into SQL Server on RDS:
`Can't get JDBC type for null Failed`
Code:
def get_credentials(self, glue_connection):
    """Return normalized JDBC credentials for a Glue catalog connection.

    The connection is resolved with ``self.gc.extract_jdbc_conf`` and its
    ``url`` (e.g. ``jdbc:sqlserver://host:1433``) is split into ``host`` and
    ``port``. Only the authority part after ``//`` is parsed. Any path
    (``/db``), SQL Server property list (``;databaseName=db``) or query string
    (``?a=b``) is ignored, so it can no longer leak into ``port``.

    Args:
        glue_connection: Name of the Glue connection to look up.

    Returns:
        dict with keys ``url``, ``host``, ``port``, ``user`` and ``password``.
        ``port`` is a string, or ``None`` when the URL has no explicit port.

    Raises:
        ValueError: if the connection has no ``url`` or the URL lacks ``//``.
    """
    credentials = self.gc.extract_jdbc_conf(glue_connection)
    url = credentials.get("url")
    if not url or "//" not in url:
        raise ValueError(
            f"Glue connection {glue_connection!r} has no usable JDBC url: {url!r}"
        )

    # Keep only "host[:port]": cut at the first path, property or query delimiter.
    authority = url.split("//", 1)[1]
    for delimiter in ("/", ";", "?"):
        authority = authority.split(delimiter, 1)[0]

    # rpartition so that a bracketed IPv6 host keeps its inner colons.
    host, separator, port = authority.rpartition(":")
    if not separator:
        # No explicit port, e.g. "jdbc:sqlserver://host".
        host, port = authority, None

    return {
        "url": url,
        "host": host,
        "port": port,
        "user": credentials.get("user"),
        "password": credentials.get("password"),
    }
def create_source_df(self, schema_table):
    """Read one table from the landing SQL Server database as a DynamicFrame.

    Credentials come from ``self.get_credentials`` for
    ``self.landing_connection_name``. The target database is selected by
    appending ``;database=<self.database_name>`` to the connection URL.

    Args:
        schema_table: Table to read, passed to Glue as ``dbtable``
            (presumably ``schema.table``; confirm with callers).

    Returns:
        Whatever ``self.gc.create_dynamic_frame.from_options`` returns.
    """
    creds = self.get_credentials(self.landing_connection_name)
    jdbc_url = creds["url"] + ";database=" + self.database_name
    options = {
        "url": jdbc_url,
        "user": creds["user"],
        "password": creds["password"],
        "dbtable": schema_table,
    }
    return self.gc.create_dynamic_frame.from_options(
        connection_type="sqlserver",
        connection_options=options,
        transformation_ctx="glue_df",
    )
glue_df = glue_obj.create_source_df(schema_table)
# Cast both timestamp columns to date in a single resolveChoice pass.
glue_df = glue_df.resolveChoice(
    specs=[("created_at", "cast:date"), ("updated_at", "cast:date")]
)
# Writing the frame produced by this filter to SQL Server fails with
# "Can't get JDBC type for null Failed".
glue_df = glue_df.filter(f=lambda record: record["filed_to_filter"] != "DELETE")
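Glue's DynamicFrame doesn't have a cast
method, so it is necessary to convert it into a Spark DataFrame, do the cast there, and convert it back into a Glue DynamicFrame. The error is raised for columns whose type is null (Spark's NullType, typically a column that is entirely null), because the JDBC writer cannot map that type to a SQL Server column type; casting those columns to a concrete type such as int avoids it: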
# Target type for columns Spark could only type as NullType, keyed by the
# name Spark reports in DataFrame.dtypes. Spark 2.x reports NullType as
# "null"; Spark 3.x (Glue 3.0+) reports it as "void". Both spellings are
# mapped so the workaround applies on either version.
DATA_TYPES = {"null": "int", "void": "int"}
job = Context()
rds = RDSClient(job)
glue_df = rds.rds_df()
spark_df = glue_df.toDF()
# Cast every matching column in one projection. Calling withColumn() in a
# loop adds one projection per column, which can bloat the query plan.
# Non-matching columns pass through unchanged and in their original order.
spark_df = spark_df.select(
    *[
        spark_df[column].cast(DATA_TYPES[dtype]).alias(column)
        if dtype in DATA_TYPES
        else spark_df[column]
        for column, dtype in spark_df.dtypes
    ]
)
glue_df = DynamicFrame.fromDF(spark_df, job.gc, "glue_df")