
Dynamic pivot or lateral flatten in Snowflake / Snowpark to columns


Given this sample table in Snowflake:

CREATE OR REPLACE TABLE vnt
(src variant)
AS SELECT parse_json(column1) as src
FROM values
('{"a": 1,"b": 2,"c": 3}'),
('{"a": 1,"b": 2,"c": 3,"d": 4}');

select * from vnt;

I would like to output a table with two rows, like this:

a   b   c   d
1   2   3   NULL
1   2   3   4

That is, I want to flatten the JSON data into columns instead of rows. I tried flattening it in Snowpark, but the pivot step does not work. And since the keys can change dynamically, how can I handle this?

import snowflake.snowpark as snowpark

def main(session: snowpark.Session): 
    df = session.sql("select * from vnt")
    df = df.join_table_function("flatten", df["SRC"]) \
            .drop(["SEQ", "SRC", "PATH", "INDEX", "THIS"])
    df = df.pivot("VALUE",['a','b','c','d']).min("KEY")

    # Return value will appear in the Results tab.
    return df
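For reference, the reshaping being asked for can be pinned down in plain Python, with no Snowflake involved: the output columns are the union of all keys seen across rows, and a row that lacks a key gets NULL (None here). This is only a sketch of the target semantics; json_rows_to_columns is a hypothetical name, and sorting the keys is an assumption made to get a stable column order.

```python
import json

# The two sample rows from the vnt table, as JSON text.
RAW_ROWS = ['{"a": 1,"b": 2,"c": 3}', '{"a": 1,"b": 2,"c": 3,"d": 4}']


def json_rows_to_columns(raw_rows):
    """Widen JSON objects into rows over the union of their keys (missing -> None)."""
    parsed = [json.loads(r) for r in raw_rows]
    # The column set is discovered from the data, so new keys need no code change.
    columns = sorted({key for obj in parsed for key in obj})
    table = [[obj.get(key) for key in columns] for obj in parsed]
    return columns, table


columns, table = json_rows_to_columns(RAW_ROWS)
print(columns)  # ['a', 'b', 'c', 'd']
for row in table:
    print(row)  # [1, 2, 3, None] then [1, 2, 3, 4]
```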

Solution

  • Snowflake supports a dynamic PIVOT both in the SQL PIVOT clause and in snowflake.snowpark.DataFrame.pivot:

    Parameters

    • pivot_col – The column or name of the column to use.
    • values – A list of values in the column; a DataFrame whose query supplies the values dynamically; or None (the default), which uses all values of the pivot column.
    • default_on_null – Expression to replace empty result values.
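As a rough mental model of these three parameters (a plain-Python emulation, not how Snowpark implements pivot), the sketch below pivots long-format (group, key, value) rows with MIN as the aggregate: values=None discovers the columns from the data, an explicit list restricts them, and default_on_null fills cells that received no input. pivot_min and the row layout are hypothetical.

```python
from collections import defaultdict


def pivot_min(rows, values=None, default_on_null=None):
    """Pivot (group, key, value) tuples, aggregating each cell with MIN.

    Hypothetical emulation of DataFrame.pivot(...).min(...):
      values=None      -> every distinct key in the data becomes a column
      values=[...]     -> only the listed keys become columns
      default_on_null  -> fills cells that no input row contributed to
    """
    if values is None:
        values = sorted({key for _, key, _ in rows})
    cells = defaultdict(dict)
    for group, key, value in rows:
        if key not in values or value is None:  # MIN ignores NULLs
            continue
        current = cells[group].get(key)
        cells[group][key] = value if current is None else min(current, value)
    groups = sorted({group for group, _, _ in rows})
    return list(values), {g: [cells[g].get(k, default_on_null) for k in values] for g in groups}


# Long-format rows: (source row id, JSON key, JSON value)
ROWS = [(1, "a", 1), (1, "b", 2), (1, "c", 3),
        (2, "a", 1), (2, "b", 2), (2, "c", 3), (2, "d", 4)]

print(pivot_min(ROWS))                     # all keys, None where a key is missing
print(pivot_min(ROWS, values=["a", "d"]))  # explicit subset of columns
print(pivot_min(ROWS, default_on_null=0))  # 0 instead of None
```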

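    Therefore, the values argument can be omitted, and the pivot columns are taken from whatever keys the data contains: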

    import snowflake.snowpark as snowpark

    def main(session: snowpark.Session):
        df = session.table("vnt")
        # FLATTEN adds SEQ, KEY, PATH, INDEX, VALUE, THIS; SRC is kept to identify each input row
        df = df.join_table_function("flatten", df["SRC"]).drop(["SEQ", "PATH", "INDEX", "THIS"])

        # No values list: every distinct KEY becomes a column, and rows are grouped by SRC
        df = df.pivot("key").min("value")

        return df
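Note that the code drops SEQ, PATH, INDEX and THIS but keeps SRC: pivot groups by every column that is neither the pivot column nor the aggregated one, so SRC is what keeps the two input rows apart. A plain-Python sketch of that effect, with FLATTEN emulated as one (src, key, value) tuple per JSON member (flatten_rows and group_for_pivot are hypothetical helpers):

```python
import json
from collections import defaultdict

# The two sample rows from vnt, as JSON text (standing in for SRC).
RAW_ROWS = ['{"a": 1,"b": 2,"c": 3}', '{"a": 1,"b": 2,"c": 3,"d": 4}']


def flatten_rows(raw_rows):
    """Emulate FLATTEN on each object: one (src, key, value) tuple per member."""
    return [(src, key, value)
            for src in raw_rows
            for key, value in json.loads(src).items()]


def group_for_pivot(flat_rows, keep_src=True):
    """Group flattened rows the way a pivot would: by whatever columns remain.

    keep_src=True  -> SRC remains, so each input row is its own group
    keep_src=False -> nothing remains, so all rows fall into a single group
    """
    groups = defaultdict(lambda: defaultdict(list))
    for src, key, value in flat_rows:
        group_key = src if keep_src else ()
        groups[group_key][key].append(value)
    return {g: dict(cols) for g, cols in groups.items()}


flat = flatten_rows(RAW_ROWS)
print(len(flat))                                  # 7 key/value rows
print(len(group_for_pivot(flat)))                 # 2 groups -> 2 output rows
print(group_for_pivot(flat, keep_src=False)[()])  # both input rows merged into one group
```

The same reasoning suggests a caveat: two input rows with identical JSON would share one SRC value and be merged as well. Keeping FLATTEN's SEQ column (a unique number per input record) as the grouping column should avoid that; this is an untested suggestion rather than part of the answer above.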
    

    Previous version:

    It can also be achieved by first collecting the distinct keys and passing them as the list of pivot values:

    import snowflake.snowpark as snowpark

    def main(session: snowpark.Session):
        df = session.table("vnt")
        df = df.join_table_function("flatten", df["SRC"]).drop(["SEQ", "PATH", "INDEX", "THIS"])
        # Collect the distinct JSON keys, sorted for a stable column order
        cols = sorted(row[0] for row in df.select("key").distinct().collect())
        df = df.pivot("key", cols).min("value")
        return df
    

    Output:

    [Screenshot of the query result; image not available.]
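The column-list step in the previous version boils down to collecting the distinct KEY values. Below is a plain-Python sketch of that step, plus rendering the list as the IN (...) clause a hand-written SQL PIVOT would need; distinct_keys and sql_in_list are hypothetical names, and sorting is added only to make the column order stable. Embedded single quotes are doubled so that a key such as it's stays a valid SQL string literal.

```python
def distinct_keys(flat_rows):
    """Distinct KEY values from (src, key, value) rows, sorted for a stable order."""
    return sorted({key for _, key, _ in flat_rows})


def sql_in_list(keys):
    """Render keys as a SQL IN list of string literals (single quotes doubled)."""
    return "(" + ", ".join("'" + k.replace("'", "''") + "'" for k in keys) + ")"


# Flattened rows as (src, key, value); the src labels are placeholders.
flat = [("row1", "b", 2), ("row1", "a", 1), ("row2", "d", 4), ("row2", "a", 1)]
keys = distinct_keys(flat)
print(keys)                   # ['a', 'b', 'd']
print(sql_in_list(keys))      # ('a', 'b', 'd')
print(sql_in_list(["it's"]))  # ('it''s')
```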


    EDIT:

    To remove the single quotes (') around the pivoted column names, alias the columns explicitly. From the Snowflake PIVOT documentation:

    If you prefer the column names without quotes, or if you prefer that the output have different column names than the input, you can include the column names in the AS clause, as shown below:

    SELECT EMPID AS EMP_ID, "'JAN'" AS JANUARY, "'FEB'" AS FEBRUARY,
        "'MAR'" AS MARCH, "'APR'" AS APRIL
    FROM monthly_sales
    PIVOT(sum(amount) FOR MONTH IN ('JAN', 'FEB', 'MAR', 'APR')) AS p
    ORDER BY EMPID;
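The same aliasing pattern can be generated for the vnt table from a collected key list. The sketch below only assembles SQL text in Python, so treat the generated statement as a starting point to verify in Snowflake. It assumes the keys were collected beforehand (for example as in the previous version), uses LATERAL FLATTEN plus PIVOT with MIN to mirror the Snowpark code, and aliases each quoted pivot column to a double-quoted name so the original lowercase keys are preserved. build_pivot_sql is a hypothetical name.

```python
def build_pivot_sql(keys, table="vnt", src_col="src"):
    """Build a SQL PIVOT over flattened JSON keys with "'key'" aliased back to "key".

    Hypothetical helper: `keys` must already be collected, and the default
    table/column names follow the vnt example.
    """
    def literal(k):  # SQL string literal, single quotes doubled
        return "'" + k.replace("'", "''") + "'"

    def ident(name):  # double-quoted identifier, double quotes doubled
        return '"' + name.replace('"', '""') + '"'

    # Following the documentation example, pivot columns are referenced as "'a'", "'b'", ...
    select_list = ", ".join(f"{ident(literal(k))} AS {ident(k)}" for k in keys)
    in_list = ", ".join(literal(k) for k in keys)
    return (
        f"SELECT {select_list}\n"
        f"FROM (SELECT t.{src_col}, f.key, f.value\n"
        f"      FROM {table} t, LATERAL FLATTEN(input => t.{src_col}) f)\n"
        f"PIVOT(MIN(value) FOR key IN ({in_list})) AS p"
    )


print(build_pivot_sql(["a", "b", "c", "d"]))
```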
    

    Using Snowpark for Python:

    import snowflake.snowpark as snowpark
    from snowflake.snowpark.functions import col

    def main(session: snowpark.Session):
        df = session.table("vnt")
        df = df.join_table_function("flatten", df["SRC"]).drop(["SEQ", "PATH", "INDEX", "THIS"])
        cols = sorted(row[0] for row in df.select("key").distinct().collect())
        # Pivoted columns are named 'a', 'b', ... (quotes included); alias each back to its key
        cols_alias = [col("'" + c + "'").alias(c) for c in cols]
        df = df.pivot("key", cols).min("value").select(cols_alias)
        return df
    

    Output:

    [Screenshot of the query result; image not available.]