I have the following UDF in PySpark, defined under src/my_funcs/:
import requests
from pyspark.sql import functions as F, types as T

@F.udf(returnType=T.ArrayType(T.ArrayType(T.StringType())))
def get_details(details_url: str) -> list[list[str]]:
    details = requests.get(details_url)
    details = details.json()
    details = ...  # some logic
    return details
Now I want to unit test this UDF. For that, I did the following:
from unittest import mock

from src.my_funcs import get_details

@mock.patch("src.my_funcs.requests.get")
def test_get_details(mock_requests_get, spark):
    request_json_response = ...  # expected result of .json()
    mock_requests_get.return_value = mock.Mock(**{"status_code": 200, "json.return_value": request_json_response})
    input_data = [(1, "random_invalid_url")]
    input_schema = ["ID", "details_url"]
    input_df = spark.createDataFrame(input_data, input_schema)
    expected_data = ...  # expected array
    expected_schema = ["ID", "details_url", "details"]
    expected_df = spark.createDataFrame(expected_data, expected_schema)
    transformed_df = input_df.withColumn("details", get_details("details_url"))
    assert sorted(expected_df.collect()) == sorted(transformed_df.collect())
I keep getting the error
requests.exceptions.MissingSchema: Invalid URL 'random_invalid_url': No scheme supplied. Perhaps you meant https://random_invalid_url?
This error makes it appear that my mocking is not working.
If I remove the UDF decorator from the get_details function and test it as a plain Python function without Spark, the mocking works and the unit test passes. So I assume the error is related to how Spark executes the UDF. How can I fix this and test the UDF on a Spark DataFrame? Do I need to mock it in a different way, or is it just not possible?
Not sure if relevant, but the Spark session is created as a pytest fixture like this:
spark = SparkSession.builder.master("local[*]").appName("UnitTest").getOrCreate()
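For completeness, the fixture in full looks roughly like this (the session scope is incidental):

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # One local Spark session shared across the test run
    spark = SparkSession.builder.master("local[*]").appName("UnitTest").getOrCreate()
    yield spark
    spark.stop()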
The mock does not reach the UDF because of how Spark runs it: the UDF is serialized (pickled) and executed in separate Python worker processes, even with a local[*] master, while mock.patch only replaces requests.get in the test (driver) process. The workers therefore still call the real requests.get, which is why you see the MissingSchema error.

If you want to unit test the logic inside get_details, I would suggest you extract that code into a separate function and test it directly (no Spark or mock necessary), as sketched below.
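A minimal sketch of that refactor, where parse_details is a hypothetical name for the extracted logic:

import requests
from pyspark.sql import functions as F, types as T

def parse_details(payload: dict) -> list[list[str]]:
    # The pure transformation that used to live inside the UDF
    details = ...  # some logic
    return details

@F.udf(returnType=T.ArrayType(T.ArrayType(T.StringType())))
def get_details(details_url: str) -> list[list[str]]:
    # Thin wrapper: only the HTTP call is left untested here
    return parse_details(requests.get(details_url).json())

def test_parse_details():
    sample_payload = ...  # a representative .json() payload
    assert parse_details(sample_payload) == ...  # expected array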
If you do not want to refactor your code, you can leave get_details as is, but instead of applying the @F.udf decorator, wrap it with F.udf at the point where you pass it to Spark. You can then test the undecorated function directly (you will still need to mock requests, but you avoid Spark):
udf_get_details = F.udf(get_details, returnType=T.ArrayType(T.ArrayType(T.StringType())))
transformed_df = input_df.withColumn("details", udf_get_details("details_url"))
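With that change, the test can call get_details as a plain function. A sketch, reusing your mock setup:

from unittest import mock

from src.my_funcs import get_details

@mock.patch("src.my_funcs.requests.get")
def test_get_details(mock_requests_get):
    # The patch holds because get_details now runs in this process,
    # not in a Spark worker
    response_json = ...  # expected result of .json()
    mock_requests_get.return_value = mock.Mock(**{"status_code": 200, "json.return_value": response_json})
    assert get_details("random_invalid_url") == ...  # expected array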
Even if you did manage to run it through Spark, you would only be testing that Spark can serialize and invoke your function, not the logic inside it; the direct test above gives you the same coverage much faster.