I am trying to compare pairs of names by converting the Levenshtein distance between them into a matching coefficient, such as:
coef = 1 - Levenshtein(str1, str2) / max(length(str1), length(str2))
However, when I implement it in PySpark using withColumn(), I get errors when computing the max() function. Both numpy.max and pyspark.sql.functions.max throw errors. Any ideas?
from pyspark.sql.functions import col, length, levenshtein
valuesA = [('Pirate',1),('Monkey',2),('Ninja',3),('Spaghetti',4)]
TableA = spark.createDataFrame(valuesA,['firstname','id'])
test_compare = TableA.withColumnRenamed('firstname', 'firstname2').withColumnRenamed('id', 'id2').crossJoin(TableA)
test_compare.withColumn("distance_firstname", levenshtein('firstname', 'firstname2') / max(length(col('firstname')), length(col('firstname2'))))
max is an aggregate function: it reduces one column across rows rather than comparing columns within a row. To find the greater of two values in the same row, use greatest, also from pyspark.sql.functions:
from pyspark.sql.functions import col, greatest, length, levenshtein
valuesA = [('Pirate',1),('Monkey',2),('Ninja',3),('Spaghetti',4)]
TableA = spark.createDataFrame(valuesA,['firstname','id'])
test_compare = TableA.withColumnRenamed('firstname', 'firstname2').withColumnRenamed('id', 'id2').crossJoin(TableA)
test_compare.withColumn("distance_firstname", levenshtein('firstname', 'firstname2') / greatest(length(col('firstname')), length(col('firstname2')))).show()
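Note that the expression above yields the normalized distance; for the matching coefficient from the question you would subtract it from 1, i.e. `1 - levenshtein(...) / greatest(...)`. To sanity-check the values Spark returns on a few pairs, here is a minimal plain-Python sketch of the same formula. The helper names `levenshtein_distance` and `matching_coef` are hypothetical, and treating two empty strings as a perfect match is an assumption, not something the question specifies.

```python
def levenshtein_distance(a: str, b: str) -> int:
    """Edit distance counting insertions, deletions and substitutions."""
    if len(a) < len(b):
        a, b = b, a  # keep the shorter string in the inner loop
    previous = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, ch_a in enumerate(a, start=1):
        current = [i]  # distance from a[:i] to ""
        for j, ch_b in enumerate(b, start=1):
            cost = 0 if ch_a == ch_b else 1
            current.append(min(
                previous[j] + 1,         # delete ch_a
                current[j - 1] + 1,      # insert ch_b
                previous[j - 1] + cost,  # substitute (or keep) the character
            ))
        previous = current
    return previous[-1]


def matching_coef(a: str, b: str) -> float:
    """1 - distance / length of the longer string."""
    longest = max(len(a), len(b))  # built-in max is fine on plain Python ints
    if longest == 0:
        return 1.0  # assumption: two empty strings count as a perfect match
    return 1 - levenshtein_distance(a, b) / longest


names = ['Pirate', 'Monkey', 'Ninja', 'Spaghetti']
for left in names:
    for right in names:
        print(left, right, round(matching_coef(left, right), 3))
```

The built-in `max` works here because it compares two ordinary integers; inside `withColumn()` the operands are Column expressions, which is why `greatest` is needed there.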