
PySpark: find the nearest text


I'm new to PySpark. I want to compare the text in two different DataFrames (both containing news articles) in order to make recommendations.

I was able to do this very easily in plain Python with pandas and scikit-learn:

import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

def get_recommendations(title, cosine_sim, indices):
    idx = indices[title]
    # Get the pairwise similarity scores for this article
    sim_scores = list(enumerate(cosine_sim[idx]))
    print(sim_scores)
    # Sort the articles by similarity score, highest first
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    # Keep 10 of the most similar articles (position 0, the top hit, is skipped)
    sim_scores = sim_scores[1:11]
    talk_indices = [i[0] for i in sim_scores]
    # Return the 10 selected news articles
    return ted['News Data'].iloc[talk_indices]

indices = pd.Series(det.index, index=det['Unnamed: 0']).drop_duplicates()

transcripts = det['News Data']
transcripts2 = ted['News Data']

tfidf = TfidfVectorizer(stop_words='english')
tfidf_matrix = tfidf.fit_transform(transcripts)
tfidf_matrixx = tfidf.transform(transcripts2)
cosine_sim = linear_kernel(tfidf_matrix, tfidf_matrixx)

print(get_recommendations(0, cosine_sim, indices))

When I switched to PySpark, I got very different results when calculating TF-IDF. I know that I need to compute cosine similarity to make "news" recommendations.

I'm using the following in PySpark for the TF-IDF calculation:

from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF

df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('bbcclear.csv')
df2 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('yenisafakcategorypredict.csv')

# tokenize
tokenizer = Tokenizer().setInputCol("News Data").setOutputCol("word")
wordsData = tokenizer.transform(df2)
wordsData2 = tokenizer.transform(df1)

# vectorize
vectorizer = CountVectorizer(inputCol='word', outputCol='vectorizer').fit(wordsData)
wordsData = vectorizer.transform(wordsData)
wordsData2 = vectorizer.transform(wordsData2)

# calculate scores
idf = IDF(inputCol="vectorizer", outputCol="tfidf_features")
idf_model = idf.fit(wordsData)
wordsData = idf_model.transform(wordsData)

idf_model = idf.fit(wordsData2)
wordsData2 = idf_model.transform(wordsData2)

How do I compute cosine similarity from the TF-IDF vectors obtained above in order to make recommendations?


Solution

  • The following is an example of TF-IDF usage in Spark, taken from a PoC assignment of mine. For meaningful similarity, I would strongly recommend a more advanced NLP model such as BERT rather than TF-IDF.

    Sample dataset:

    df = spark.createDataFrame(
        [
            ["cricket sport team player"],
            ["global politics"],
            ["football sport player team"],
        ],
        ["news"]
    )
    
    +--------------------------+
    |news                      |
    +--------------------------+
    |cricket sport team player |
    |global politics           |
    |football sport player team|
    +--------------------------+
    

    TF-IDF vectorisation and cosine similarity computation:

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RegexTokenizer, CountVectorizer, IDF
    from pyspark.sql import functions as F
    from pyspark.sql.types import FloatType

    # Tokenize on word characters, count terms, then weight the counts by IDF
    regex_tokenizer = RegexTokenizer(gaps=False, pattern=r"\w+", inputCol="news", outputCol="tokens")
    count_vectorizer = CountVectorizer(inputCol="tokens", outputCol="tf")
    idf = IDF(inputCol="tf", outputCol="idf")
    tf_idf_pipeline = Pipeline(stages=[regex_tokenizer, count_vectorizer, idf])
    df = tf_idf_pipeline.fit(df).transform(df).drop("news", "tokens", "tf")

    # Pair every document with every document (N x N rows)
    df = df.crossJoin(df.withColumnRenamed("idf", "idf2"))

    @F.udf(returnType=FloatType())
    def cos_sim(u, v):
        # Cosine similarity of two TF-IDF vectors
        return float(u.dot(v) / (u.norm(2) * v.norm(2)))

    df.withColumn("cos_sim", cos_sim(F.col("idf"), F.col("idf2"))).show()
    
    +--------------------+--------------------+----------+
    |                 idf|                idf2|   cos_sim|
    +--------------------+--------------------+----------+
    |(7,[0,1,2,4],[0.2...|(7,[0,1,2,4],[0.2...|       1.0|
    |(7,[0,1,2,4],[0.2...|(7,[5,6],[0.69314...|       0.0|
    |(7,[0,1,2,4],[0.2...|(7,[0,1,2,3],[0.2...|0.34070355|
    |(7,[5,6],[0.69314...|(7,[0,1,2,4],[0.2...|       0.0|
    |(7,[5,6],[0.69314...|(7,[5,6],[0.69314...|       1.0|
    |(7,[5,6],[0.69314...|(7,[0,1,2,3],[0.2...|       0.0|
    |(7,[0,1,2,3],[0.2...|(7,[0,1,2,4],[0.2...|0.34070355|
    |(7,[0,1,2,3],[0.2...|(7,[5,6],[0.69314...|       0.0|
    |(7,[0,1,2,3],[0.2...|(7,[0,1,2,3],[0.2...|       1.0|
    +--------------------+--------------------+----------+