Tags: python, pandas, pyarrow, apache-arrow

Getting the Most Frequent Value by An Aggregated Group in a Pyarrow Table


I have a pyarrow table that looks something like the following:

import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

test_table = pa.table([
    pa.array(["a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "c", "c", "c", "d", "d", "d", "d", "e", "e", "e", "e", "e", "f", "f", "f", "f", "f", "f"]),
    pa.array([1, 2, 3, 4, 4, 1, 1, 1, 1, 1, 5, 5, 5, 5, 4, 6, 6, 8, 9, 7, 7, 7, 7, 10, 11, 12, 33, 22, 55, 22]),
    pa.array([1, 3, 2, 2, 4, 1, 1, 1, 1, 1, 4, 4, 4, 5, 4, 8, 8, 8, 9, 10, 10, 22, 12, 10, 11, 12, 33, 22, 55, 22]),
    pa.array([1, 2, 3, 3, 4, 1, 1, 1, 1, 1, 6, 6, 5, 6, 4, 6, 6, 6, 9, 7, 33, 22, 22, 10, 11, 12, 33, 22, 55, 22])
], names=["ID", "Col1", "Col2", "Col3"])

I need to get the mean, min/max, and most frequent values of the columns. Most of this is covered by the grouped aggregations in the documentation: https://arrow.apache.org/docs/python/compute.html#py-grouped-aggrs

And I can implement them like the following:

cols = [x for x in test_table.schema.names if "Col" in x]

output = "mean"
aggregations = [(col, output) for col in cols]
test_table.group_by("ID").aggregate(aggregations).to_pandas()

output = "max"
aggregations = [(col, output) for col in cols]
test_table.group_by("ID").aggregate(aggregations).to_pandas()

However, getting the most frequent value (i.e., the mode) is a computational bottleneck: while the function exists for computing it across a whole array (https://arrow.apache.org/docs/python/generated/pyarrow.compute.mode.html), it is not available as a group-level aggregation.

I have implemented the following workaround for now:

cols = [x for x in test_table.schema.names if "Col" in x]
unique_ids = pc.unique(test_table["ID"])

output_table = pd.DataFrame(columns=["ID"] + cols)
for parts, id_value in enumerate(unique_ids):
    temp = test_table.filter(pc.field("ID") == id_value)
    output_table.loc[parts, "ID"] = id_value.as_py()
    if len(temp) > 0:
        for col in cols:
            output_table.loc[parts, col] = pc.mode(temp[col])[0][0].as_py()
    else:
        print(str(id_value.as_py()) + " has no data. Moving on.....")
        continue

print(output_table)

But as a cell-wise loop it takes around 36 minutes to run on my actual data, as opposed to 2.5 seconds or less for the mean and max using the aggregate function.

I realize that there is a way to implement user defined functions in pyarrow: https://arrow.apache.org/docs/python/compute.html#user-defined-functions

But I'm not sure where to begin if I were to create such a thing.

Any suggestions as to how I might apply pc.mode across columns, create a custom group aggregation function and apply it across the columns, or otherwise speed up the workflow, would be greatly appreciated!

Thank you in advance.


Solution

  • You can implement mode with a couple of group-by operations and a sort.

    (
        test_table.group_by(["ID", "Col1"])
        .aggregate([([], "count_all")])
        .sort_by("count_all")
        .group_by(["ID"], use_threads=False)
        .aggregate([("Col1", "last")])
    )
    
    ID  Col1_last
    a           4
    b           1
    c           5
    d           6
    e           7
    f          22

    You'll have to do it for each column and then join each column's result back together.

    PS: there is an open issue for this, but mode isn't the simplest aggregation to implement, given that it requires unbounded state: https://github.com/apache/arrow/issues/20359
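    Putting the above together for all three columns: a sketch that applies the same count, sort, take-last trick per column and joins the results on `ID`. It assumes a pyarrow version that supports the grouped `"last"` aggregation and `Table.join`; the helper name `group_mode` is just for illustration.

    ```python
    import pyarrow as pa

    test_table = pa.table([
        pa.array(["a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "c", "c", "c", "d", "d", "d", "d", "e", "e", "e", "e", "e", "f", "f", "f", "f", "f", "f"]),
        pa.array([1, 2, 3, 4, 4, 1, 1, 1, 1, 1, 5, 5, 5, 5, 4, 6, 6, 8, 9, 7, 7, 7, 7, 10, 11, 12, 33, 22, 55, 22]),
        pa.array([1, 3, 2, 2, 4, 1, 1, 1, 1, 1, 4, 4, 4, 5, 4, 8, 8, 8, 9, 10, 10, 22, 12, 10, 11, 12, 33, 22, 55, 22]),
        pa.array([1, 2, 3, 3, 4, 1, 1, 1, 1, 1, 6, 6, 5, 6, 4, 6, 6, 6, 9, 7, 33, 22, 22, 10, 11, 12, 33, 22, 55, 22])
    ], names=["ID", "Col1", "Col2", "Col3"])

    def group_mode(table, key, col):
        # Count each (key, col) pair, sort ascending by count, then take the
        # last value per key -- i.e. the value with the highest count.
        counts = (
            table.group_by([key, col])
            .aggregate([([], "count_all")])
            .sort_by("count_all")
        )
        return counts.group_by([key], use_threads=False).aggregate([(col, "last")])

    cols = [c for c in test_table.schema.names if "Col" in c]
    result = group_mode(test_table, "ID", cols[0])
    for col in cols[1:]:
        result = result.join(group_mode(test_table, "ID", col), keys="ID")

    print(result.sort_by("ID").to_pandas())
    ```

    The output columns keep the `_last` suffix (`Col1_last`, `Col2_last`, ...); rename them afterwards if you prefer. Note that when two values are tied for the highest count, which one wins depends on the sort order, so ties are broken arbitrarily.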