Search code examples
apache-sparkpysparkapache-spark-sqlregexp-replace

How can I write conditional regex replace in PySpark?


I'm trying to get an if else statement working in the form of when() and otherwise() function. I tried thinking of many ways but my problem is the when() statement needs a column. Is there a way to get the following code working in when() and otherwise() format?

get_data = spark.sql("SELECT STRING({}) AS {} FROM {} WHERE Mobile='{}'".format(dynamic_tag_mapping_column_name, match[0], dynamic_tag_mapping_table_name, mobile_numbers[mob])).collect()[0][0]
if get_data!='0':
     textList.append(campaign_segment_text.withColumn('CampaignSMSText', func.expr("regexp_replace(CampaignSMSText, '{}', {})".format(match[0], get_data)))
else:
     textList.append(campaign_segment_text.withColumn('CampaignSMSText', func.expr("regexp_replace(CampaignSMSText, '{}', {})".format(match[0], dynamic_default_value)))

What I'm trying to achieve is if I get the get_data>0 value from database it will take the get_data value but if get_data=0, it will assign a default value that is set. I tried setting the String value to Int that I'm getting from the DB but that won't compare because get_data isn't a column. Is there anyway to get this working?


Solution

  • Try putting case when inside F.expr?

    get_data = spark.sql("SELECT {} AS {} FROM {} WHERE Mobile='{}'".format(dynamic_tag_mapping_column_name, match[0], dynamic_tag_mapping_table_name, mobile_numbers[mob])).collect()[0][0]
    
    textList.append(
        campaign_segment_text.withColumn(
            'CampaignSMSText',
            func.expr(
                "regexp_replace(CampaignSMSText, '{}', case when {} != 0 then {} else {} end)".format(
                    match[0], get_data, get_data, dynamic_default_value
                )
            )
        )
    )