
Mask/replace inner part of string column in Pyspark


I have an email column in a dataframe and I want to replace part of it with asterisks. I am unable to figure it out using PySpark functions.

My email column could be something like this:

email_col
abc123@gmail.com
123abc123@yahoo.com

What I want to achieve is this:

mod_email_col
ab**23@gmail.com
12*****23@yahoo.com

So essentially, apart from the first two characters and the last two characters of the part before the @, I want the remaining characters replaced by asterisks.

This is what I tried:

from pyspark.sql import functions as F

split_email = F.split(df.email_address, "@")
df = df.withColumn('email_part', split_email.getItem(0))
df = df.withColumn('start', df.email_part.substr(0,2))
df = df.withColumn('end', df.email_part.substr(-2,2))

df.withColumn(
    'masked_part', 
     F.expr("regexp_replace(email_part, email_part[email_part.index(start)+len(start):email_part.index(end)], '*')")
).show(n=5)

Solution

  • I think you can achieve this with the help of the following regular expression: (?<=.{2})\w+(?=.{2}@) (a quick sanity check with plain Python's re module follows the breakdown below)

    • (?<=.{2}): Positive lookbehind for 2 characters
    • \w+: One or more word characters
    • (?=.{2}@): Positive lookahead for 2 characters followed by a literal @
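
    As a quick sanity check (outside of Spark), you can run the same pattern through Python's re module; the matches line up with the pattern column shown below:

    import re
    
    # Same pattern as above; both lookarounds are fixed-width, so re accepts them
    pattern = re.compile(r"(?<=.{2})\w+(?=.{2}@)")
    for email in ["abc123@gmail.com", "123abc123@yahoo.com", "abcd@test.com"]:
        m = pattern.search(email)
        print(email, "->", m.group(0) if m else "<no match>")
    #abc123@gmail.com -> c1
    #123abc123@yahoo.com -> 3abc1
    #abcd@test.com -> <no match>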

    First use regexp_extract to extract this pattern from your string.

    from pyspark.sql.functions import regexp_extract, regexp_replace
    
    df = df.withColumn(
        "pattern", 
        regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
    )
    df.show()
    #+-------------------+-------+
    #|              email|pattern|
    #+-------------------+-------+
    #|   abc123@gmail.com|     c1|
    #|123abc123@yahoo.com|  3abc1|
    #|      abcd@test.com|       |
    #+-------------------+-------+
    

    Then use regexp_replace to build a replacement string of * characters of the same length.

    df = df.withColumn(
        "replacement",
        regexp_replace("pattern", r"\w", "*")
    )
    df.show()
    #+-------------------+-------+-----------+
    #|              email|pattern|replacement|
    #+-------------------+-------+-----------+
    #|   abc123@gmail.com|     c1|         **|
    #|123abc123@yahoo.com|  3abc1|      *****|
    #|      abcd@test.com|       |           |
    #+-------------------+-------+-----------+
    

    Next use regexp_replace again on the original email column using the derived pattern and replacement columns.

    To be safe, concat the lookbehind/lookahead from the original pattern when doing the replacement. To do this, we will have to use expr in order to pass the column values as parameters.

    from pyspark.sql.functions import expr
    
    df = df.withColumn(
        "mod_email_col",
        expr("regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)")
    )
    df.show()
    #+-------------------+-------+-----------+-------------------+
    #|              email|pattern|replacement|      mod_email_col|
    #+-------------------+-------+-----------+-------------------+
    #|   abc123@gmail.com|     c1|         **|   ab**23@gmail.com|
    #|123abc123@yahoo.com|  3abc1|      *****|12*****23@yahoo.com|
    #|      abcd@test.com|       |           |      abcd@test.com|
    #+-------------------+-------+-----------+-------------------+
    

    Finally drop the intermediate columns:

    df = df.drop("pattern", "replacement")
    df.show()
    #+-------------------+-------------------+
    #|              email|      mod_email_col|
    #+-------------------+-------------------+
    #|   abc123@gmail.com|   ab**23@gmail.com|
    #|123abc123@yahoo.com|12*****23@yahoo.com|
    #|      abcd@test.com|      abcd@test.com|
    #+-------------------+-------------------+
    

    Note: I added one test case to show that this does nothing when the part before the @ is 4 characters or less.


    Update: Here are some ways you can handle the edge cases where the part before the @ is 5 characters or less.

    The rules I am using:

    • Local part (before the @) longer than 5 characters: do the above
    • Local part of 3, 4, or 5 characters: keep the first and last characters, masking the others with *
    • Local part of 1 or 2 characters: mask the single character immediately before the @

    Code:

    patA = "regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)"
    patB = "regexp_replace(email, concat('(?<=.{1})', pattern, '(?=.{1}@)'), replacement)"
    
    from pyspark.sql.functions import regexp_extract, regexp_replace
    from pyspark.sql.functions import concat, expr, length, lit, split, when
    
    df.withColumn("address_part", split("email", "@").getItem(0))\
    .withColumn(
        "pattern", 
        when(
            length("address_part") > 5, 
            regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
        ).otherwise(
            regexp_extract("email", r"(?<=.{1})\w+(?=.{1}@)", 0)
        )
    ).withColumn(
        "replacement", regexp_replace("pattern", r"\w", "*")
    ).withColumn(
        "mod_email_col",
        when(
            length("address_part") > 5, expr(patA)
        ).when(
            length("address_part") > 3, expr(patB)
        ).otherwise(regexp_replace('email', '\w(?=@)', '*'))
    ).drop("pattern", "replacement", "address_part").show()
    #+-------------------+-------------------+
    #|              email|      mod_email_col|
    #+-------------------+-------------------+
    #|   abc123@gmail.com|   ab**23@gmail.com|
    #|123abc123@yahoo.com|12*****23@yahoo.com|
    #|     abcde@test.com|     a***e@test.com|
    #|      abcd@test.com|      a**d@test.com|
    #|        ab@test.com|        a*@test.com|
    #|         a@test.com|         *@test.com|
    #+-------------------+-------------------+
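
    As a footnote, the longer-than-5 mask can also be built without regular expressions, using substring, repeat, and concat. Below is a minimal sketch (the local, domain, and with_parts names are just for illustration); short local parts are passed through unchanged here rather than handled as above:

    from pyspark.sql import functions as F
    
    # Split once and keep both halves as helper columns
    with_parts = df.withColumn("local", F.split("email", "@").getItem(0))\
        .withColumn("domain", F.split("email", "@").getItem(1))
    
    with_parts.withColumn(
        "mod_email_col",
        F.when(
            F.length("local") > 5,
            F.expr(
                "concat(substring(local, 1, 2), "
                "repeat('*', length(local) - 4), "
                "substring(local, length(local) - 1, 2), "
                "'@', domain)"
            )
        ).otherwise(F.col("email"))  # short local parts left as-is in this sketch
    ).drop("local", "domain").show()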