Tags: python, pandas, scikit-learn, dask, parquet

How do I GroupShuffleSplit a parquet dataframe lazily?


I have a parquet dataset that looks like this (I'm using polars, but any dataframe library is fine):

df = pl.DataFrame(
    {
        "match_id": [
            1, 1, 1,
            2, 2, 2, 2,
            3, 3, 3, 3,
        ],
        "team_id": [
            1, 2, 2,
            1, 1, 2, 2,
            1, 2, 2, 3,
        ],
        "player_name": [
            "kevin", "james", "kelly",
            "john", "jenny", "jim", "josh",
            "alice", "kevin", "lilly", "erica",
        ],
    }
)

I would like to group by match_id and do a train/test split such that 80% of the matches end up in the training set and the rest in the test set. So something like this:

group_df = df.group_by(["match_id"])
train, test = group_split(group_df, test_size=0.20)

I need a Python solution, preferably with Dask, pandas, or another dataframe library. Pandas doesn't support lazy evaluation, and the dataset is quite large, so pandas seems out of the question. Dask, on the other hand, doesn't support any of the sklearn.model_selection splitters since it doesn't have integer-based indexing support.
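
For reference, the eager version with pandas and scikit-learn would look something like this (fine on small data, but it materialises everything in memory, which is exactly what I want to avoid; `pdf` is just an illustrative variable name):

import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

pdf = df.to_pandas()  # eager: pulls the whole polars frame into memory

gss = GroupShuffleSplit(n_splits=1, test_size=0.20, random_state=42)
train_idx, test_idx = next(gss.split(pdf, groups=pdf["match_id"]))
train, test = pdf.iloc[train_idx], pdf.iloc[test_idx]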

Ideally a simple GroupShuffleSplit working with dask is all I need. Is there any other library that supports this? If so, how do I do this with parquet in a lazy way?


Solution

  • Maybe something like this will work for you.

    It is not a perfect answer; it is an attempt to cope with the sheer size of the data.

    In this solution, GroupShuffleSplit is applied to each partition of the data rather than to the whole dataset, and because match_id.unique is computed per partition, the resulting train/test split may not be exactly 80/20. If you need the ratio to hold globally, see the variant sketched after the code below.

    Solution

    import dask.dataframe as dd
    from sklearn.model_selection import GroupShuffleSplit
    
    
    # read the parquet file lazily as a dask dataframe
    df = dd.read_parquet('test.parquet')
    
    train = []
    test = []
    gss = GroupShuffleSplit(n_splits=1, test_size=0.20, random_state=42)  # Adjust random_state as needed
    
    for i in range(df.npartitions):
        part = df.partitions[i]
        groups = part.match_id.unique().compute()
        # split() returns positional indices into `groups`, so map them back to match_id values
        train_idx, test_idx = next(gss.split(groups, groups=groups))
        train += [part[part.match_id.isin(groups.iloc[train_idx])]]
        test += [part[part.match_id.isin(groups.iloc[test_idx])]]
    
    
    # `test` is now a list of (lazy) dask dataframes;
    # to materialise the data, just concat and compute
    
    dd.concat(test).shape[0].compute()  # gives 282_111_648 in my case
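
    If you need the 80/20 ratio to hold over the whole dataset rather than per partition, a variant along the same lines (a rough sketch, not tested at this scale) is to pull the unique match_id values once for the full dataframe, split that small set, and then filter lazily:

    import dask.dataframe as dd
    from sklearn.model_selection import GroupShuffleSplit

    df = dd.read_parquet('test.parquet')
    gss = GroupShuffleSplit(n_splits=1, test_size=0.20, random_state=42)

    # one pass over the data; only the distinct match_ids are held in memory
    all_groups = df.match_id.unique().compute()
    train_idx, test_idx = next(gss.split(all_groups, groups=all_groups))

    train_df = df[df.match_id.isin(list(all_groups.iloc[train_idx]))]  # still lazy
    test_df = df[df.match_id.isin(list(all_groups.iloc[test_idx]))]    # still lazy

    The trade-off is one extra full scan of the parquet file to collect the distinct match_ids, but the filtering itself stays lazy.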
    
    

    Solution tested with this data

    import polars as pl
    import numpy as np
    from sklearn.model_selection import GroupShuffleSplit
    pl.build_info().get('version')
    # '0.19.2'
    n_rows = 10**6 # 1_000_000 rows
    df = pl.DataFrame([
        pl.Series('match_id', np.random.choice(range(10**3), size=n_rows)), # 1_000 matches
        pl.Series('team_id', np.random.choice(range(10**2), size=n_rows)), # 100 teams
        pl.Series('player_name', np.random.choice([
                "kevin", "james", "kelly",
                "john", "jenny", "jim", "josh",
                "alice", "kevin", "lilly", "erica",
            ], size=n_rows))
    ]).lazy()
    df = pl.concat([df]*1_000) # 1_000_000_000 rows
    df.collect(streaming=True).write_parquet('test.parquet') # ~5GB
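
    As a side note, if collecting ~5GB in memory before writing is a problem, polars can also stream the result straight to disk (assuming the query is streamable):

    df.sink_parquet('test.parquet')  # writes without collecting the full frame in memory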