Tags: python, apache-spark, apache-spark-sql, udf

User-defined aggregation function for a Spark DataFrame (Python)


I have the Spark DataFrame below, where id is an int and attributes is a list of strings:

id | attributes
1  | ['a','c', 'd']
2  | ['a', 'e']
1  | ['e', 'f']
1  | ['g']
3  | ['a', 'b']
2  | ['e', 'g']

I need to perform an aggregation that concatenates the attributes lists for each id. The result of the aggregation should be:

id | concat(attributes)
1  | ['a', 'c', 'd', 'e', 'f', 'g']
2  | ['a', 'e', 'e', 'g']
3  | ['a', 'b']

Is there a way to achieve this using Python?

Thanks.


Solution

  • One way is to drop down to the RDD API and build a new DataFrame with reduceByKey. This works because each Row of a two-column DataFrame behaves like a (key, value) tuple, so df.rdd can be reduced by key directly:

    >>> df.show()
    +---+----------+
    | id|attributes|
    +---+----------+
    |  1| [a, c, d]|
    |  2|    [a, e]|
    |  1|    [e, f]|
    |  1|       [g]|
    |  3|    [a, b]|
    |  2|    [e, g]|
    +---+----------+
    
    >>> custom_list = df.rdd.reduceByKey(lambda x, y: x + y).collect()
    >>> new_df = sqlCtx.createDataFrame(custom_list, ["id", "attributes"])
    >>> new_df.show()
    +---+------------------+
    | id|        attributes|
    +---+------------------+
    |  1|[a, c, d, e, f, g]|
    |  2|      [a, e, e, g]|
    |  3|            [a, b]|
    +---+------------------+
    

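    Stripped of the DataFrame wrapper, the reduction is plain list concatenation per key. A minimal sketch of the same step on a bare pair RDD (assuming the usual SparkContext is available as sc; element order within each concatenated list can vary with partitioning):

    >>> pairs = sc.parallelize([(1, ['a', 'c', 'd']), (2, ['a', 'e']),
    ...                         (1, ['e', 'f']), (1, ['g']),
    ...                         (3, ['a', 'b']), (2, ['e', 'g'])])
    >>> sorted(pairs.reduceByKey(lambda x, y: x + y).collect())
    [(1, ['a', 'c', 'd', 'e', 'f', 'g']), (2, ['a', 'e', 'e', 'g']), (3, ['a', 'b'])]
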
    reduceByKey(func, [numTasks]):

    When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
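
  • Alternatively, if you are on Spark 2.4 or later, you can stay entirely in the DataFrame API: collect_list gathers the per-row lists for each id, and flatten (added in 2.4) concatenates them. A sketch of the same aggregation; row order in the output may differ:

    >>> from pyspark.sql.functions import collect_list, flatten
    >>> df.groupBy("id").agg(
    ...     flatten(collect_list("attributes")).alias("attributes")
    ... ).show()
    +---+------------------+
    | id|        attributes|
    +---+------------------+
    |  1|[a, c, d, e, f, g]|
    |  2|      [a, e, e, g]|
    |  3|            [a, b]|
    +---+------------------+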