I have a DataFrame with the following schema:
root
|-- key: string (nullable = true)
|-- points: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- time: long (nullable = true)
| | |-- latitude: double (nullable = true)
| | |-- longitude: double (nullable = true)
| | |-- altitude: float (nullable = true)
I add an altitude_list column that is a string concatenation of the altitude fields:
df.withColumn("altitude_list", stringify_litetrack_points("points")).drop("points").show()
I'm using this function to stringify:
from pyspark.sql import functions as F

def stringify_litetrack_points(points):
    return F.expr("array_join(transform(points, x -> concat(round(x.altitude))), ':')")
This works fine. However, sometimes the altitude is None (null), and in those cases the entry is simply dropped from the result: if I have five points and one of them has a None altitude, the resulting altitude_list only has four items. I'd like to insert a literal for None values.
I tried to use a when().otherwise() as follows...
def stringify_litetrack_points(points):
    return F.expr("array_join(transform(points, x -> concat(when(x.altitude.isNotNull(), round(x.altitude)).otherwise('*'))), ':')")
...but I get a syntax error at the otherwise().
ParseException:
Syntax error at or near '('(line 1, pos 98)
== SQL ==
array_join(transform(points, x -> concat(when(x.altitude.isNotNull(), round(x.altitude)).otherwise('*'))), ':')
--------------------------------------------------------------------------------------------------^^^
What am I doing wrong? Should I be doing this a different way? Thanks.
There are two options: we can use either Spark-SQL or the PySpark-API.
Option 1: Using Spark-SQL
Instead of using case when... use the (easier) ifnull:
F.expr("array_join(transform(points, x -> ifnull(round(x.altitude), '*')),':')"
The inner concat is not necessary; array_join already does the job.
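For completeness, a minimal end-to-end sketch of Option 1 (assuming the df and the altitude_list column name from the question) might look like this:
from pyspark.sql import functions as F

# ifnull substitutes '*' wherever round(x.altitude) evaluates to null,
# so the placeholder keeps its slot in the joined string.
df.withColumn(
    "altitude_list",
    F.expr("array_join(transform(points, x -> ifnull(round(x.altitude), '*')), ':')")
).drop("points").show()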
Option 2: Using the PySpark-API
from pyspark.sql import functions as F

df.select(
    F.array_join(
        F.transform(
            'points',
            lambda c: F.when(c['altitude'].isNotNull(), F.round(c['altitude'])).otherwise('*')
        ),
        ':')) \
    .show()
Using the null_replacement parameter of array_join, the code can be shortened to:
df.select(
    F.array_join(
        F.transform('points', lambda c: F.round(c['altitude'])),
        ':', '*')) \
    .show()
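If you want to verify the behaviour yourself, here is a self-contained sketch with made-up sample data (the values are purely illustrative):
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame mirroring the question's schema; the second point has a null altitude.
df = spark.createDataFrame([
    Row(key="a", points=[
        Row(time=1, latitude=1.0, longitude=2.0, altitude=100.4),
        Row(time=2, latitude=1.1, longitude=2.1, altitude=None),
        Row(time=3, latitude=1.2, longitude=2.2, altitude=102.6),
    ])
])

df.select(
    F.array_join(
        F.transform('points', lambda c: F.round(c['altitude'])),
        ':', '*'
    ).alias('altitude_list')
).show(truncate=False)
# The null altitude is rendered as '*' in altitude_list, while the other
# two points keep their rounded values.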
What to use?
Although Spark SQL's case when ... and PySpark's when function look very similar, they belong to two completely separate worlds. A Spark-SQL expression is basically a string that is parsed at runtime, whereas the PySpark API is a set of Python functions; this is also why Python methods like .isNotNull() and .otherwise() cause a ParseException inside an expr string. It is possible to combine both approaches using expr, but I usually try to stick to only one of them. In most cases I use the API, as my IDE gives me some support for it.
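As an illustration of that mixing (just a sketch, assuming the same df as above), the SQL part can be limited to the transform while array_join and its null_replacement come from the API:
from pyspark.sql import functions as F

# The lambda lives in a SQL string inside expr; array_join is called via the API.
df.select(
    F.array_join(
        F.expr("transform(points, x -> round(x.altitude))"),
        ':', '*'
    ).alias('altitude_list')
).show()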