I have a strange problem that I can't figure out. I'm stuck.
We write data to S3 as a Delta Lake table as it arrives, like this:
df.write.format("delta").mode("overwrite").save("s3://path_to_table/")
delta_table = DeltaTable.forPath(spark, "s3://path_to_table/")
delta_table.generate("symlink_format_manifest")
We also manually create the glue database and glue table like this:
glue_clt = boto3.client("glue", region_name="us-east-1")
glue_clt.create_table(
    DatabaseName="database_name",
    TableInput={
        "Name": "table_name",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "column1", "Type": "double"},
                {"Name": "column2", "Type": "string"},
            ],
            "Location": "s3://path_to_table/_symlink_format_manifest",
            "InputFormat": "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            },
        },
        "PartitionKeys": [
            {"Name": "column3", "Type": "string"},
            {"Name": "column4", "Type": "string"},
        ],
        "TableType": "EXTERNAL_TABLE",
    },
)
We then had new data come in with a data type change on "column1". We catch that error and route the data through a new process, which changes the type like this:
from pyspark.sql.functions import col

delta_table_df = spark.read.format("delta").load("s3://path_to_table/")
delta_table_df = delta_table_df.withColumn("column1", col("column1").cast("string"))
delta_table_df.write.format("delta").mode("overwrite") \
    .partitionBy(["column3", "column4"]).option("overwriteSchema", "true") \
    .save("s3://path_to_table/")
delta_table = DeltaTable.forPath(spark, "s3://path_to_table/")
delta_table.generate("symlink_format_manifest")
After doing this I can confirm the schema has changed on the underlying parquet files: when I read the data back in, "column1" has a type of "string".
Yet I get this error when trying to query in Athena:
HIVE_BAD_DATA: Field column1's type BINARY in parquet file s3://bucket/test_source/test_database/test_schema/test_table/column3=Q/column4=541/part-00001-2a918783-6cd1-4cd8-9a68-28c63ab40989.c000.snappy.parquet is incompatible with type double defined in table schema
What am I missing?
You need to update the Glue table definition. The parquet files themselves have been updated, but Athena reads the column type from the Glue Data Catalog, and that entry is no longer correct.
import boto3

glue = boto3.client("glue")
column_name = "column1"
new_column_type = "string"

table = glue.get_table(DatabaseName=database_name, Name=table_name)["Table"]

# Update the schema with the new column type
for column in table["StorageDescriptor"]["Columns"]:
    if column["Name"] == column_name:
        column["Type"] = new_column_type

# get_table returns read-only fields (CreateTime, DatabaseName, CreatedBy, ...)
# that update_table rejects, so send back only the keys TableInput accepts
allowed = {"Name", "Description", "Owner", "LastAccessTime", "Retention",
           "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
           "ViewExpandedText", "TableType", "Parameters", "TargetTable"}
glue.update_table(DatabaseName=database_name,
                  TableInput={k: v for k, v in table.items() if k in allowed})
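One more place the old type can hide: if partitions are already registered for this table in Glue, each partition entry carries its own StorageDescriptor, and Athena may keep reporting the stale type from those entries even after the table-level schema is fixed. A sketch of applying the same change to every registered partition (function names here are mine, not from your code):

```python
def retype_column(columns, column_name, new_type):
    """Pure helper: return a copy of a Glue Columns list with one column's Type changed."""
    return [
        {**c, "Type": new_type} if c["Name"] == column_name else dict(c)
        for c in columns
    ]

def retype_partitions(database, table, column_name, new_type, region="us-east-1"):
    """Rewrite column_name's type in every partition registered for the table."""
    import boto3  # imported here so the pure helper above has no AWS dependency

    glue = boto3.client("glue", region_name=region)
    paginator = glue.get_paginator("get_partitions")
    # batch_update_partition accepts at most 100 entries per call,
    # so page through the partitions 100 at a time
    for page in paginator.paginate(DatabaseName=database, TableName=table,
                                   PaginationConfig={"PageSize": 100}):
        entries = []
        for part in page["Partitions"]:
            sd = dict(part["StorageDescriptor"])
            sd["Columns"] = retype_column(sd["Columns"], column_name, new_type)
            entries.append({
                "PartitionValueList": part["Values"],
                "PartitionInput": {
                    "Values": part["Values"],
                    "StorageDescriptor": sd,
                },
            })
        if entries:
            glue.batch_update_partition(
                DatabaseName=database, TableName=table, Entries=entries
            )
```

If you only ever query through the symlink manifest and never registered partitions in Glue, this step is a no-op.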
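Separately, since you regenerate the symlink manifest by hand after each write: Delta Lake has a table property that regenerates it automatically on every write, which avoids a stale manifest if a future overwrite forgets the generate step. A sketch, assuming a live SparkSession named spark (no test possible without a cluster):

```python
# Delta Lake table property: regenerate the symlink manifest automatically
# after each write, replacing the manual delta_table.generate(...) call.
spark.sql("""
    ALTER TABLE delta.`s3://path_to_table/`
    SET TBLPROPERTIES (delta.compatibility.symlinkFormatManifest.enabled = true)
""")
```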