
Dask rolling function fails with message to repartition dataframe


I'm getting this error when I run a dask rolling function to calculate a moving average:

df['some_value'].rolling(10).mean() 

Error:

Partition size is less than overlapping window size. Try using "df.repartition" to increase the partition size.

What does this message mean? Why is it asking me to repartition the dataframe?


Solution

  • The error is raised because, given the partition size and the rolling window, dask would need data from multiple partitions to compute the result. This could be done in principle, but the current dask.dataframe rolling implementation only supports fetching data from one neighbouring partition.

    Here's an example:

    from dask.datasets import timeseries
    
    df = timeseries(freq="1h")  # each partition has 24 rows
    
    # this will work
    _ = df["x"].rolling(25).mean().compute()
    
    # this will not work
    _ = df["x"].rolling(26).mean().compute()
    

    Note that here each partition has 24 rows (each row represents an hour), so if we ask dask to compute a rolling mean over the last 25 values, then for the first row of the second partition it needs the 24 previous values, which are conveniently all in a single partition (the previous one).

    If we ask for a rolling mean over the last 26 values, then for the first row of the second partition dask needs the previous 25 values: 24 of them are in the previous partition, but one more is in the partition before that. This is why dask raises the error.

    To avoid this error, use either a sufficiently small window OR sufficiently large partitions. For example, the code above will not trigger an error if we repartition the dataframe so that each partition contains twice as many rows (i.e. halve the number of partitions):

    new_df = df.repartition(npartitions=df.npartitions // 2)
    
    # this will work now
    _ = new_df["x"].rolling(26).mean().compute()