Tags: loops, apache-spark, pyspark, databricks, azure-databricks

Trying to pass a table from a container into a pyspark variable and use its columns in a select statement


I have multiple Delta tables inside an ADLS container. For each of these tables I want to generate a script that converts it into Parquet files and explicitly lists that table's columns. This is what I have so far, but instead of creating a separate select statement for each table, it just dumps all the columns from all tables into a single select statement:

df = spark.sql("SHOW TABLES IN gold")
df_rows = df.collect()
table_names = [row["tableName"] for row in df_rows]
 
for table_name in table_names:
    df = spark.sql("SHOW COLUMNS IN gold." + table_name)
    df_rows = df.collect()
    column_names = [row["col_name"] for row in df_rows]
    select_statement ="spark.sql('SELECT " + ",".join(column_names) + "FROM gold." +tablename + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + tablename +"'" 

How can I get an output that would look like this:

spark.sql('SELECT column1,column2,etc FROM gold.table1').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table1')

spark.sql('SELECT column1,column2,etc FROM gold.table2').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table2')

spark.sql('SELECT column1,column2,etc FROM gold.table3').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table3')
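Each line of the desired output follows one template, so the formatting step can be written as a small pure function and checked without a cluster. A minimal sketch: build_statement is a hypothetical helper name, and the /mnt/gold/toParquet/ prefix is copied from the example lines above.

```python
# Output root copied from the example lines above; adjust as needed.
OUTPUT_ROOT = "/mnt/gold/toParquet/"


def build_statement(table_name, column_names, database="gold"):
    """Return one spark.sql(...) line that writes the listed columns to Parquet.

    build_statement is a hypothetical helper, not part of any library.
    """
    columns = ",".join(column_names)
    return (
        f"spark.sql('SELECT {columns} FROM {database}.{table_name}')"
        ".write.format('parquet').mode('overwrite')"
        f".save('{OUTPUT_ROOT}{table_name}')"
    )


# Hypothetical table and column names for illustration
print(build_statement("table1", ["column1", "column2"]))
```

The path uses the table name exactly as passed in, so table1 produces .../toParquet/table1 rather than the capitalized Table1 shown in the examples.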


Solution

  • You're overwriting the select_statement variable on each iteration, so only the last table's statement survives the loop. Use a list and append one statement per table instead.

    # spark is the SparkSession that Databricks notebooks predefine
    df = spark.sql("SHOW TABLES IN gold")
    df_rows = df.collect()
    table_names = [row["tableName"] for row in df_rows]

    # Keep one generated statement per table instead of overwriting a single variable
    select_statements = []
    for table_name in table_names:
        df = spark.sql("SHOW COLUMNS IN gold." + table_name)
        df_rows = df.collect()
        column_names = [row["col_name"] for row in df_rows]
        columns = ", ".join(column_names)
        select_statements.append(
            f"spark.sql('SELECT {columns} FROM gold.{table_name}')"
            ".write.format('parquet').mode('overwrite')"
            f".save('/mnt/gold/toParquet/{table_name}')"
        )

    # Print each generated statement separated by a blank line
    for statement in select_statements:
        print(statement + "\n")
    

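    There were also a few errors in how your select_statement strings were built: the loop variable is table_name but the string uses tablename, there is no space before FROM (so the last column name runs into the keyword), and the generated .save(...) call is missing its closing parenthesis. With those fixed, the code above prints one statement per table.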
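Both string-building problems can be reproduced in plain Python without a cluster. A minimal illustration, assuming a hypothetical dictionary of table and column names in place of the SHOW TABLES / SHOW COLUMNS results:

```python
# Hypothetical inputs standing in for the SHOW TABLES / SHOW COLUMNS results
tables = {"table1": ["column1", "column2"], "table2": ["column1", "column3"]}

# Mistake 1: rebinding one variable keeps only the last table's statement
for table_name, column_names in tables.items():
    select_statement = "SELECT " + ",".join(column_names) + " FROM gold." + table_name

# Fix: append each statement to a list instead
select_statements = []
for table_name, column_names in tables.items():
    select_statements.append("SELECT " + ",".join(column_names) + " FROM gold." + table_name)

# Mistake 2: without a leading space, FROM fuses with the last column name
fused = "SELECT " + ",".join(tables["table1"]) + "FROM gold.table1"

print(select_statement)   # SELECT column1,column3 FROM gold.table2
print(select_statements)  # one entry per table
print(fused)              # SELECT column1,column2FROM gold.table1
```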