
Python cuDF groupby apply with ordered data


I have some ordered data where there is a hierarchy of events. Each column is a unique id of an event with respect to the event above it in the hierarchy. Something similar to how each day number is unique in a month, and each month number is unique in a year. I want to get the lowest level to be unique within the highest level, like making every day unique in a year by numbering from 1 to 365. My use case is not specific to days, months, and years.

Before:

| ID | EVENT_1 | EVENT_2 | EVENT_3 |
| -- | ------- | ------- | ------- |
|  1 |       1 |       1 |       1 |
|  1 |       1 |       1 |       2 |
|  1 |       1 |       1 |       3 |
|  1 |       1 |       2 |       1 |
|  1 |       1 |       2 |       2 |
|  1 |       1 |       3 |       1 |
|  1 |       1 |       3 |       2 |
|  1 |       2 |       1 |       1 |
|  1 |       2 |       1 |       2 |

After:

| ID | EVENT_1 | EVENT_2 | EVENT_3 | EVENT_3A |
| -- | ------- | ------- | ------- | -------- |
|  1 |       1 |       1 |       1 |        1 |
|  1 |       1 |       1 |       2 |        2 |
|  1 |       1 |       1 |       3 |        3 |
|  1 |       1 |       2 |       1 |        4 |
|  1 |       1 |       2 |       2 |        5 |
|  1 |       1 |       3 |       1 |        6 |
|  1 |       1 |       3 |       2 |        7 |
|  1 |       2 |       1 |       1 |        1 |
|  1 |       2 |       1 |       2 |        2 |

The goal is to get a column where for each id, there is an EVENT_3A such that EVENT_3A is the order in which EVENT_3 happens with respect to EVENT_1 (as if there was no EVENT_2). Additionally, there are many IDs for which this must be calculated independently. Right now I am doing this on the CPU, but it takes a long time, so I would like to switch to doing it on a GPU.

My main idea is to do a groupby('ID').apply_grouped() or groupby('ID').agg(), but I do not know what to put in the apply_grouped() or agg() functions. I was doing this before with dask on the CPU, where it was more intuitive because the grouped DataFrame was passed directly to the apply() function. It seems that in cuDF I have to pass incols, and I am unable to figure out how to treat those as a DataFrame.
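
For reference, this is roughly what my dask version looks like, simplified to the "Before" table above (the helper name add_event_3a and the partition count are just for illustration):

    import dask.dataframe as dd
    import pandas as pd

    def add_event_3a(group: pd.DataFrame) -> pd.DataFrame:
        # dask hands each group over as a plain pandas DataFrame, so the
        # numbering is ordinary pandas: count EVENT_3 rows within each EVENT_1.
        group = group.copy()
        group["EVENT_3A"] = group.groupby("EVENT_1").cumcount() + 1
        return group

    df = pd.DataFrame({
        "ID":      [1, 1, 1, 1, 1, 1, 1, 1, 1],
        "EVENT_1": [1, 1, 1, 1, 1, 1, 1, 2, 2],
        "EVENT_2": [1, 1, 1, 2, 2, 3, 3, 1, 1],
        "EVENT_3": [1, 2, 3, 1, 2, 1, 2, 1, 2],
    })

    ddf = dd.from_pandas(df, npartitions=1)
    meta = add_event_3a(df.head())  # dask needs the output schema up front
    result = ddf.groupby("ID").apply(add_event_3a, meta=meta).compute()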

There are approximately 5,000 IDs, so ideally each grouped ID would be processed by its own GPU core, but I am not sure if it can work like that, since I am new to GPU programming.

Any suggestions or solutions are helpful, thank you.


Solution

  • The goal is to get a column where for each id, there is an EVENT_3A such that EVENT_3A is the order in which EVENT_3 happens with respect to EVENT_1 (as if there was no EVENT_2).

    What you are describing is a groupby cumulative count operation with [ID, EVENT_1] as the keys. It's not yet implemented in cuDF, so you would want to use a user-defined function.
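
    For intuition, here is what a 1-based groupby cumulative count looks like in pandas on a tiny illustrative frame (the UDF below reproduces this operation on the GPU):

    import pandas as pd

    tiny = pd.DataFrame({"ID": [1, 1, 1], "EVENT_1": [1, 1, 2], "EVENT_3": [1, 2, 1]})
    print(tiny.groupby(["ID", "EVENT_1"]).cumcount() + 1)  # -> 1, 2, 1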

    Your setup:

    import cudf
    from numba import cuda
    import numpy as np

    data = {
        "ID":      [1, 1, 1, 1, 1, 1, 1, 1, 1],
        "EVENT_1": [1, 1, 1, 1, 1, 1, 1, 2, 2],
        "EVENT_2": [1, 1, 1, 2, 2, 3, 3, 1, 1],
        "EVENT_3": [1, 2, 3, 1, 2, 1, 2, 1, 2],
    }

    gdf = cudf.DataFrame(data)
    print(gdf)
       ID  EVENT_1  EVENT_2  EVENT_3
    0   1        1        1        1
    1   1        1        1        2
    2   1        1        1        3
    3   1        1        2        1
    4   1        1        2        2
    5   1        1        3        1
    6   1        1        3        2
    7   1        2        1        1
    8   1        2        1        2
    

    We can and should use apply_grouped here. I encourage you to look at the documentation to fully understand what's going on, but at a high level each group is processed by its own block of threads, and a row's position within the group (the loop index i below) is exactly the cumulative count we want. We pass the EVENT_3 column, so we need to make sure the column name and the function argument match.

    def cumcount(EVENT_3, cumcount):
        # Each thread starts at its own thread index and strides by the block
        # size, so together the threads cover every row of the group once.
        for i in range(cuda.threadIdx.x, len(EVENT_3), cuda.blockDim.x):
            cumcount[i] = i + 1  # the example counts start at 1 rather than 0
    
    
    results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(
        cumcount,
        incols=['EVENT_3'],
        outcols=dict(cumcount=np.int32),
    )
    
    print(results.sort_index()) # get the original row order, for demonstration
       ID  EVENT_1  EVENT_2  EVENT_3  cumcount
    0   1        1        1        1         1
    1   1        1        1        2         2
    2   1        1        1        3         3
    3   1        1        2        1         4
    4   1        1        2        2         5
    5   1        1        3        1         6
    6   1        1        3        2         7
    7   1        2        1        1         1
    8   1        2        1        2         2
    
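    If you want the new column named EVENT_3A on the original frame, as in the question's "After" table, you can copy the result back. A small sketch, assuming the sorted index lines up with the input rows (it does here, since apply_grouped keeps the original index values):

    gdf["EVENT_3A"] = results.sort_index()["cumcount"]
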

    As a sanity check, you can verify that these results match pandas on larger data.

    import pandas as pd

    n_ids = 5000
    n_rows = 10000000

    df = pd.DataFrame({
        "ID": np.random.choice(range(n_ids), n_rows),
        "EVENT_1": np.random.choice(range(500), n_rows),
        "EVENT_2": np.random.choice(range(500), n_rows),
        "EVENT_3": np.random.choice(range(n_ids), n_rows)
    })
    
    gdf = cudf.from_pandas(df)
    results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(
        cumcount,
        incols=['EVENT_3'],
        outcols=dict(cumcount=np.int32),
    )
    results = results.sort_index()
    
    pdf_res = df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1
    print(pdf_res.astype("int32").equals(results['cumcount'].to_pandas()))
    True
    

    Please note that using df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1 in pandas is likely quite fast if you have < 1 million rows and a reasonable number of groups, as groupby cumcount is fairly efficient. With that said, the cuDF UDF will be much faster at scale.
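
    If you want to measure that on your own data, here is a minimal timing sketch reusing df, gdf, and cumcount from the sanity check above (note the first apply_grouped call includes Numba JIT compilation, so run it once to warm up before timing):

    import time

    start = time.perf_counter()
    _ = df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1
    print(f"pandas: {time.perf_counter() - start:.3f}s")

    start = time.perf_counter()
    _ = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(
        cumcount,
        incols=['EVENT_3'],
        outcols=dict(cumcount=np.int32),
    )
    print(f"cuDF UDF: {time.perf_counter() - start:.3f}s")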