Tags: sql, apache-spark-sql, window-functions, databricks, gaps-and-islands

How to distribute ranks when prior rank is zero (part 2)


This is an extension to my prior question, How to distribute values when prior rank is zero. The solution worked great in the Postgres environment, but now I need to replicate it in a Databricks environment (Spark SQL).

The question is the same as before, but now I'm trying to determine how to convert this Postgres query to Spark SQL. Basically, it sums up allocation amounts when there are gaps in the data (i.e., no micro_geo values when grouping by location_code and geo3). The "imputed_allocation" should equal 1 for every location_code and geo3 group.

This is the Postgres query, which works great:

    select location_code, geo3, distance_group, has_micro_geo, imputed_allocation from 
        (
        select ia.*,
               (case when has_micro_geo > 0
                     then sum(allocation) over (partition by location_code, geo3, grp)
                     else 0
                end) as imputed_allocation
        from (select s.*,
                     count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
              from staging_groups s
             ) ia
        )z

But it doesn't translate directly and produces this error in Databricks:

    Error in SQL statement: ParseException: 
    mismatched input 'from' expecting <EOF>(line 1, pos 78)

    == SQL ==
    select location_code, geo3, distance_group, has_micro_geo, imputed_allocation from 
    ------------------------------------------------------------------------------^^^
        (
        select ia.*,
               (case when has_micro_geo > 0
                     then sum(allocation) over (partition by location_code, geo3, grp)
                     else 0
                end) as imputed_allocation
        from (select s.*,
                     count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
              from staging_groups s
             ) ia
        )z

Or, at a minimum, how do I convert just the inner query below, which creates "grp"? Then perhaps the rest will fall into place. I have been trying to replace the filter-where logic with something else, but my attempts have not worked as desired.

    select s.*,
           count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
    from staging_groups s
    

Here's a db-fiddle with data: https://www.db-fiddle.com/f/wisvDZJL9BkWxNFkfLXdEu/0. It's currently set to Postgres, but again, I need to run this in a Spark SQL environment. I've tried breaking the query down and creating intermediate tables, but my groups do not come out as desired.
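
For reference, the table has roughly this shape. This is only an illustrative Spark SQL setup I put together so the query can be reproduced outside the fiddle; the column types are inferred from the query and the sample rows are made up, not the fiddle's actual data:

    create table staging_groups (
        location_code  string,
        geo3           string,
        distance_group int,     -- ordered descending in the query; adjust the type to match your data
        has_micro_geo  int,     -- 0 = gap row, non-zero = row has a micro geo
        allocation     double
    );

    -- illustrative rows only: allocations within a location_code/geo3 sum to 1
    insert into staging_groups values
        ('A', '123', 4, 1, 0.6),
        ('A', '123', 3, 0, 0.2),
        ('A', '123', 2, 0, 0.1),
        ('A', '123', 1, 1, 0.1);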

Here's an image to better visualize the output:

[image: data and the desired column(s) to create]


Solution

  • You need to rewrite this subquery:

    select s.*,
        count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
    from staging_groups s
    

    Although the filter() clause for window and aggregate functions is standard SQL, few databases support it so far. Instead, use a conditional window sum(), which produces the same result: the running sum adds 1 only on rows where has_micro_geo <> 0, so every gap row inherits the group number of the nearest preceding non-gap row:

    select s.*,
        sum(case when has_micro_geo <> 0 then 1 else 0 end) over (partition by location_code, geo3 order by distance_group desc) as grp
    from staging_groups s
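
    As a side note, Spark 3.0+ also has a count_if() aggregate, which reads closer to the original filter clause. It should work over a window as well, though I have not verified that:

    select s.*,
           -- count_if(cond) counts rows where cond is true; here it plays the role of count(*) filter (where ...)
           count_if(has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
    from staging_groups s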
    

    I think that the rest of the query should run fine in Spark SQL.
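
    Putting it together, the full statement with the filter clause replaced would look like this. It is the same logic as your Postgres query, as a sketch I have not run against your data:

    select location_code, geo3, distance_group, has_micro_geo, imputed_allocation
    from (
        select ia.*,
               -- only non-gap rows receive the island's summed allocation; gap rows get 0
               (case when has_micro_geo > 0
                     then sum(allocation) over (partition by location_code, geo3, grp)
                     else 0
                end) as imputed_allocation
        from (select s.*,
                     -- conditional sum replaces count(*) filter (where has_micro_geo <> 0)
                     sum(case when has_micro_geo <> 0 then 1 else 0 end) over (partition by location_code, geo3 order by distance_group desc) as grp
              from staging_groups s
             ) ia
        ) z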