I have multiple Delta tables inside an ADLS container. For each of these tables I want to generate a script that converts them into Parquet files and explicitly lists the columns from each table. This is what I have at the moment, but instead of creating a separate select statement per table, it dumps all the columns from all tables into a single select statement:
df = spark.sql("SHOW TABLES IN gold")
df_rows = df.collect()
table_names = [row["tableName"] for row in df_rows]
for table_name in table_names:
    df = spark.sql("SHOW COLUMNS IN gold." + table_name)
    df_rows = df.collect()
    column_names = [row["col_name"] for row in df_rows]
    select_statement ="spark.sql('SELECT " + ",".join(column_names) + "FROM gold." +tablename + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + tablename +"'"
How can I get an output that would look like this:
spark.sql('SELECT column1,column2,etc FROM gold.table1').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table1')
spark.sql('SELECT column1,column2,etc FROM gold.table2').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table2')
spark.sql('SELECT column1,column2,etc FROM gold.table3').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table3')
You're overwriting the select_statement variable on each iteration. You'll need to use a list and append to it instead.
df = spark.sql("SHOW TABLES IN gold")
df_rows = df.collect()
table_names = [row["tableName"] for row in df_rows]
select_statements = []
for table_name in table_names:
df = spark.sql("SHOW COLUMNS IN gold." + table_name)
df_rows = df.collect()
column_names = [row["col_name"] for row in df_rows]
select_statements.append("spark.sql('SELECT " + ", ".join(column_names) + " FROM gold." + table_name + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + table_name +"')")
for statement in select_statements:
print(statement+"\n")
There were a few syntax errors in your select_statement value as well: a missing space before FROM, a reference to an undefined tablename variable (it should be table_name), and the generated save(...) call was never closed. This code should work.
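
As a side note: if you're running this on Databricks or plain PySpark, the Catalog API can replace the two SHOW queries, and f-strings make the generated statements easier to read. Here's a minimal sketch of that variant, assuming the same gold database and /mnt/gold/toParquet/ target path as above:

select_statements = []
for table in spark.catalog.listTables("gold"):
    # listColumns returns Column objects; .name holds the column name
    columns = [col.name for col in spark.catalog.listColumns(table.name, "gold")]
    select_statements.append(
        f"spark.sql('SELECT {', '.join(columns)} FROM gold.{table.name}')"
        f".write.format('parquet').mode('overwrite')"
        f".save('/mnt/gold/toParquet/{table.name}')"
    )

for statement in select_statements:
    print(statement)

If any of your column names contain spaces or reserved words, wrap each name in backticks before joining so the generated SELECT stays valid.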