Search code examples
scalaapache-sparkdistributed-computinglazy-evaluationpartitioning

How to handle redistribution/allocation algorithm using Spark in Scala


Let's say I have a bunch of penguins around the country and I need to allocate food provisioning (which are distributed around the country as well) to the penguins.

I tried to simplify the problem as solving :

Input

The distribution of the penguins by area, grouped by proximity and prioritized as

+------------+------+-------+--------------------------------------+----------+
| PENGUIN ID | AERA | GROUP | PRIORITY (lower are allocated first) | QUANTITY |
+------------+------+-------+--------------------------------------+----------+
| P1         | A    | A1    |                                    1 |        5 |
| P2         | A    | A1    |                                    2 |        5 |
| P3         | A    | A2    |                                    1 |        5 |
| P4         | B    | B1    |                                    1 |        5 |
| P5         | B    | B2    |                                    1 |        5 |
+------------+------+-------+--------------------------------------+----------+

The distribution of the food by area, also grouped by proximity and prioritized as

+---------+------+-------+--------------------------------------+----------+
| FOOD ID | AERA | GROUP | PRIORITY (lower are allocated first) | QUANTITY |
+---------+------+-------+--------------------------------------+----------+
| F1      | A    | A1    |                                    2 |        5 |
| F2      | A    | A1    |                                    1 |        2 |
| F3      | A    | A2    |                                    1 |        7 |
| F4      | B    | B1    |                                    1 |        7 |
+---------+------+-------+--------------------------------------+----------+

Expected output

The challenge is to allocate the food to the penguins from the same group first, respecting the priority order of both food and penguin and then take the left food to the other area.

So based on above data we would first allocate within same area and group as:

Stage 1: A1 (same area and group)

+------+-------+---------+------------+--------------------+
| AREA | GROUP | FOOD ID | PINGUIN ID | ALLOCATED_QUANTITY |
+------+-------+---------+------------+--------------------+
| A    | A1    | F2      | P1         |                  2 |
| A    | A1    | F1      | P1         |                  3 |
| A    | A1    | F1      | P2         |                  2 |
| A    | A1    | X       | P2         |                  3 |
+------+-------+---------+------------+--------------------+

Stage 1: A2 (same area and group)

+------+-------+---------+------------+--------------------+
| AREA | GROUP | FOOD ID | PINGUIN ID | ALLOCATED_QUANTITY |
+------+-------+---------+------------+--------------------+
| A    | A2    | F3      | P3         |                  5 |
| A    | A2    | F3      | X          |                  2 |
+------+-------+---------+------------+--------------------+

Stage 2: A (same area, food left from Stage 1:A2 can now be delivered to Stage 1:A1 penguin)

+------+---------+------------+--------------------+
| AREA | FOOD ID | PINGUIN ID | ALLOCATED_QUANTITY |
+------+---------+------------+--------------------+
| A    | F2      | P1         |                  2 |
| A    | F1      | P1         |                  3 |
| A    | F1      | P2         |                  2 |
| A    | F3      | P3         |                  5 |
| A    | F3      | P2         |                  2 |
| A    | X       | P2         |                  1 |
+------+---------+------------+--------------------+

and then we continue do the same for Stage 3 (across AERA), Stage 4 (across AERA2 (by train), which is a different geography cut than AERA (by truck) so we can't just re-aggregate), 5...

What I tried

I'm well familiar how to do it efficiently with a simple R code using a bunch of For loop, array pointer and creating output row by row for each allocation. However with Spark/Scala i could only end up with big and none-efficient code for solving such a simple problem and i would like to reach the community because its probably just that i missed a spark functionality.

I can do it using a lot of spark row transformation as [withColumn,groupby,agg(sum),join,union,filters] but the DAG creation end up being so big that it start to slow the DAG build up after 5/6 stages. I can go around that by saving the output as a file after each stage but then i got an IO issue as i have millions of records to save per stage.

I can also do it running a UDAF (using .split() buffer) for each stage, explode result then join back to the original table to update each quantities per stage. It does make the DAG much more simple and fast to build but unfortunately likely due to the string manipulation inside the UDAF it is too slow for few partitions.

In the end both of the above method feel wrong as they are more like hacks and there must be a more simple way to solve this issue. Ideally i would prefer use transformation to not loose the lazy-evaluations as this is just one step among many other transformations

Thanks a lot for your time. I'm happy to discuss any suggested approach.


Solution

  • This is psuedocode/description, but my solution to Stage 1. The problem is pretty interesting, and I thought you described it quite well.

    My thought is to use spark's window, struct, collect_list (and maybe a sortWithinPartitions), cumulative sums, and lagging to get to something like this:

    C1   C2  C3  C4    C5   C6                C7   | C8
    P1 | A | A1 | 5  | 0 | [(F1,2), (F2,7)] | [F2] | 2
    P1 | A | A1 | 10 | 5 | [(F1,2), (F2,7)] | []   | -3
    
    C4 = cumulative sum of quantity, grouped by area/group, ordered by priority
    C5 = lag of C4 down a row, and null = 0
    C6 = structure of food / quantity, with a cumulative sum of food quantity
    C7/C8 = remaining food/food ids
    

    Now you can use a plain udf to return the array of food groups that belong to a penguin, since you can find the first instance where C5 < C6.quantity and the first instance where C4 > C6.quantity. Everything in between is returned. If C4 is never larger than C6.quantity, then you can append X. Exploding this result of this array will get you all penguins and if a penguin does not have food.

    To determine whether there is extra food, you can have a udf which calculates the amount of "remaining food" for each row and use a window and row_number to get the the last area that is fed. If remaining food > 0, those food ids have left over food, it will be reflected in the array, and you can also make it struct to map to the number of food items left over.

    I think in the end I'm still doing a fair number of aggregations, but hopefully grouping some things together into arrays makes it faster to do comparisons across each individual item.