Use custom checks in the Palantir Foundry transform decorator


I can specify checks in the transform decorator, such as Primary Key. Can I also specify a custom check which applies a lambda function, for example? Thanks!

I read the documentation and couldn't find an existing check type that fits my use case.

EDIT: Here's a code example of what I am trying to accomplish. For example, I want to check if an array column only contains distinct elements. The transform check should raise a warning if my UDF returns false. This is how I would implement the check with an extra column (rather than using checks):

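    from pyspark.sql import functions as F, types as T


    # Declare the return type; a bare @F.udf returns strings, not booleans.
    @F.udf(returnType=T.BooleanType())
    def check_for_distinct_array_elements(arr):
        # True when the array contains no repeated elements.
        return len(set(arr)) == len(arr)


    # df is an existing DataFrame.
    df = (
        df
        .withColumn('my_array_col1', F.array(F.lit('first'), F.lit('second'), F.lit('third')))
        .withColumn('my_array_col2', F.array(F.lit('first'), F.lit('first')))
        .withColumn('custom_check1', check_for_distinct_array_elements(F.col('my_array_col1')))
        .withColumn('custom_check2', check_for_distinct_array_elements(F.col('my_array_col2')))
    )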

Solution

  • You can create custom checks. Here is an example for the issue you describe: I extend the _SizeExpectation expectation to change how the checked value is computed, and add an eq method to work around the expectation-factory step.

    import operator

    from pyspark.sql import Column
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    from transforms.api import Check, Input, Output, transform_df
    from transforms.expectations.column._column_expectation import _SizeExpectation
    from transforms.expectations.evaluator import EvaluationTarget
    from transforms.expectations.utils._expectation_utils import check_columns_exist, check_column_type
    
    
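    class CountDuplicatesExpectation(_SizeExpectation):
        """Expectation on the number of duplicate elements in an array column."""

        def __init__(self, col, op=None, threshold=None):
            super(CountDuplicatesExpectation, self).__init__(col, op, threshold)
            # Keep local references so value() and eq() can use them.
            self._col = col
            self._op = op
            self._threshold = threshold

        @check_columns_exist
        @check_column_type([T.ArrayType], lambda self: self._col)
        def value(self, target: EvaluationTarget) -> Column:
            # Total elements minus distinct elements = number of duplicates.
            return F.size(self._col) - F.size(F.array_distinct(self._col))

        def eq(self, target):
            # Build the comparison directly rather than via the expectation factory.
            return CountDuplicatesExpectation(self._col, operator.eq, target)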
    
    
    @transform_df(
        Output(
            "<output-rid>",
            checks=[
                Check(CountDuplicatesExpectation('values').eq(0), 'custom check', 'WARN'),
            ]
        ),
    ...
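
To see what the check above compares, here is a minimal plain-Python sketch of the same arithmetic, with no Spark or Foundry involved. It mirrors `F.size(col) - F.size(F.array_distinct(col))` on ordinary lists and then applies `operator.eq` against a threshold of 0, as `.eq(0)` does. The helper names `duplicate_count` and `passes` are hypothetical and only for illustration; they are not part of the transforms API.

```python
import operator


def duplicate_count(arr):
    # Plain-Python analogue of F.size(col) - F.size(F.array_distinct(col)).
    return len(arr) - len(set(arr))


def passes(arr, op=operator.eq, threshold=0):
    # Hypothetical helper: apply the configured comparison to the computed value.
    return op(duplicate_count(arr), threshold)


# The two arrays from the question.
rows = {
    "my_array_col1": ["first", "second", "third"],
    "my_array_col2": ["first", "first"],
}

results = {name: passes(arr) for name, arr in rows.items()}
print(results)  # {'my_array_col1': True, 'my_array_col2': False}
```

With the check severity set to 'WARN', the second array is the case that would be expected to trigger the warning, since its duplicate count is 1 rather than 0.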