I am trying to match up two DataFrames using PySpark as part of an AWS Glue job.
df1:
item tag
1 AB
2 CD
3 EF
4 QQ
df2:
key1 key2 tags
A1 B1 [AB]
A1 B2 [AB, CD, EF]
A2 B1 [CD, EF]
A2 B3 [AB, EF, ZZ]
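For reference, here is a minimal sketch that builds these sample frames (assuming an active SparkSession; in a Glue job, glueContext.spark_session provides one):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # in Glue: glueContext.spark_session

# Sample data matching the tables above
df1 = spark.createDataFrame(
    [(1, 'AB'), (2, 'CD'), (3, 'EF'), (4, 'QQ')],
    ['item', 'tag'])

df2 = spark.createDataFrame(
    [('A1', 'B1', ['AB']),
     ('A1', 'B2', ['AB', 'CD', 'EF']),
     ('A2', 'B1', ['CD', 'EF']),
     ('A2', 'B3', ['AB', 'EF', 'ZZ'])],
    ['key1', 'key2', 'tags'])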
I would like to match each tag in df1 against the tags arrays in df2, producing the following:
item key1 key2 tag
1 A1 B1 AB
1 A1 B2 AB
1 A2 B3 AB
2 A1 B2 CD
2 A2 B1 CD
3 A1 B2 EF
3 A2 B1 EF
3 A2 B3 EF
So, each tag in df1 expands into one output row per df2 row whose tags array contains it. For example, item 1's tag "AB" appears in the tags arrays of the first, second, and fourth rows of df2, giving three output rows.
Also note how item 4 is dropped entirely, because its tag QQ does not appear in any array in df2.
I know this is going to be an inner join, but I am not sure how to match up df1.tag with df2.tags to pull in key1 and key2. Any assistance would be greatly appreciated.
You can do a join using an array_contains condition:
import pyspark.sql.functions as F

# The join condition checks whether df1.tag appears in df2.tags;
# the default inner join drops unmatched rows, so item 4 (QQ) disappears.
result = (df1.join(df2, F.array_contains(df2.tags, df1.tag))
          .select('item', 'key1', 'key2', 'tag')
          .orderBy('item', 'key1', 'key2')
          )
result.show()
+----+----+----+---+
|item|key1|key2|tag|
+----+----+----+---+
| 1| A1| B1| AB|
| 1| A1| B2| AB|
| 1| A2| B3| AB|
| 2| A1| B2| CD|
| 2| A2| B1| CD|
| 3| A1| B2| EF|
| 3| A2| B1| EF|
| 3| A2| B3| EF|
+----+----+----+---+
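Note that array_contains is not an equality condition, so Spark will typically plan this as a broadcast nested-loop join. If both frames are large, an equivalent sketch is to explode the array first and join on plain equality (reusing the df1 and df2 above):

import pyspark.sql.functions as F

# Flatten df2 to one row per (key1, key2, tag), turning the
# array-membership check into an ordinary equi-join on 'tag'.
exploded = df2.select('key1', 'key2', F.explode('tags').alias('tag'))

result = (df1.join(exploded, on='tag', how='inner')
          .select('item', 'key1', 'key2', 'tag')
          .orderBy('item', 'key1', 'key2')
          )

Both versions produce the same rows; item 4 drops out either way, since QQ never appears in any tags array.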