I am working on a Spark cluster and I have two dataframes. One contains text; the other is a look-up table. Both tables are large (their row counts, M and N, could each easily exceed 100,000). What is the best way to match them?
Doing a cross join and then filtering the results on matches seems like a bad idea, since I would almost certainly run out of memory.
My dataframes look something like this:
df1:
   text
0  i like apples
1  oranges are good
2  eating bananas is healthy
.  ...
.  ...
M  tomatoes are red, bananas are yellow

df2:
   fruit_lookup
0  apples
1  oranges
2  bananas
.  ...
.  ...
N  tomatoes
I am expecting an output dataframe to look something like this:
   text                                    extracted_fruits
0  i like apples                           ['apples']
1  oranges are good                        ['oranges']
2  eating bananas is healthy               ['bananas']
.  ...
.  ...
M  tomatoes are red, bananas are yellow    ['tomatoes','bananas']
One way is to use CountVectorizerModel as 100K lookup words should be manageable for this model (default vocabSize=262144):
The basic idea is to create the CountVectorizerModel from a custom vocabulary list taken from df2 (the lookup table), split df1.text into an array column, and then transform this column into a SparseVector whose indices can be mapped back to words.
Edit: in the split function, the regex was changed from \s+ to [\s\p{Punct}]+ so that all punctuation marks are removed. Change 'text' to lower(col('text')) if the lookup is case-insensitive.
from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.functions import split, udf, regexp_replace, lower
# Lookup table df2 used in the examples below:
#| id|fruit_lookup|
#|  0|      apples|
#|  1|     oranges|
#|  2|     bananas|
#|  3|    tomatoes|
#|  4|dragon fruit|
Edit-2: Added the following df1 pre-processing step, which creates an array column containing all N-gram combinations. For a string with L words, N=2 adds (L-1) more items to the array; N=3 adds (L-1)+(L-2) more items.
# max number of words in a single entry of the lookup table df2
N = 2
# Pre-process the `text` field up to N-grams,
# example: ngram_str('oranges are good', 3)
# --> ['oranges', 'are', 'good', 'oranges are', 'are good', 'oranges are good']
def ngram_str(s_t_r, N):
    arr = s_t_r.split()
    L = len(arr)
    for i in range(2,N+1):
        if L - i < 0: break
        arr += [ ' '.join(arr[j:j+i]) for j in range(L-i+1) ]
    return arr
udf_ngram_str = udf(lambda x: ngram_str(x, N), 'array<string>')
df1_processed = df1.withColumn('words_arr', udf_ngram_str(lower(regexp_replace('text', r'[\s\p{Punct}]+', ' '))))
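To see what this pre-processing produces without starting Spark, here is a minimal plain-Python sketch of the same two steps (normalize, then expand to N-grams). The helper names normalize and ngrams_up_to are made up for this illustration, and string.punctuation is used as an assumed stand-in for the \p{Punct} class, which Python's re module does not support.

```python
import re
import string

# Assumption: string.punctuation stands in for \p{Punct} (ASCII punctuation).
_SEP = re.compile(r'[\s' + re.escape(string.punctuation) + r']+')

def normalize(text):
    """Lower-case text and collapse whitespace/punctuation runs into one space."""
    return _SEP.sub(' ', text).lower().strip()

def ngrams_up_to(text, n):
    """Return all word n-grams of sizes 1..n, unigrams first."""
    words = text.split()
    out = list(words)
    for size in range(2, n + 1):
        out += [' '.join(words[j:j + size]) for j in range(len(words) - size + 1)]
    return out

words = ngrams_up_to(normalize('tomatoes are red, bananas are yellow'), 2)
print(words)
# L = 6 unigrams plus L-1 = 5 bigrams
print(len(words))  # 11
```

With N=3 the same sentence yields 6 + 5 + 4 = 15 items, which matches the (L-1)+(L-2) count of additional items given above.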
Apply the model to the processed df1:
lst = [ r.fruit_lookup for r in df2.collect() ]
model = CountVectorizerModel.from_vocabulary(lst, inputCol='words_arr', outputCol='fruits_vec')
df3 = model.transform(df1_processed)
#| text| words_arr| fruits_vec|
#| I like apples| [i, like, apples, i like, like apples]| (5,[0],[1.0])|
#| oranges are good|[oranges, are, good, oranges are, are...| (5,[1],[1.0])|
#| eating bananas is healthy|[eating, bananas, is, healthy, eating...| (5,[2],[1.0])|
#| tomatoes are red, bananas are yellow|[tomatoes, are, red, bananas, are, ye...|(5,[2,3],[1.0,1.0])|
#| test| [test]| (5,[],[])|
#|I have dragon fruit and apples in my bag|[i, have, dragon, fruit, and, apples,...|(5,[0,4],[1.0,1.0])|
Then you can map fruits_vec back to the fruit names using model.vocabulary:
vocabulary = model.vocabulary
#['apples', 'oranges', 'bananas', 'tomatoes', 'dragon fruit']
to_match = udf(lambda v: [ vocabulary[i] for i in v.indices ], 'array<string>')
df_new = df3.withColumn('extracted_fruits', to_match('fruits_vec')).drop('words_arr', 'fruits_vec')
#|text |extracted_fruits |
#|I like apples |[apples] |
#|oranges are good |[oranges] |
#|eating bananas is healthy |[bananas] |
#|tomatoes are red, bananas are yellow |[bananas, tomatoes] |
#|test |[] |
#|I have dragon fruit and apples in my bag|[apples, dragon fruit]|
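Conceptually, the transform and to_match steps reduce to a vocabulary lookup on each word array. The following plain-Python sketch mimics that logic for a single row; it is an illustration only, not how Spark executes it, and extract_terms is a hypothetical helper name.

```python
def extract_terms(words_arr, vocabulary):
    """Return the vocabulary terms present in words_arr, ordered by vocabulary index."""
    index = {term: i for i, term in enumerate(vocabulary)}
    # Mirrors the sparse vector's indices: distinct, ascending positions of matches.
    hits = sorted({index[w] for w in words_arr if w in index})
    return [vocabulary[i] for i in hits]

vocabulary = ['apples', 'oranges', 'bananas', 'tomatoes', 'dragon fruit']
words_arr = ['i', 'have', 'dragon', 'fruit', 'and', 'apples',
             'i have', 'have dragon', 'dragon fruit', 'fruit and', 'and apples']
print(extract_terms(words_arr, vocabulary))  # ['apples', 'dragon fruit']
```

The multi-word entry 'dragon fruit' is only found because the bigrams were added to the array in the pre-processing step.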
Method-2: Since your datasets are not huge by Spark standards, the following might work. It also handles lookup values that contain multiple words, as per your comment:
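from pyspark.sql.functions import expr, collect_set
# Left join on a whole-word regex match, then collect the matches per text row.
df1.alias('d1').join(
      df2.alias('d2')
    , expr('d1.text rlike concat("\\\\b", d2.fruit_lookup, "\\\\b")')
    , 'left'
).groupby('text') \
 .agg(collect_set('fruit_lookup').alias('extracted_fruits')) \
 .show()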
| text| extracted_fruits|
| oranges are good| [oranges]|
| I like apples| [apples]|
|tomatoes are red,...|[tomatoes, bananas]|
|eating bananas is...| [bananas]|
| test| []|
Where "\\\\b" is the word-boundary anchor, so that a lookup value only matches as a whole word and not as part of a longer word.
Note: you may need to clean up all punctuation marks and redundant spaces on both columns before the dataframe join.
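The effect of the word-boundary anchors can be checked locally with Python's re module, whose \b behaves the same way for this ASCII example. The sketch below is only an illustration: find_terms is a hypothetical helper, and the re.escape call is an added precaution (not part of the join above) for lookup values that contain regex metacharacters.

```python
import re

def find_terms(text, lookup):
    """Return the lookup values that occur in text as whole words or phrases."""
    return [term for term in lookup
            if re.search(r'\b' + re.escape(term) + r'\b', text)]

lookup = ['apples', 'oranges', 'bananas', 'tomatoes', 'dragon fruit']
print(find_terms('tomatoes are red, bananas are yellow', lookup))  # ['bananas', 'tomatoes']
# 'apples' is not reported for 'pineapples' because of the leading \b
print(find_terms('i like pineapples', lookup))  # []
```

Without the anchors, a plain substring match would report 'apples' for the second sentence.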