dataframe apache-spark pyspark sum aggregate

Sum of weighted values


My original data frame looks like the one below; the code to generate it is at the end.

+-----+-------+---+
| name|ts_week|tag|
+-----+-------+---+
|  Bob|  week1|  a|
|  Bob|  week1|  b|
|  Bob|  week1|  c|
|  Bob|  week2|  a|
|  Bob|  week2|  b|
|  Bob|  week2|  d|
|  Bob|  week3|  c|
|  Bob|  week3|  d|
|  Bob|  week4|  a|
|  Bob|  week4|  d|
|Allen|  week1|  a|
|Allen|  week2|  c|
|Allen|  week3|  a|
|Allen|  week3|  b|
|Allen|  week4|   |
+-----+-------+---+

Each tag seen in week 1 gets 0.1 points, each tag in week 2 gets 0.2, each tag in week 3 gets 0.3, and each tag in week 4 gets 0.1. (Each calculation only considers the tags inside a one-month window; whatever happens in weeks 5, 6, 7, 8 has nothing to do with this question.)

And finally, we get the points for each name:

Bob:
a = 0.4
b = 0.3
c = 0.4
d = 0.6

Allen:
a = 0.4
b = 0.3
c = 0.2

Thus Bob's tag for week 4 (the target week here) is d, and Allen's tag for week 4 is a; in each case that tag has the highest score. I don't have a rule for breaking ties between tags, so in that case I would just pick one.
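
For checking these sums by hand, here is a minimal plain-Python sketch of the rule described above: each row contributes its week's weight, and the weights are added per (name, tag). The names WEEK_WEIGHTS and weighted_scores are made up for illustration, and skipping empty tags is an assumption, not something stated in the question. It only mirrors the arithmetic on the 15 sample rows and is not meant for the full data set.

```python
from collections import defaultdict

# Weights per week, as stated in the question.
WEEK_WEIGHTS = {"week1": 0.1, "week2": 0.2, "week3": 0.3, "week4": 0.1}

# (name, ts_week, tag) rows copied from the sample table.
rows = [
    ("Bob", "week1", "a"), ("Bob", "week1", "b"), ("Bob", "week1", "c"),
    ("Bob", "week2", "a"), ("Bob", "week2", "b"), ("Bob", "week2", "d"),
    ("Bob", "week3", "c"), ("Bob", "week3", "d"),
    ("Bob", "week4", "a"), ("Bob", "week4", "d"),
    ("Allen", "week1", "a"), ("Allen", "week2", "c"),
    ("Allen", "week3", "a"), ("Allen", "week3", "b"),
    ("Allen", "week4", ""),
]


def weighted_scores(rows, weights):
    """Sum the week weight of every row, grouped by (name, tag)."""
    totals = defaultdict(float)
    for name, week, tag in rows:
        if tag:  # assumption: rows with an empty tag are skipped
            totals[(name, tag)] += weights[week]
    # Round to one decimal to hide floating-point noise in the sums.
    return {key: round(total, 1) for key, total in totals.items()}


scores = weighted_scores(rows, WEEK_WEIGHTS)
for (name, tag), score in sorted(scores.items()):
    print(name, tag, score)
```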

FYI, this data frame is quite large (millions of rows), so as far as I know, anything that goes through pandas in PySpark may cause OOM problems, probably because it loads all the data into memory at once.

# Assumes an active SparkSession named spark (as in the pyspark shell).
data_ls = [('Bob', 'week1', 'a'),
           ('Bob', 'week1', 'b'),
           ('Bob', 'week1', 'c'),
           ('Bob', 'week2', 'a'),
           ('Bob', 'week2', 'b'),
           ('Bob', 'week2', 'd'),
           ('Bob', 'week3', 'c'),
           ('Bob', 'week3', 'd'),
           ('Bob', 'week4', 'a'),
           ('Bob', 'week4', 'd'),
           ('Allen', 'week1', 'a'),
           ('Allen', 'week2', 'c'),
           ('Allen', 'week3', 'a'),
           ('Allen', 'week3', 'b'),
           ('Allen', 'week4', '')]
data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['name', 'ts_week', 'tag'])

Solution

  • One approach: build a chained when condition that maps each week to its weight, then group by name and tag and sum the weights.

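    from pyspark.sql import functions as F

    # Start from the functions module so the first call is F.when(...);
    # every later call is Column.when(...), which extends the same CASE expression.
    score = F
    for k, v in {'week1': .1, 'week2': .2, 'week3': .3, 'week4': .1}.items():
        score = score.when(F.col('ts_week') == k, v)

    # Sum the weights per (name, tag); round to hide floating-point noise.
    data_sdf = (data_sdf
        .groupBy('name', 'tag')
        .agg(F.round(F.sum(score), 1).alias('w_score')))

    data_sdf.show()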
    # +-----+---+-------+
    # | name|tag|w_score|
    # +-----+---+-------+
    # |  Bob|  c|    0.4|
    # |  Bob|  d|    0.6|
    # |  Bob|  b|    0.3|
    # |  Bob|  a|    0.4|
    # |Allen|  a|    0.4|
    # |Allen|  b|    0.3|
    # |Allen|   |    0.1|
    # |Allen|  c|    0.2|
    # +-----+---+-------+
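
The question also asks which tag to keep for each name. As a rough sketch, the selection can be written in plain Python over the rows of the w_score table: take the highest score per name, ignore the empty tag, and break ties alphabetically. The tie-break rule and the function name top_tag_per_name are assumptions made for illustration, since the question does not define them. On the full data set the same selection would normally be done in Spark itself rather than on collected rows.

```python
# Rows copied from the w_score table: (name, tag, w_score).
score_rows = [
    ("Bob", "c", 0.4), ("Bob", "d", 0.6), ("Bob", "b", 0.3), ("Bob", "a", 0.4),
    ("Allen", "a", 0.4), ("Allen", "b", 0.3), ("Allen", "", 0.1), ("Allen", "c", 0.2),
]


def top_tag_per_name(score_rows):
    """Return {name: tag} with the highest-scoring non-empty tag per name."""
    best = {}
    for name, tag, score in score_rows:
        if not tag:  # assumption: the empty tag never wins
            continue
        # Smaller key is better: higher score first, then alphabetical tag
        # (the alphabetical tie-break is an assumed rule).
        key = (-score, tag)
        if name not in best or key < best[name]:
            best[name] = key
    return {name: key[1] for name, key in best.items()}


top_tags = top_tag_per_name(score_rows)
print(top_tags)
```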