Tags: python, apache-spark, pyspark

Poor performance of a UDF in PySpark


I'm getting very poor performance from this UDF in PySpark.

I want to filter out every row of my DataFrame whose value matches a specific regular expression built from any of the items in a list.

Where is the bottleneck? (The function works on a small DataFrame, but the full dataset is very large.)

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

import re

# Create a function that takes a string and checks whether all the words in the string are contained in the filter list
# (nom_ciudades_chile_stop_words is defined elsewhere and not shown here)


def string_contains_all(string: str, search_list: set = nom_ciudades_chile_stop_words):
    
    string_is_only_stop_word = True
    string = string.replace(" ", "")
    for stop_word in search_list:
        stop_word = stop_word.replace(" ", "")
        # Build a regular expression that matches one or more back-to-back repetitions of the stop word
        regex = f"^({stop_word})+$"
        # Use re.fullmatch() to check whether the string satisfies the regular expression
        match = re.fullmatch(regex, string)
        # If there is a match, the string is just the stop word repeated
        if match is not None:
            string_is_only_stop_word = False
    return string_is_only_stop_word
# Create a UDF (User-Defined Function) from the function above
string_contains_all_udf = udf(string_contains_all, returnType=BooleanType())
display(df.filter(string_contains_all_udf(col("glosa"))))

Example list to filter by regex: ["LA SERENA","Australia"]

Example DF: (image not included)

Expected output: the same DataFrame without the row "LA SERENA LA SERENA"


Solution

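  • You can avoid the UDF and use built-in column functions instead, which should perform better than a UDF: a Python UDF has to serialize every row between the JVM and a Python worker, whereas regexp_replace and rlike run inside Spark itself. Something like this: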

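    import pyspark.sql.functions as f
    
    search_list = ["LA SERENA", "Australia"]
    
    # Strip spaces into a helper column so the patterns can ignore whitespace
    df = df.withColumn("replaced_glosa", f.regexp_replace("glosa", " ", ""))
    
    df.show(truncate=False)
    
    # Build one anchored alternative per search term, e.g. ^(LASERENA)+$
    condn = []
    
    for term in search_list:
        c = term.replace(" ", "")
        condn.append(f"^({c})+$")
    
    # Join the alternatives into a single regex
    condn = "|".join(condn)
    
    print(condn)
    
    # Keep only the rows that do not match any alternative
    df = df.filter(~f.col("replaced_glosa").rlike(condn))
    
    df = df.drop("replaced_glosa")
    df.show(truncate=False)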
    

    Output:

    +-------------------------------------+--------------------------------+
    |glosa                                |replaced_glosa                  |
    +-------------------------------------+--------------------------------+
    |LA SERENA LA SERENA                  |LASERENALASERENA                |
    |IMPORTADORA NOVA3PUERTO MONTT        |IMPORTADORANOVA3PUERTOMONTT     |
    |VINTAGE HOUSE CL                     |VINTAGEHOUSECL                  |
    |IMPORTADORA NOVA3SANTIAGO            |IMPORTADORANOVA3SANTIAGO        |
    |VINTAGE HOUSE CHL                    |VINTAGEHOUSECHL                 |
    |IMPORTADORA NOVA3 SPA PUERTO VARAS CL|IMPORTADORANOVA3SPAPUERTOVARASCL|
    +-------------------------------------+--------------------------------+
    
    ^(LASERENA)+$|^(Australia)+$
    +-------------------------------------+
    |glosa                                |
    +-------------------------------------+
    |IMPORTADORA NOVA3PUERTO MONTT        |
    |VINTAGE HOUSE CL                     |
    |IMPORTADORA NOVA3SANTIAGO            |
    |VINTAGE HOUSE CHL                    |
    |IMPORTADORA NOVA3 SPA PUERTO VARAS CL|
    +-------------------------------------+
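
The pattern-building step in the answer above interpolates each search term into the regex as-is, so a term containing characters such as `.` or `(` would change the meaning of the pattern. A minimal sketch of one way to guard against that is below. The helper name `build_stop_word_pattern` is hypothetical (it is not part of the original answer), and the local check uses Python's `re` module on a few space-stripped values from the example output rather than Spark itself.

```python
import re


def build_stop_word_pattern(search_list):
    """Return one anchored regex matching any search term repeated 1+ times.

    Hypothetical helper for illustration; not part of the original answer.
    """
    alternatives = []
    for term in search_list:
        # Remove spaces, then escape regex metacharacters in the term
        compact = re.escape(term.replace(" ", ""))
        alternatives.append(f"(?:{compact})+")
    # Anchor the whole alternation once instead of anchoring each term
    return "^(?:" + "|".join(alternatives) + ")$"


pattern = build_stop_word_pattern(["LA SERENA", "Australia"])
print(pattern)

# Quick local check on space-stripped sample values: keep the non-matching ones
samples = ["LASERENALASERENA", "VINTAGEHOUSECL", "IMPORTADORANOVA3SANTIAGO"]
kept = [s for s in samples if re.search(pattern, s) is None]
print(kept)
```

In the Spark code, the resulting string could be passed to `rlike` in place of `condn`, and the helper column could probably be skipped by filtering on `~f.regexp_replace("glosa", " ", "").rlike(pattern)`. This assumes Spark's Java regex engine accepts the escapes produced by `re.escape`, which should hold for common punctuation but is worth verifying for unusual terms.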