
How to join three RDDs using the Python Core API (Apache Spark)?


I am trying to join three RDDs together using Apache Spark's Python Core API, but I have had no luck so far.

I have three RDDs with the following shared (join) attributes:

  • users_rdd: user_id
  • reviews_rdd: review_id, company_id and user_id
  • companies_rdd: company_id

Joining two of the RDDs works fine:

user_rev_rdd = (users_rdd
  .keyBy(lambda user: user['user_id'])
  .join(
      reviews_rdd.keyBy(lambda rev: rev['user_id'])
  )
)
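To see the shape of the two-way join result, it can be imitated in plain Python. This sketch is not Spark: the sample records are made up, and the list comprehensions only imitate `keyBy` (each element `x` becomes `(key, x)`) and `join` (an inner join producing `(key, (left_value, right_value))`).

```python
# Hypothetical sample records using the attribute names from the question.
users = [{'user_id': 'u1', 'name': 'Ann'}, {'user_id': 'u2', 'name': 'Bob'}]
reviews = [{'review_id': 'r1', 'company_id': 'c1', 'user_id': 'u1'}]

# Imitates keyBy: each record x becomes (key, x).
users_kv = [(u['user_id'], u) for u in users]
reviews_kv = [(r['user_id'], r) for r in reviews]

# Imitates join: inner join on the key, giving (key, (left_value, right_value)).
user_rev = [(uk, (u, r)) for uk, u in users_kv for rk, r in reviews_kv if uk == rk]

print(user_rev)
```

Each element is `(user_id, (user, review))`: the key is still `user_id` and the review dict sits at `r[1][1]`. `u2` is dropped because it has no review, as with an inner `join`.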

However, when I try to join all three together as shown below, it does not work:

user_rev_com_rdd = (users_rdd
  .keyBy(lambda user: user['user_id'])
  .join(
      reviews_rdd.keyBy(lambda rev: rev['user_id'])
  )
  .join(
      companies_rdd.keyBy(lambda com: com['company_id'])
  )
)
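Most likely the chained version runs without raising an error, but its second join lines up keys from two different ID spaces. Using the same kind of plain-Python imitation (made-up records, not Spark), and assuming user IDs and company IDs never coincide, the second join finds no matching keys and returns nothing; if the IDs did overlap, it would pair unrelated records instead.

```python
# Hypothetical sample records using the attribute names from the question.
users = [{'user_id': 'u1'}]
reviews = [{'review_id': 'r1', 'company_id': 'c1', 'user_id': 'u1'}]
companies = [{'company_id': 'c1', 'name': 'Acme'}]

# First join (imitating keyBy + join): elements are (user_id, (user, review)).
user_rev = [(u['user_id'], (u, r)) for u in users for r in reviews
            if u['user_id'] == r['user_id']]

# Second join as written in the question: the left key is a user_id ('u1'),
# the right key is a company_id ('c1'), so no keys match.
companies_kv = [(c['company_id'], c) for c in companies]
naive = [(k, (v, c)) for k, v in user_rev for ck, c in companies_kv if k == ck]

print(naive)  # []
```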

Any help on how to join all three RDDs correctly would be appreciated. Thanks.


Solution

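  • After the first join, each element has the form (user_id, (user, review)), so the key is still user_id, while companies_rdd is keyed by company_id. The second join therefore compares the wrong keys. You need to re-key the intermediate result by the review's company_id before joining with companies_rdd, e.g.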

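    user_rev_com_rdd = (users_rdd
        .keyBy(lambda user: user['user_id'])
        .join(
            reviews_rdd.keyBy(lambda rev: rev['user_id'])
        )
        # Re-key: (user_id, (user, review)) -> (company_id, (user, review))
        .map(lambda r: (r[1][1]['company_id'], r[1]))
        # Join on company_id -> (company_id, ((user, review), company))
        .join(
            companies_rdd.keyBy(lambda com: com['company_id'])
        )
    )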
    

    To drop the joining key and flatten each result into a single (user, review, company) tuple, add a final map:

    user_rev_com_rdd = (users_rdd
        .keyBy(lambda user: user['user_id'])
        .join(
            reviews_rdd.keyBy(lambda rev: rev['user_id'])
        )
        .map(lambda r: (r[1][1]['company_id'], r[1]))
        .join(
            companies_rdd.keyBy(lambda com: com['company_id'])
        )
        # (company_id, ((user, review), company)) -> (user, review, company);
        # the *-unpacking inside a tuple requires Python 3.5+
        .map(lambda r: (*r[1][0], r[1][1]))
    )
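The corrected pipeline can also be checked end to end without a cluster. In this sketch `key_by` and `inner_join` are hypothetical plain-Python helpers that imitate `RDD.keyBy` and `RDD.join` on lists of pairs, and the records are made up. Each step mirrors one call in the answer's chain, including the re-keying `map` and the final flattening `map`. Output order is deterministic here, which is not guaranteed for a real RDD.

```python
from collections import defaultdict


def key_by(records, key_fn):
    # Imitates RDD.keyBy: each element x becomes (key_fn(x), x).
    return [(key_fn(x), x) for x in records]


def inner_join(left, right):
    # Imitates RDD.join on pair data: for every key present on both sides,
    # emit (key, (left_value, right_value)) for each matching combination.
    right_index = defaultdict(list)
    for k, v in right:
        right_index[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_index.get(k, [])]


# Hypothetical sample records using the attribute names from the question.
users = [{'user_id': 'u1', 'name': 'Ann'}, {'user_id': 'u2', 'name': 'Bob'}]
reviews = [
    {'review_id': 'r1', 'company_id': 'c1', 'user_id': 'u1'},
    {'review_id': 'r2', 'company_id': 'c2', 'user_id': 'u2'},
]
companies = [{'company_id': 'c1', 'name': 'Acme'},
             {'company_id': 'c2', 'name': 'Globex'}]

# Step 1: join users and reviews on user_id -> (user_id, (user, review))
user_rev = inner_join(key_by(users, lambda u: u['user_id']),
                      key_by(reviews, lambda r: r['user_id']))
# Step 2: re-key by the review's company_id -> (company_id, (user, review))
by_company = [(r[1][1]['company_id'], r[1]) for r in user_rev]
# Step 3: join with companies -> (company_id, ((user, review), company))
joined = inner_join(by_company, key_by(companies, lambda c: c['company_id']))
# Step 4: drop the key and flatten -> (user, review, company)
flat = [(*r[1][0], r[1][1]) for r in joined]

for user, review, company in flat:
    print(user['name'], review['review_id'], company['name'])
```

On a real RDD the same four steps are the two `keyBy`/`join` pairs and the two `map` calls in the answer; inspect the result with `collect()` or `take(n)` rather than relying on element order.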