Tags: python, dataframe, apache-spark, pyspark, levenshtein-distance

String matching function between two columns using Levenshtein distance in PySpark


I am trying to compare pairs of names by converting the Levenshtein distance between them into a matching coefficient such as:

coef = 1 - Levenshtein(str1, str2) / max(length(str1), length(str2))
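
For reference, the formula can be checked on plain Python strings before moving to Spark. This is only a minimal sketch: the helper names `levenshtein` and `match_coef` are illustrative and not part of any library, and treating two empty strings as a perfect match is an assumption the formula itself does not settle.

```python
def levenshtein(a: str, b: str) -> int:
    """Edit distance via dynamic programming (insert, delete, substitute)."""
    previous = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, ca in enumerate(a, start=1):
        current = [i]  # distance from a[:i] to ""
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            current.append(min(
                previous[j] + 1,         # delete ca
                current[j - 1] + 1,      # insert cb
                previous[j - 1] + cost,  # substitute (or keep if equal)
            ))
        previous = current
    return previous[-1]


def match_coef(str1: str, str2: str) -> float:
    """coef = 1 - Levenshtein(str1, str2) / max(length(str1), length(str2))"""
    longest = max(len(str1), len(str2))
    if longest == 0:
        return 1.0  # assumption: two empty strings count as identical
    return 1 - levenshtein(str1, str2) / longest


print(match_coef("Pirate", "Pirate"))   # identical strings give 1.0
print(match_coef("kitten", "sitting"))  # distance 3 over length 7, about 0.571
```

Because the distance can never exceed the length of the longer string, the coefficient always falls between 0 and 1.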

However, when I implement it in PySpark using withColumn(), I get errors when computing the max() function. Both numpy.max and pyspark.sql.functions.max throw errors. Any ideas?

from pyspark.sql.functions import col, length, levenshtein

valuesA = [('Pirate',1),('Monkey',2),('Ninja',3),('Spaghetti',4)]
TableA = spark.createDataFrame(valuesA,['firstname','id'])

test_compare = TableA.withColumnRenamed('firstname', 'firstname2').withColumnRenamed('id', 'id2').crossJoin(TableA)
test_compare.withColumn("distance_firstname", levenshtein('firstname', 'firstname2') / max(length(col('firstname')), length(col('firstname2'))))

Solution

  • max is an aggregate function: it reduces a whole column to a single value. To take the larger of two column values row by row, use greatest, also from pyspark.sql.functions:

    from pyspark.sql.functions import col, greatest, length, levenshtein
    valuesA = [('Pirate',1),('Monkey',2),('Ninja',3),('Spaghetti',4)]
    TableA = spark.createDataFrame(valuesA,['firstname','id'])
    
    test_compare = TableA.withColumnRenamed('firstname', 'firstname2').withColumnRenamed('id', 'id2').crossJoin(TableA)
    test_compare.withColumn("distance_firstname", levenshtein('firstname', 'firstname2') / greatest(length(col('firstname')), length(col('firstname2')))).show()