
PySpark syntax error using when/otherwise


I have a DataFrame with the following schema:

   root
   |-- key: string (nullable = true)
   |-- points: array (nullable = true)
   |    |-- element: struct (containsNull = true)
   |    |    |-- time: long (nullable = true)
   |    |    |-- latitude: double (nullable = true)
   |    |    |-- longitude: double (nullable = true)
   |    |    |-- altitude: float (nullable = true)

I add an altitude_list column that is a string concatenation of each point's altitude field:

df.withColumn("altitude_list", stringify_litetrack_points("points")).drop("points").show()

I'm using this function to stringify:

from pyspark.sql import functions as F

def stringify_litetrack_points(points):
    return F.expr("array_join(transform(points, x -> concat(round(x.altitude))), ':')")

This works fine. However, sometimes the altitude is None (null), and in these cases that point is silently dropped from the result: if I have five points and one of them has a None altitude, the resulting altitude_list will only have four items. I'd like to insert a literal for None values instead.
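For illustration, here is a minimal reproduction of the dropped entry (the toy data and SparkSession below are just for this example; the column names follow the schema above):

from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# three points, one of them with a null altitude
df = spark.createDataFrame([
    Row(key="a", points=[
        Row(time=1, latitude=1.0, longitude=2.0, altitude=10.4),
        Row(time=2, latitude=1.1, longitude=2.1, altitude=None),
        Row(time=3, latitude=1.2, longitude=2.2, altitude=20.6),
    ]),
])

# array_join silently skips the null entry, so only two of the three points
# end up in the joined string
df.select(
    F.expr("array_join(transform(points, x -> concat(round(x.altitude))), ':')")
).show(truncate=False)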

I tried to use a when().otherwise() as follows...

def stringify_litetrack_points(points):
    return F.expr("array_join(transform(points, x -> concat(when(x.altitude.isNotNull(), round(x.altitude)).otherwise('*'))), ':')")

...but I get a syntax error at the otherwise().

ParseException: 
Syntax error at or near '('(line 1, pos 98)

== SQL ==
array_join(transform(points, x -> concat(when(x.altitude.isNotNull(), round(x.altitude)).otherwise('*'))), ':')
--------------------------------------------------------------------------------------------------^^^

What am I doing wrong? Should I be doing this a different way? Thanks.


Solution

  • There are two options: we can use either Spark SQL or the PySpark API.

    Option 1: Using Spark SQL

    The parse error happens because when(...).otherwise(...) (and .isNotNull()) is PySpark Column API syntax, not SQL, so it cannot be used inside an expr string; in SQL the equivalent construct is case when ... then ... else ... end. Instead of spelling out a case when, though, the (easier) ifnull does the job:

    F.expr("array_join(transform(points, x -> ifnull(round(x.altitude), '*')),':')"
    

    The inner concat is not necessary; array_join already does the job.
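
    Plugged back into the helper from the question, a sketch could look like this (my own variant, assuming the same df as above):

    from pyspark.sql import functions as F

    def stringify_litetrack_points(points):
        # ifnull substitutes '*' wherever round(x.altitude) is null, so every
        # point contributes exactly one entry to the joined string
        return F.expr("array_join(transform(points, x -> ifnull(round(x.altitude), '*')), ':')")

    df.withColumn("altitude_list", stringify_litetrack_points("points")).drop("points").show()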

    Option 2: Using the PySpark API

    from pyspark.sql import functions as F
    
    df.select(
        F.array_join(
            F.transform(
                'points',
                lambda c: F.when(c['altitude'].isNotNull(),
                                 F.round(c['altitude'])).otherwise('*')
            ),
            ':'
        )
    ).show()
    

    Using the null_replacement parameter of array_join, the code can be shortened to:

    df.select(
        F.array_join(
            F.transform('points', lambda c: F.round(c['altitude'])),
            ':', '*'
        )
    ).show()
    

    What to use?

    Although Spark SQL's case when ... and PySpark's when function look very similar, they belong to two completely separate worlds. A Spark SQL expression is essentially a string that is parsed at runtime, whereas the PySpark API is a set of Python functions. It is possible to combine both approaches using expr, but I usually try to stick to one of them. In most cases I use the API, because I get some support from my IDE.
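
    As a rough sketch of such a combination (my own example, not from the original answer; it assumes the same df as above), the inner transform can be written as SQL via expr while the outer array_join comes from the API:

    from pyspark.sql import functions as F

    # inner lambda expressed as a SQL snippet, outer array_join via the PySpark API
    df.select(
        F.array_join(
            F.expr("transform(points, x -> ifnull(round(x.altitude), '*'))"),
            ':'
        ).alias('altitude_list')
    ).show()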