We are trying to write an end-to-end test function that exercises the whole transformation.
import os
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.api import Pipeline
from .myproject.datasets import my_transform
# This assumes your test data lives in the fixtures/data/ folder located next to this test file in the repo
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'fixtures', 'data')
def test_my_transform(spark_session):
    """Build the transform's output through a TransformRunner and check one value.

    Registers ``my_transform`` on a fresh ``Pipeline``, builds the dataset at
    ``/my_fabulous_project/output/test`` and asserts that ``col_c`` of the
    first returned row equals 3.

    NOTE(review): ``add_transforms`` appears to require ``Transform`` objects
    (see the ``TransformTypeError`` quoted in this document); presumably the
    imported ``my_transform`` is a module rather than the decorated function.
    Verify the import before relying on this test.
    """
    project_pipeline = Pipeline()
    project_pipeline.add_transforms(my_transform)

    # presumably the second argument is the project root and the third the
    # directory holding the fixture data; confirm against TransformRunner docs
    transform_runner = TransformRunner(
        project_pipeline,
        '/my_fabulous_project',
        TEST_DATA_DIR,
    )
    built = transform_runner.build_dataset(
        spark_session,
        '/my_fabulous_project/output/test',
    )

    first_row = built.first()
    assert first_row['col_c'] == 3
Based on the documentation and this post, we tried to modify the import of the function, but we always get one of these errors:
transforms._errors.TransformTypeError: Expected arguments to be of type <class 'transforms.api._transform.Transform'>
ModuleNotFoundError: No module named 'test.myproject'
ValueError: attempted relative import beyond top-level package
How can we create a working end-to-end test function for a transformation?
After trying out several approaches under different conditions, the following approach seems the cleanest to me.
test_my_transform.py
from transforms.api import Pipeline
from transforms.verbs.testing.TransformRunner import TransformRunner
from transforms.verbs.testing.datastores import InMemoryDatastore
from myproject.datasets.my_transform import compute_sum
def test_compute_sum(spark_session):
    """End-to-end test of ``compute_sum`` using in-memory input datasets.

    Two single-row input DataFrames are stored in an ``InMemoryDatastore``
    under the aliases of the transform's declared inputs, the transform is
    built through a ``TransformRunner``, and the result is compared with the
    expected DataFrame.

    The comparison uses ``exceptAll`` rather than ``subtract``: ``subtract``
    is a set difference that ignores duplicate rows, so an output with
    surplus or missing duplicates would otherwise pass unnoticed.

    Args:
        spark_session: Spark session used to create the DataFrames and to
            build the dataset (presumably supplied as a pytest fixture).
    """
    input_columns = ['col_a', 'col_b']
    df_input1 = spark_session.createDataFrame([(0, 2)], input_columns)
    df_input2 = spark_session.createDataFrame([(0, 1)], input_columns)
    df_expected = spark_session.createDataFrame(
        [
            (0, 1, 1),
            (0, 2, 2),
        ],
        ['col_a', 'col_b', 'col_c'],
    )

    # If @transform_df or @transform_pandas, the key is 'bound_output'
    # If @transform, the key is the name of variable Output
    output_map = {'out': df_expected}
    input_map = {
        'input_a': df_input1,
        'input_b': df_input2,
    }

    pipeline = Pipeline()
    pipeline.add_transforms(compute_sum)
    transform_under_test = pipeline.transforms[0]

    # Register every declared input under the alias the transform reads from.
    store = InMemoryDatastore()
    for inp_name, inp_obj in transform_under_test.inputs.items():
        store.store_dataframe(inp_obj.alias, input_map[inp_name])

    output_name = next(iter(output_map))
    path_out = transform_under_test.outputs[output_name].alias

    runner = TransformRunner(pipeline, datastore=store)
    df_out = runner.build_dataset(spark_session, path_out)

    # Check the schema first so a column mismatch is reported as such.
    assert df_out.schema == df_expected.schema
    # Duplicate-preserving difference in both directions: the two DataFrames
    # must contain the same rows with the same multiplicities.
    assert df_out.exceptAll(df_expected).count() == 0
    assert df_expected.exceptAll(df_out).count() == 0
my_transform.py
from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
@transform(
    out=Output('/some_foundry_path/my_dir/out3'),
    input_a=Input('/some_foundry_path/my_dir/in'),
    input_b=Input('/some_foundry_path/my_dir/in2'),
)
def compute_sum(input_a, input_b, out):
    """Union both inputs by column name and add ``col_c = col_a + col_b``.

    Args:
        input_a: Transform input; its ``dataframe()`` is the left side of
            the union.
        input_b: Transform input; its ``dataframe()`` is appended by name.
        out: Transform output that receives the resulting DataFrame.
    """
    combined = input_a.dataframe().unionByName(input_b.dataframe())
    row_sum = F.col('col_a') + F.col('col_b')
    out.write_dataframe(combined.withColumn('col_c', row_sum))