Tags: unit-testing, apache-spark, pyspark, palantir-foundry, foundry-code-repositories

How to test a transformation in Palantir Foundry?


We are trying to create a test function for the whole transformation.

import os
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.api import Pipeline
from .myproject.datasets import my_transform

# This assumes your test data lives in fixtures/data/ next to this test file within the repo
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'fixtures', 'data')

def test_my_transform(spark_session):
    pipeline = Pipeline()
    pipeline.add_transforms(my_transform)

    runner = TransformRunner(pipeline, '/my_fabulous_project', TEST_DATA_DIR)

    output = runner.build_dataset(spark_session, '/my_fabulous_project/output/test')
    assert output.first()['col_c'] == 3

Based on the documentation and this post, we have tried modifying the import of the function, but we always get one of these errors:

transforms._errors.TransformTypeError: Expected arguments to be of type <class 'transforms.api._transform.Transform'>

ModuleNotFoundError: No module named 'test.myproject'

ValueError: attempted relative import beyond top-level package

How can we create a working end-to-end test function for a transformation?


Solution

  • After trying out several approaches under different conditions, the following one seems cleanest to me:

    • no hard-coded dataset paths
    • adding/removing transform inputs is explicit
    • in-memory dataframes are used as test inputs
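
    Regarding the errors in the question: from .myproject.datasets import my_transform imports the module, so pipeline.add_transforms(my_transform) most likely receives a module instead of the decorated Transform object (hence the TransformTypeError), while relative imports resolved from the test package explain the ModuleNotFoundError and the "relative import beyond top-level package" error. A minimal sketch of the import that works, assuming the transform lives in myproject/datasets/my_transform.py under the repo's source root:

    # test file: import the decorated transform directly, with an absolute import
    from transforms.api import Pipeline
    from myproject.datasets.my_transform import compute_sum

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)  # compute_sum is a Transform object, as add_transforms expects

    This is exactly the import used in the full test below.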

    test_my_transform.py

    from transforms.api import Pipeline
    from transforms.verbs.testing.TransformRunner import TransformRunner
    from transforms.verbs.testing.datastores import InMemoryDatastore
    from myproject.datasets.my_transform import compute_sum
    
    
    def test_compute_sum(spark_session):
    
        df_input1 = spark_session.createDataFrame([
            (0, 2)
        ], ['col_a', 'col_b'])
    
        df_input2 = spark_session.createDataFrame([
            (0, 1)
        ], ['col_a', 'col_b'])
    
        df_expected = spark_session.createDataFrame([
            (0, 1, 1),
            (0, 2, 2)
        ], ['col_a', 'col_b', 'col_c'])
    
        # With @transform_df or @transform_pandas, the output key is 'bound_output';
        # with @transform, it is the name of the Output argument (here: 'out')
        output_map = {'out': df_expected}
        input_map = {
            'input_a': df_input1,
            'input_b': df_input2,
        }
    
        pipeline = Pipeline()
        pipeline.add_transforms(compute_sum)
        store = InMemoryDatastore()
        # Register each test input under the Foundry path (alias) the transform expects
        for inp_name, inp_obj in pipeline.transforms[0].inputs.items():
            store.store_dataframe(inp_obj.alias, input_map[inp_name])
        # Resolve the output's Foundry path (alias) from the transform definition
        path_out = pipeline.transforms[0].outputs[list(output_map)[0]].alias
        runner = TransformRunner(pipeline, datastore=store)
        df_out = runner.build_dataset(spark_session, path_out)
    
        assert df_out.subtract(df_expected).count() == 0
        assert df_expected.subtract(df_out).count() == 0
        assert df_out.schema == df_expected.schema
    

    my_transform.py

    from transforms.api import Input, Output, transform
    from pyspark.sql import functions as F
    
    
    @transform(
        out=Output('/some_foundry_path/my_dir/out3'),
        input_a=Input('/some_foundry_path/my_dir/in'),
        input_b=Input('/some_foundry_path/my_dir/in2'))
    def compute_sum(input_a, input_b, out):
        input_a = input_a.dataframe()
        input_b = input_b.dataframe()
        df = input_a.unionByName(input_b)
        df = df.withColumn('col_c', F.col('col_a') + F.col('col_b'))
        out.write_dataframe(df)
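
    For reference, if the same logic is written with @transform_df, the only change needed in the test is the output key, which becomes 'bound_output' (see the comment in the test above). A sketch of that variant, assuming the same dataset paths:

    from transforms.api import Input, Output, transform_df
    from pyspark.sql import functions as F


    @transform_df(
        Output('/some_foundry_path/my_dir/out3'),
        input_a=Input('/some_foundry_path/my_dir/in'),
        input_b=Input('/some_foundry_path/my_dir/in2'))
    def compute_sum(input_a, input_b):
        # Inputs arrive as DataFrames; the returned DataFrame is written to the Output
        df = input_a.unionByName(input_b)
        return df.withColumn('col_c', F.col('col_a') + F.col('col_b'))

    and in the test: output_map = {'bound_output': df_expected}.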