Tags: sql, pyspark, schema, databricks

Not able to add column to Databricks dataframe using withColumn but schema reflects the additional column


I'm trying to add two extra columns to my Databricks PySpark dataframe, but they don't show up when I SELECT * from the resulting table.

import pyspark.sql.functions as F

for file in file_list:

  try:
    sql_query = create_sql_statement(file)
    df = spark.sql(sql_query) \
      .withColumn('type', F.lit('animal_type')) \
      .withColumn('timestamp', F.current_timestamp())
    df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(f'{database}.{table}')

  except Exception as e:
    print(e)

Example of create_sql_statement: 'CREATE TABLE database.TABLE_NAME AS SELECT FIELD1, FIELD2, FIELD3, FIELD4, type, timestamp FROM DATABASE.TABLENAME'
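For context, create_sql_statement itself was not shown in the question; a minimal hypothetical sketch, assuming the table name is derived from the file name and the source table carries a _RAW suffix (both assumptions), might look like:

```python
import os

def create_sql_statement(file, database='DATABASE'):
    # Hypothetical helper -- the real mapping from file to table name
    # was not shown in the question; this derives it from the file stem.
    table = os.path.splitext(os.path.basename(file))[0].upper()
    return (
        f'CREATE TABLE {database}.{table} AS '
        f'SELECT FIELD1, FIELD2, FIELD3, FIELD4, type, timestamp '
        f'FROM {database}.{table}_RAW'
    )
```

For example, create_sql_statement('/mnt/raw/animals.csv') would produce a CREATE TABLE DATABASE.ANIMALS AS SELECT ... statement.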

When I run the code above, the for loop completes and creates the table, and the PySpark dataframe has the schema below, but I don't see the new columns in my table.

num_affected_rows:long
num_inserted_rows:long
type:string
timestamp:timestamp

I see one of two results:

  1. When I SELECT * from one of the tables, I get 'query returned no results', even though the source database.table_name that create_sql_statement reads from definitely contains data.
  2. For other tables, SELECT * returns the correct output, but without the columns added by the withColumn calls above.

Am I missing something in the syntax? This is a follow-up to my previous question 'SQL Error mismatched input 'sql_query' expecting {EOF} when using Create Table in Pyspark', which was solved.


Solution

  • A couple of points:

    • A CREATE TABLE ... AS SELECT statement does not return the rows of the table it creates. spark.sql(sql_query) returns only a small status dataframe (num_affected_rows, num_inserted_rows), which is exactly the schema you printed. Your withColumn calls then add type and timestamp to that status dataframe, and the write overwrites the freshly created table with it; any error along the way is swallowed by your except block.
    • You can't overwrite a table you're currently reading from. It's not clear whether that is what you're trying to do.

    What you likely mean to do is:

    import pyspark.sql.functions as F

    df = spark.sql(
        """
        SELECT FIELD1, FIELD2, FIELD3, FIELD4 
          FROM DATABASE.table_you_read_from
        """
    )
    df = (
        df.withColumn('type', F.lit('animal_type'))
          .withColumn('timestamp', F.current_timestamp())
    )
    (
        df.write
          .format("delta")
          .option("overwriteSchema", "true")
          .mode("overwrite")
          .saveAsTable('DATABASE.new_table')
    )
    

    Alternatively, in pure SQL this would be:

    spark.sql("DROP TABLE IF EXISTS DATABASE.new_table")
    spark.sql(
        """
        CREATE TABLE DATABASE.new_table AS
        SELECT FIELD1, FIELD2, FIELD3, FIELD4
             , 'animal_type' AS type
             , current_timestamp() AS timestamp
          FROM DATABASE.table_you_read_from
        """
    )
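If you need the pure-SQL version for every file in the original loop, the statement pair can be built with ordinary string formatting; a sketch, where the _new suffix and the table-name handling are assumptions:

```python
def build_statements(table, database='DATABASE'):
    # Build the DROP + CREATE TABLE AS SELECT pair for one table.
    # Writing to a *different* table (_new suffix, an assumption here)
    # avoids overwriting the table being read from.
    drop = f'DROP TABLE IF EXISTS {database}.{table}_new'
    create = (
        f'CREATE TABLE {database}.{table}_new AS '
        f'SELECT FIELD1, FIELD2, FIELD3, FIELD4, '
        f"'animal_type' AS type, current_timestamp() AS timestamp "
        f'FROM {database}.{table}'
    )
    return drop, create
```

Each pair would then be executed with spark.sql(drop) followed by spark.sql(create) inside the loop.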