I can specify checks in the transform decorator, such as Primary Key. Can I also specify a custom check which applies a lambda function, for example? Thanks!
I read the documentation and couldn't find an existing check type that conforms to my use case.
EDIT: Here's a code example of what I am trying to accomplish. For example, I want to check if an array column only contains distinct elements. The transform check should raise a warning if my UDF returns false. This is how I would implement the check with an extra column (rather than using checks):

    from pyspark.sql import functions as F


    # Define the UDF before it is used. Declare a boolean return type,
    # because F.udf returns strings by default.
    @F.udf(returnType='boolean')
    def check_for_distinct_array_elements(arr):
        return len(set(arr)) == len(arr)


    df = (
        df
        .withColumn('my_array_col1', F.array(F.lit('first'), F.lit('second'), F.lit('third')))
        .withColumn('my_array_col2', F.array(F.lit('first'), F.lit('first')))
        .withColumn('custom_check1', check_for_distinct_array_elements(F.col('my_array_col1')))
        .withColumn('custom_check2', check_for_distinct_array_elements(F.col('my_array_col2')))
    )

You can create custom checks. Here is an example for the issue you describe. I extend the _SizeExpectation expectation to change how the value to check is computed, and add an eq method to work around the expectation-factory step. The computed value is the array size minus the number of distinct elements, so it is 0 exactly when all elements are distinct.

    import operator

    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    from pyspark.sql import Column
    from transforms.api import transform_df, Input, Output
    from transforms.api import Check
    from transforms.expectations.utils._expectation_utils import check_columns_exist, check_column_type
    from transforms.expectations.evaluator import EvaluationTarget
    from transforms.expectations.column._column_expectation import _SizeExpectation


    class CountDuplicatesExpectation(_SizeExpectation):

        def __init__(self, col, op=None, threshold=None):
            super(CountDuplicatesExpectation, self).__init__(col, op, threshold)
            self._col = col
            self._op = op
            self._threshold = threshold

        @check_columns_exist
        @check_column_type([T.ArrayType], lambda self: self._col)
        def value(self, target: EvaluationTarget) -> Column:
            return F.size(self._col) - F.size(F.array_distinct(self._col))

        def eq(self, target):
            return CountDuplicatesExpectation(self._col, operator.eq, target)


    @transform_df(
        Output(
            "<output-rid>",
            checks=[
                Check(CountDuplicatesExpectation('values').eq(0), 'custom check', 'WARN'),
            ]
        ),
        ...
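
To make it clearer what the value expression measures, here is a minimal plain-Python sketch of the same arithmetic, without Spark. The names count_duplicates and passes_check are hypothetical and not part of transforms.expectations. How the library applies the operator and threshold internally is an assumption here, modelled simply as op(value, threshold).

```python
import operator


def count_duplicates(arr):
    """Array length minus the number of distinct elements.

    Mirrors F.size(col) - F.size(F.array_distinct(col)) for a single row.
    """
    return len(arr) - len(set(arr))


def passes_check(arr, op=operator.eq, threshold=0):
    """Assumed model of the check: compare the computed value to the threshold."""
    return op(count_duplicates(arr), threshold)


# The two arrays from the question.
print(passes_check(['first', 'second', 'third']))  # True: no duplicates
print(passes_check(['first', 'first']))            # False: one surplus element
```

With eq(0), a row passes only when its array has no repeated elements. Because the value is a count rather than a boolean, other comparisons (for example, tolerating at most one duplicate) would only need a different operator and threshold.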