
Implement SQL/CASE Statement in PySpark where a column 'contains' any of a list of strings or a column is 'like' any of a list of strings


I am trying to implement SQL CASE-statement-style logic in PySpark. I know there's spark.sql() to run SQL code within Spark, or df.withColumn() with expr(), but my situation is a bit different in that there's a part of the logic I can't replicate in SQL.

CASE WHEN trim(col1)='000' then '0'
WHEN trim(col1) in ('061', '062', '063', '064', '081', '082', '083', '084', '110', '112', '113', '114', '115', '116') then 'U'
WHEN ( col1 like '%A%' or col1 like '%B%' or col1 like '%C%' or col1 like '%D%' ... ... )
else 'Y' end as col2

My issue is with line 3, the third WHEN branch, quoted below. It needs to cover all 26 letters, A through Z.

WHEN ( col1 like '%A%' or col1 like '%B%' or col1 like '%C%' or col1 like '%D%' ... ...

contains() could have worked, but it only accepts a single string value. I don't know if there's a way to pass a list of elements to contains, something like " col1 contains ('A', 'B', 'C', 'D', 'E', 'F' ... ...) ".
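
The closest thing I can come up with is building the condition manually, OR-ing together one contains() call per letter. A rough, untested sketch of that idea (reusing col1 from above, with the list shortened here):

from functools import reduce
from pyspark.sql import functions as F

letters = ['A', 'B', 'C', 'D']   # shortened here; the real list would cover A-Z
# OR together one contains() check per letter into a single boolean Column
letter_cond = reduce(lambda a, b: a | b, [F.col('col1').contains(ch) for ch in letters])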

So far, what I have is something like the following:

list_a = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z']

from functools import reduce
# OR together one like() condition per letter and keep only the matching rows
newdf1 = df.where(reduce(lambda a, b: a | b, (df['col1'].like('%' + pat + '%') for pat in list_a)))

from pyspark.sql.functions import lit
# tag every remaining row with the constant 'C'
newdf1 = newdf1.withColumn("col2", lit('C'))

So the output looks like this:

+----+----+
|col1|col2|
+----+----+
| A00|   C|
| A00|   C|
| B00|   C|
| G00|   C|
| K00|   C|
| M00|   C|
+----+----+
But this is not a good solution, because it filters the dataframe instead of keeping all of its rows intact and just adding col2. Any suggestions on how I can improve this?


Solution

  • rlike(other): SQL RLIKE expression (LIKE with Regex). Returns a boolean Column based on a regex match. Parameters: other – an extended regex expression

    from pyspark.sql.functions import when, col, regexp_extract, lit

    df = spark.createDataFrame(
        [
            ('A00',),
            ('B00',),
            ('C00',),
            ('K00',),
            ('M00',),
            ('000',),
        ], ["col1"])

    # extract the first letter when col1 contains one; otherwise col2 is null
    df.withColumn(
        'col2',
        when(col('col1').rlike("[a-zA-Z]"), regexp_extract(col('col1'), "[a-zA-Z]", 0))
        .otherwise(lit(None))
    ).show(10)
    
    # +----+----+
    # |col1|col2|
    # +----+----+
    # | A00|   A|
    # | B00|   B|
    # | C00|   C|
    # | K00|   K|
    # | M00|   M|
    # | 000|null|
    # +----+----+
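
If you need the full CASE logic from the question rather than just the letter extraction, a sketch with chained when() calls could look like the one below. It assumes, based on your own attempt, that the letter branch should produce the constant 'C'; adjust that value if your real rule differs.

    from pyspark.sql.functions import trim, when, col, lit

    code_list = ['061', '062', '063', '064', '081', '082', '083', '084',
                 '110', '112', '113', '114', '115', '116']

    # when() branches are evaluated in order, like the branches of a CASE statement
    df.withColumn(
        'col2',
        when(trim(col('col1')) == '000', lit('0'))
        .when(trim(col('col1')).isin(code_list), lit('U'))
        .when(col('col1').rlike("[a-zA-Z]"), lit('C'))   # assumed value for the letter branch
        .otherwise(lit('Y'))
    ).show(10)

Unlike the where()-based attempt, this keeps every row of the dataframe and only adds the derived col2.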