python pandas numpy dask dask-delayed

Dask/pandas apply function and return multiple rows


I'm trying to return a DataFrame from a function applied through Dask's map_partitions. In the example code below, the function returns a two-row DataFrame, but only one row is shown in the end result, which in this case is just the row of column names. I removed the column names in earlier tests, and even then only one row was shown. I get exactly the same result with pandas alone.

How can I make this map_partitions call return multiple rows (or a DataFrame with multiple rows) into a new Dask DataFrame? A solution using dask.delayed might be even better. I need to apply this function to every cell of the DataFrame, and the result should be a completely new DataFrame (with more rows) built from every cell.

Current result:

Dask
0               0          1          2          3 ...
1               0          1          2          3 ...
2               0          1          2          3 ...
3               0          1          2          3 ...
4               0          1          2          3 ...

Desired result:

Dask
           0          1          2          3          4
0  11.760715  14.591147   3.058529  19.868252  22.714292
1  10.601743  21.634348  17.443206  13.619830  13.574586
2  16.346402   2.80519    8.610979  11.656930  23.822052
3   3.100282  17.24039   10.871604  13.625602  22.695311
4  17.240093  23.069574   0.832129  22.055441   3.771150
5  22.676472  23.644936  10.721542  10.563838  17.297389
6  12.54929    0.988218  16.113930  19.572034   7.090997
7  11.76189   10.733782   3.819583   6.998412  14.439809
8  19.371690   5.172882  19.620361   3.148623  23.348465
9   5.924958  14.746566   9.069269   0.560508  15.120616

Example code

import pandas as pd
import dask.dataframe
import numpy as np

def myfunc():
    data1 = np.random.uniform(low=0, high=25, size=(5,))
    data2 = np.random.uniform(low=0, high=25, size=(5,))

    # Just an example dataframe to show
    df = pd.DataFrame([data1, data2])
    
    return df

df = pd.DataFrame({
    'val1': [1, 2, 3, 4, 5],
    'val2': [1, 2, 3, 4, 5]
})

ddf = dask.dataframe.from_pandas(df, npartitions=2)

output = ddf.map_partitions(lambda part: part.apply(lambda x: myfunc(), axis=1), meta=object).compute()

print('\nDask\n',output)

Solution

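  • The computed output is a Series in which each element is one of the DataFrames returned by myfunc(), which is why each row prints as a truncated header. You can flatten it into a single DataFrame with pandas concat() before printing: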

    output = pd.concat(output.to_list(), ignore_index=True)
    
    print('\nDask\n',output)
    

    This gives the result as a single DataFrame.

    Output (the values are random, but the shape matches the desired result):

    Dask
                0          1          2          3          4
    0   5.718550   6.237734  21.148321  23.136265  11.644001 
    1   4.154657  12.591685  11.868645  11.260228   3.802258 
    2   6.688080   6.709124   9.170346  12.900095   7.538030 
    3  16.818043  18.826502  23.405016  15.024944  24.822155 
    4   4.405004  22.673484  11.130296   1.411436  21.202253 
    5   6.420442   1.414739   2.240358   7.151456   4.942321 
    6   7.443220  21.675140  20.287533  11.467862  12.751785 
    7  17.511607  17.788686  17.326715  24.051668   4.398992 
    8   0.881609   8.175566  23.253465   8.862715  19.432905 
    9   2.645422  10.262120  23.801481  16.172546  18.551709