Tags: scala, apache-spark, rdd

Scala RDD - Relaxing data aggregation based on criteria


I have data in this format:

RDD[((String, String, String), Int)]

which I can represent like this:

|------|------|------------|----------------|
|(Month|Gender|Nationality)|NumberOfCustomer|
|------|------|------------|----------------|
| 1    |  M   |    FRA     |      8         |
| 1    |  F   |    FRA     |      2         |
| 1    |      |    FRA     |      2         |
| 1    |  M   |            |      7         |
| 1    |  F   |            |      2         |
| 1    |  M   |    USA     |      3         |
| 1    |  F   |    USA     |      4         |
| 1    |      |    USA     |      13        |
|------|------|------------|----------------|

Due to some constraints, I can't show data for groups with fewer than 10 customers. Hence I need to aggregate the data by relaxing some criteria (nationality, then gender).

For example, since there are not enough (fewer than 10) customers for Month 1, Gender M and Nationality FRA, I need to merge the data into the Nationality Other (Unknown). After processing, I should have something like this:

|------|------|------------|----------------|
|(Month|Gender|Nationality)|NumberOfCustomer|
|------|------|------------|----------------|
| 1    |  M   |  Other     |      15        |
|------|------|------------|----------------|

Same for Month 1, Gender F and Nationality FRA, then USA, and so on. The result should be:

|------|------|------------|----------------|
|(Month|Gender|Nationality)|NumberOfCustomer|
|------|------|------------|----------------|
| 1    |      |    FRA     |      2         |
| 1    |  M   |    Other   |      18        |
| 1    |  F   |    Other   |      8         |
| 1    |      |    USA     |      13        |
|------|------|------------|----------------|

After that, there are still not enough customers for Month 1, Gender Unknown and Nationality FRA. So I need to merge it with Month 1, Gender Unknown and the Nationality with the fewest customers (here USA). The result:

|------|------|------------|----------------|
|(Month|Gender|Nationality)|NumberOfCustomer|
|------|------|------------|----------------|
| 1    |  M   |    Other   |      18        |
| 1    |  F   |    Other   |      8         |
| 1    |      |    USA     |      15        |
|------|------|------------|----------------|

After that, there are still not enough customers for Month 1, Gender F and Nationality Other.

I need to keep Month 1 - Gender Unknown - Nationality USA (because it has more than 10 customers), but I need to relax the Gender criterion for Month 1 - Gender F - Nationality Other because it only has 8 customers.

The final result should be:

|------|------|------------|----------------|
|(Month|Gender|Nationality)|NumberOfCustomer|
|------|------|------------|----------------|
| 1    |Other |    Other   |      26        |
| 1    |      |    USA     |      15        |
|------|------|------------|----------------|

My question is: how can I achieve this as efficiently as possible with a Scala RDD in Apache Spark? (There are more than 2 criteria to relax, for example Nationality, then Gender, then Age, then Weight, and so on, always in the same order.)
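To make the first relaxation pass concrete, here is a sketch on plain Scala collections (a hypothetical `relaxNationality` helper, deliberately simplified: it folds every under-threshold nationality bucket into Other and does not implement the later merge-into-the-least-populated-nationality step):

```scala
// Simplified first pass: fold any nationality bucket below `threshold`
// into "Other", then re-sum the counts per (Month, Gender, Nationality) key.
def relaxNationality(
    data: Seq[((String, String, String), Int)],
    threshold: Int = 10
): Seq[((String, String, String), Int)] =
  data
    .map { case ((month, gender, nationality), count) =>
      val relaxed = if (count < threshold) "Other" else nationality
      ((month, gender, relaxed), count)
    }
    .groupBy(_._1)
    .map { case (key, rows) => (key, rows.map(_._2).sum) }
    .toSeq
```

Applied to the first table above, this yields (1, M, Other) = 18 and (1, F, Other) = 8; the gender pass and the least-populated-nationality merge would follow the same map-then-regroup shape.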

EDIT: Code and data added as asked in the comments.

To get my RDD[((String, String, String), Int)]:

val reducedByKey = myRDD.map(x =>
  (
    (
      x.month,
      x.gender,
      x.nationality
    ), 1
  )
).reduceByKey(_ + _)

Some data:

((1,M,FRA),8)
((1,F,FRA),4)
((1,,FRA),46)
((1,M,ENG),13)
((1,F,ENG),40)
((1,M,USA),1)
((1,F,USA),4)
((1,,USA),3)
((2,M,FRA),4)
((2,F,FRA),1)
((2,M,USA),10)
((2,F,USA),4)
((2,,USA),60)

Solution

  • Your question can be summarized as changing the Gender and Nationality columns to Other when the NumberOfCustomer column is less than 10.

    So if you convert your RDD to a DataFrame as below

    +-----+------+-----------+----------------+
    |Month|Gender|Nationality|NumberOfCustomer|
    +-----+------+-----------+----------------+
    |    1|     M|        FRA|               8|
    |    1|     F|        FRA|               4|
    |    1|      |        FRA|              46|
    |    1|     M|        ENG|              13|
    |    1|     F|        ENG|              40|
    |    1|     M|        USA|               1|
    |    1|     F|        USA|               4|
    |    1|      |        USA|               3|
    |    2|     M|        FRA|               4|
    |    2|     F|        FRA|               1|
    |    2|     M|        USA|              10|
    |    2|     F|        USA|               4|
    |    2|      |        USA|              60|
    +-----+------+-----------+----------------+
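    A minimal way to get that DataFrame from the RDD (a sketch, assuming an active `SparkSession` named `spark` and the `reducedByKey` RDD from the question) is:

    ```scala
    import spark.implicits._

    // Flatten the nested key so each tuple field becomes its own column.
    val df = reducedByKey
      .map { case ((month, gender, nationality), count) =>
        (month, gender, nationality, count)
      }
      .toDF("Month", "Gender", "Nationality", "NumberOfCustomer")
    ```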
    

    You can then apply the logic explained above:

    import org.apache.spark.sql.functions._
    df.withColumn("Gender", when($"NumberOfCustomer" < 10, lit("Other")).otherwise($"Gender"))
      .withColumn("Nationality", when($"NumberOfCustomer" < 10, lit("Other")).otherwise($"Nationality"))
      .groupBy("Month","Gender","Nationality").agg(sum("NumberOfCustomer").as("NumberOfCustomer"))
      .show()
    

    You should have your desired result.
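
    Since the question asks for RDDs, the same single-pass threshold logic (a sketch only, not the full iterative relaxation over several criteria) can be written directly on the RDD. The sample data and the `RelaxRdd` object name below are illustrative:

    ```scala
    import org.apache.spark.sql.SparkSession

    // Collapse Gender and Nationality to "Other" when a group has fewer
    // than 10 customers, then re-aggregate with reduceByKey.
    object RelaxRdd {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]").appName("relax").getOrCreate()
        val reducedByKey = spark.sparkContext.parallelize(Seq(
          (("1", "M", "FRA"), 8), (("1", "F", "FRA"), 4), (("1", "", "FRA"), 46),
          (("1", "M", "ENG"), 13), (("1", "F", "ENG"), 40),
          (("1", "M", "USA"), 1), (("1", "F", "USA"), 4), (("1", "", "USA"), 3)
        ))
        val relaxed = reducedByKey
          .map { case ((month, gender, nationality), count) =>
            if (count < 10) ((month, "Other", "Other"), count)
            else ((month, gender, nationality), count)
          }
          .reduceByKey(_ + _)
        relaxed.collect().foreach(println)
        spark.stop()
      }
    }
    ```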