
How to check multiple columns for matching regex rules to output in another column in PySpark


My dataframe:

[image: sample dataframe with columns 'a', 'b', 'c', 'd']

I have to check if the value in each column matches a certain rule. For example:

  • If column 'a' has a number, column 'b' has xx or yy, column 'c' has 1 or 2, and column 'd' has 0, then the output should be 'output1'.
  • Not every column needs a rule. If there is no rule for a column, it should simply be ignored; e.g., for 'output3' it does not matter what is in column 'c'.
  • If a row does not match any rule, the output should be 'no matches found'.

Since there are so many rules, I created a dictionary of regex rules as follows:

rules_dict = {
    'output1': {'a': '^[0-9]*$',
                'b': 'xx | yy',
                'c': '^[1-2]*$',
                'd': '0'},

    'output2': {'a': '^[a-z]+$',
                'b': 'xx | yy',
                'c': '1',
                'd': '0'},

    'output3': {'a': '^[a-zA-Z0-9_.-]*$',
                'b': 'xx | yy',
                'd': '0'},

    'output4': {'a': '^[0-9]*$',
                'b': 'xx | yy',
                'c': '^[1-2]*$',
                'd': '0'}
}

The expected output:

[image: expected output, the dataframe with an added 'output' column holding the matched rule name]

I used the following PySpark script:

from pyspark.sql import functions as F

for out in rules_dict.keys():
    for column, rule in rules_dict[out].items():
        output_df = df.withColumn('output', F.when(df[column].rlike(rule), out).otherwise('no matches found'))
output_df.show()

But the output is:

[image: actual output, which does not match the expected result]

P.S.: I am doing this for a large dataset with a large number of rules; I have only created a small sample to simplify the question.


Solution

  • @ZygD's answer is very good, but you could also use a comprehension together with functools.reduce so that you don't have to loop over the config and overwrite the output column on each iteration. The other difference is that I am using coalesce to get the first non-null result:

    df.withColumn("output", F.coalesce(
        *[
            F.when(
                functools.reduce(lambda x, y: x & y, [F.col(column).rlike(rule) for column, rule in rules_dict[out].items()]),
                F.lit(out)
            )
            for out in rules_dict.keys()
        ],
        F.lit('no matches found')
    )).show()