I am trying to join three RDDs together using the Spark Core API in Python (part of Apache Spark); however, I am having no luck accomplishing this.
At the moment, I have these three RDDs, which share common attributes:
Joining two of the RDDs together works perfectly fine with no issues:
user_rev_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(reviews_rdd.keyBy(lambda rev: rev['user_id']))
)
However, when I try to join all three together, this does not work:
user_rev_com_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(reviews_rdd.keyBy(lambda rev: rev['user_id']))
    .join(companies_rdd.keyBy(lambda com: com['company_id']))
)
Any assistance on how to join all three of my RDDs together would be very helpful, as I am unsure how to do this correctly. Thanks.
After the first join, the key is user_id, but you're joining to companies_rdd with the key being company_id, so the joining keys don't match. You need to re-key the result by company_id first, e.g.
user_rev_com_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(reviews_rdd.keyBy(lambda rev: rev['user_id']))
    # re-key the (user, review) pairs by company_id for the next join
    .map(lambda r: (r[1][1]['company_id'], r[1]))
    .join(companies_rdd.keyBy(lambda com: com['company_id']))
)
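To see why the re-keying map works, note that after the first join each element has the shape (user_id, (user, review)); the map pulls company_id out of the review dict and makes it the new key. Here is a minimal pure-Python sketch of those shapes, using hypothetical sample records (the field names besides user_id and company_id are made up for illustration):

```python
# Hypothetical sample records mirroring the RDD elements.
user = {'user_id': 'u1', 'name': 'Alice'}
review = {'user_id': 'u1', 'company_id': 'c1', 'stars': 5}

# Shape of one element after the first join: (key, (left, right))
r = ('u1', (user, review))

# The map re-keys by company_id, keeping the (user, review) pair as the value.
rekeyed = (r[1][1]['company_id'], r[1])
# rekeyed == ('c1', (user, review))
```

This assumes, as the original code does, that each review record carries a company_id field to join on.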
To combine the elements from the three RDDs and remove the joining keys after joining, you can add a map at the end:
user_rev_com_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(reviews_rdd.keyBy(lambda rev: rev['user_id']))
    # re-key the (user, review) pairs by company_id for the next join
    .map(lambda r: (r[1][1]['company_id'], r[1]))
    .join(companies_rdd.keyBy(lambda com: com['company_id']))
    # drop the key and flatten to (user, review, company)
    .map(lambda r: (*r[1][0], r[1][1]))
)
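After the second join, each element has the nested shape (company_id, ((user, review), company)); the final map unpacks the inner pair and drops the key. A pure-Python sketch of that last step, again with hypothetical sample records:

```python
# Hypothetical sample records mirroring the RDD elements.
user = {'user_id': 'u1', 'name': 'Alice'}
review = {'user_id': 'u1', 'company_id': 'c1', 'stars': 5}
company = {'company_id': 'c1', 'name': 'Acme'}

# Shape of one element after the second join:
# (company_id, ((user, review), company))
r = ('c1', ((user, review), company))

# The final map unpacks the nested pair into a flat tuple and drops the key.
flat = (*r[1][0], r[1][1])
# flat == (user, review, company)
```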