I have a dataset that is the output of a Python transform defined in a Palantir Foundry Code Repository. It has certain columns, but since the data may change over time, I want to validate that these columns (around 73) still hold in the future.
How can I create a data health expectation or check to ensure that all 73 columns hold in the future?
You can use expectations to make assertions about which columns exist in your output schema.
See the official docs for schema expectations.
There are three kinds of schema expectations:
# Assert some columns exist.
E.schema().contains({'col1': type1, 'col2': type2})
# Assert the schema contains only columns from the given set (but not necessarily all of them).
E.schema().is_subset_of({'col1': type1, 'col2': type2})
# Assert the schema contains exactly the given columns.
E.schema().equals({'col1': type1, 'col2': type2})
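Loosely, the three assertions relate the actual schema to the expected dict the way plain-Python dict/set comparisons would. This is only an illustrative sketch with string stand-ins for types, not the Foundry API:

```python
# Stand-ins: column name -> type, as plain strings for illustration.
expected = {'col1': 'string', 'col2': 'integer'}
actual = {'col1': 'string', 'col2': 'integer', 'col3': 'double'}

# contains: every expected column is present (extra columns are allowed).
contains_ok = expected.items() <= actual.items()   # True

# is_subset_of: no column outside the expected set (missing ones are allowed).
subset_ok = actual.items() <= expected.items()     # False: 'col3' is extra

# equals: exactly the expected columns, no more, no less.
equals_ok = actual == expected                     # False
```

For your use case — "these 73 columns must keep existing, but new columns are fine" — `contains` is the right assertion.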
Additionally, for checking a single column you can use E.col('col1').exists(). But for 73 columns you're better off going with E.schema().
So for a more fleshed-out example, you might have something like:
from transforms.api import transform_df, Check, Input, Output
import transforms.expectations as E
from pyspark.sql import types as T

COLUMNS_WHICH_MUST_EXIST = {
    'string_column': T.StringType(),
    'number_column': T.IntegerType(),
    # ...and 71 more.
}


@transform_df(
    Output("ri.foundry.main.dataset.abcdef", checks=[
        Check(E.schema().contains(COLUMNS_WHICH_MUST_EXIST), "contains important columns"),
    ]),
    input_data=Input("ri.foundry.main.dataset.12345678"),
)
def compute(input_data):
    # ... your logic here
    return input_data
Also see the official docs for expectation checks for more details on the available options.