Tags: unit-testing, pyspark, mocking, pytest, databricks

Mock Requests Function in PySpark UDF


I have the following UDF in PySpark under src/my_funcs/:

import requests
from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.udf(returnType=T.ArrayType(T.ArrayType(T.StringType())))
def get_details(details_url: str) -> list[list[str]]:
    response = requests.get(details_url)
    details = response.json()
    details = ...  # Some logic
    return details

Now I want to unit test this UDF. For that, I did the following:

from unittest import mock

from src.my_funcs import get_details

@mock.patch("src.my_funcs.requests.get")
def test_get_details(mock_requests_get, spark):
    request_json_response = ...  # Expected result of .json()
    mock_requests_get.return_value = mock.Mock(
        **{"status_code": 200, "json.return_value": request_json_response}
    )
    input_data = [(1, "random_invalid_url")]
    input_schema = ["ID", "details_url"]
    input_df = spark.createDataFrame(input_data, input_schema)

    expected_data = ...  # expected rows, including the details array
    expected_schema = ["ID", "details_url", "details"]
    expected_df = spark.createDataFrame(expected_data, expected_schema)

    transformed_df = input_df.withColumn("details", get_details("details_url"))
    assert sorted(expected_df.collect()) == sorted(transformed_df.collect())

I keep getting the error:

requests.exceptions.MissingSchema: Invalid URL 'random_invalid_url': No scheme supplied. Perhaps you meant https://random_invalid_url?

This error makes it appear that my mocking is not working.

If I remove the UDF decorator from the get_details function, and test the function without Spark, the mocking works and the unit test succeeds. So I assume the error is related to how Spark works. How can I fix this and test the UDF on a Spark DataFrame? Is the error because of how Spark executes the UDF? Do I need to mock it in a different way, or is it just not possible?

Not sure if relevant, but the Spark session is created as a pytest fixture like this:

spark = SparkSession.builder.master("local[*]").appName("UnitTest").getOrCreate()
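
For completeness, the fixture looks roughly like this (the session scope and conftest.py placement are assumptions):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # One local Spark session shared across the whole test run
    spark = SparkSession.builder.master("local[*]").appName("UnitTest").getOrCreate()
    yield spark
    spark.stop()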

Solution

  • The patch never takes effect because Spark serializes the UDF and executes it in separate Python worker processes, where a mock.patch applied in your test process does not exist. If you want to unit test the logic inside get_details, I would suggest extracting that code into a separate function and testing it directly, with no Spark or mock necessary (see the sketch at the end of this answer).

    If you do not want to refactor your code, you can leave get_details as a plain, undecorated function and wrap it with F.udf() only where you pass it to Spark methods. Your tests can then call get_details directly (you will still need to mock requests, but you avoid Spark):

    udf_get_details = F.udf(get_details, returnType=T.ArrayType(T.ArrayType(T.StringType())))
    transformed_df = input_df.withColumn("details", udf_get_details("details_url"))
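
    With that change the unit test can call the plain function directly; a minimal sketch (the placeholders mirror the ones in the question):

    from unittest import mock

    from src.my_funcs import get_details

    @mock.patch("src.my_funcs.requests.get")
    def test_get_details(mock_requests_get):
        request_json_response = ...  # expected result of .json()
        mock_requests_get.return_value = mock.Mock(
            **{"status_code": 200, "json.return_value": request_json_response}
        )
        # No Spark: the undecorated function runs in this process,
        # so the patch on src.my_funcs.requests.get actually applies
        assert get_details("random_invalid_url") == ...  # expected nested list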
    

    Even if you did manage to run it through Spark, you would only:

    1. be checking that Spark calls UDFs correctly, and
    2. make the tests run slower.
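
    For the refactoring approach, a minimal sketch of what it could look like (the helper name parse_details and the sample payload are assumptions):

    import requests
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    def parse_details(payload: dict) -> list[list[str]]:
        # Pure function holding the logic you actually want to test
        return ...  # some logic transforming the JSON payload

    @F.udf(returnType=T.ArrayType(T.ArrayType(T.StringType())))
    def get_details(details_url: str) -> list[list[str]]:
        # Thin wrapper: I/O only, no logic worth testing here
        return parse_details(requests.get(details_url).json())

    def test_parse_details():
        sample_payload = ...  # a representative .json() payload
        assert parse_details(sample_payload) == ...  # expected nested list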