# PySpark: How to apply a Python UDF to PySpark DataFrame columns?

I have a PySpark DataFrame with two sets of latitude/longitude coordinates. I am trying to calculate the Haversine distance between the two sets of coordinates in each row. I am using the following `haversine()` function that I found online. The problem is that it cannot be applied to columns, or at least I do not know the syntax to do so. Can someone share the syntax or point out a better solution?

```python
from math import radians, cos, sin, asin, sqrt
def haversine(lat1, lon1, lat2, lon2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    # Radius of earth in miles is 3,963; 5280 ft in 1 mile
    ft = 3963 * 5280 * c
    return ft
```
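For reference, the function above is the standard haversine formula. Writing $\varphi$ for latitude and $\lambda$ for longitude (both converted to radians), and using the question's earth radius of 3,963 miles converted to feet, it computes:

$$
\begin{aligned}
a &= \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \,\sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right) \\
c &= 2 \arcsin\!\left(\sqrt{a}\right) \\
d &= R\,c, \qquad R = 3963 \times 5280 \ \text{ft}
\end{aligned}
$$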

I know the `haversine()` function above works because I tested it using some lat/lon coordinates from my DataFrame and got sensible results:

```python
>>> haversine(-85.8059, 38.250134,
...           -85.805122, 38.250098)
284.1302325439314
```

When I replace the sample coordinates with the names of the lat/lon columns in my PySpark DataFrame, I get an error. I tried the following code to create a new column containing the Haversine distance in feet:

```python
(
    df.select('id', 'p1_longitude', 'p1_latitude', 'p2_lon', 'p2_lat')
    .withColumn('haversine_dist',
                haversine(df['p1_latitude'],
                          df['p1_longitude'],
                          df['p2_lat'],
                          df['p2_lon']))
    .show()
)
```

but I get the following error, raised from `File "", line 8, in haversine`:

`TypeError: must be real number, not Column`

This suggests to me that I must somehow apply my `haversine()` function to each row of my PySpark DataFrame individually, but I'm not sure that guess is correct, and even if it is, I don't know how to do it. As an aside, my lat/lon columns are of float type.

## Solution

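Don't use a UDF when Spark built-in functions can do the job: Python UDFs are generally less performant, because every row has to be serialized between the JVM and a Python worker process, and Spark's optimizer cannot look inside them.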

Here is a solution using only Spark SQL built-in functions that does the same as your function:

```python
from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos

(
    # temporary columns holding the coordinate differences in radians
    df.withColumn("dlon", radians(col("p2_lon")) - radians(col("p1_longitude")))
    .withColumn("dlat", radians(col("p2_lat")) - radians(col("p1_latitude")))
    # haversine formula; 3963 mi earth radius * 5280 ft/mi gives feet
    .withColumn(
        "haversine_dist",
        asin(sqrt(
            sin(col("dlat") / 2) ** 2
            + cos(radians(col("p1_latitude"))) * cos(radians(col("p2_lat")))
            * sin(col("dlon") / 2) ** 2
        )) * 2 * 3963 * 5280,
    )
    .drop("dlon", "dlat")
    .show(truncate=False)
)
```

Gives:

```
+-----------+------------+----------+---------+------------------+
|p1_latitude|p1_longitude|p2_lat |p2_lon |haversine_dist |
+-----------+------------+----------+---------+------------------+
|-85.8059 |38.250134 |-85.805122|38.250098|284.13023254857814|
+-----------+------------+----------+---------+------------------+
```

You can find the full list of available Spark built-in functions in the `pyspark.sql.functions` API documentation.
