pyspark, palantir-foundry

Creating a schema data health expectation in Palantir Foundry Code Repositories


I have a dataset that is the output of a Python transform defined in a Palantir Foundry Code Repository. It has certain columns, but given that the data may change over time, I want to validate that these columns (around 73) continue to exist in the future.

How can I create a data health expectation or check to ensure that all 73 columns exist in the future?


Solution

  • You can use expectations to make assertions about which columns exist in your output schema.

    See the official docs for schema expectations.

    There are 3 kinds of schema expectations:

    # Assert some columns exist.
    E.schema().contains({'col1': type1, 'col2': type2})
    
    # Assert the schema contains only columns from the given set (but not necessarily all of them).
    E.schema().is_subset_of({'col1': type1, 'col2': type2})
    
    # Assert the schema contains exactly the given columns.
    E.schema().equals({'col1': type1, 'col2': type2})
    

    Additionally, for checking a single column, you can use E.col('col1').exists(). But for 73 columns you're better off going with E.schema().
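
    For instance, a single-column check wrapped in a Check (the description string is just an illustrative label):

    Check(E.col('col1').exists(), "col1 exists")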

    So for a more fleshed-out example, you might have something like:

    from transforms.api import transform_df, Check, Input, Output
    import transforms.expectations as E
    from pyspark.sql import types as T
    
    COLUMNS_WHICH_MUST_EXIST = {
        'string_column': T.StringType(),
        'number_column': T.IntegerType(),
        # ...and 71 more.
    }
    
    @transform_df(
        Output("ri.foundry.main.dataset.abcdef", checks=[
            Check(E.schema().contains(COLUMNS_WHICH_MUST_EXIST), "contains important columns"),
        ]),
        input_data=Input("ri.foundry.main.dataset.12345678"),
    )
    def compute(input_data):
        # ... your logic here
        return input_data  # a @transform_df function must return a DataFrame
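
    Rather than typing out all 73 entries by hand, you could bootstrap COLUMNS_WHICH_MUST_EXIST once from the current output's schema and paste the result in. A minimal sketch in plain PySpark (schema_as_dict is a hypothetical helper name, not part of the Foundry API):

    def schema_as_dict(df):
        # Map each column name to its Spark DataType instance,
        # e.g. {'string_column': StringType(), 'number_column': IntegerType(), ...}
        return {field.name: field.dataType for field in df.schema.fields}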
    

    Also see the official docs for expectation checks for more details of the options available.
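
    One option worth calling out: Check also accepts an on_error argument controlling whether a failed expectation fails the build or merely warns. A sketch, assuming the 'WARN' severity value (confirm the exact values against the docs):

    # Same expectation as above, but downgraded to a warning instead of failing the build.
    Check(E.schema().contains(COLUMNS_WHICH_MUST_EXIST), "contains important columns", on_error='WARN')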