Use dask to map over an array and return a dataframe


I am using dask and zarr to operate over some very large images.

I have a pipeline set up that performs some transformations on these images, and I would then like to measure properties of the image using the regionprops and regionprops_table functions from skimage. The measurement takes a matrix as input and returns a data frame. I can't use map_overlap because it requires a matrix to be returned, but I would like something semantically similar to this:

import numpy as np
import dask.array as da
import pandas as pd
from skimage.measure import regionprops_table

mask = np.zeros((1000, 1000), dtype=int)
mask[100:200, 100:200] = 1
mask[300:400, 300:400] = 2
mask[500:600, 500:600] = 3
mask = da.from_array(mask, chunks=(200, 200))

def get_data_frame(mask):
    res = regionprops_table(mask, properties=('label', 'area', 'eccentricity'))
    df = pd.DataFrame(res)
    return df

# Does not work as written: map_overlap expects each chunk to return an array
mask.map_overlap(get_data_frame, depth=50, boundary=None).compute()
    

The result could be either a pandas data frame or a dask data frame, but I would like each chunk to be processed in parallel.


Solution

  • I'll use da.overlap.overlap to add the overlap and to_delayed to convert the chunks of the Dask Array to Delayed objects, then build a Dask DataFrame from them:

    import numpy as np
    import dask.array as da
    from dask import delayed
    import dask.dataframe as dd
    import pandas as pd
    from skimage.measure import regionprops_table
    
    mask = np.zeros((1000, 1000), dtype=int)
    mask[100:200, 100:200] = 1
    mask[300:400, 300:400] = 2
    mask[500:600, 500:600] = 3
    mask = da.from_array(mask, chunks=(200, 200))
    mask
    
    def get_data_frame(mask):
        res = regionprops_table(mask, properties=('label', 'area', 'eccentricity'))
        df = pd.DataFrame(res)
        return df
    
    # Extend each chunk by 50 pixels of data from its neighbours
    mask_overlapped = da.overlap.overlap(mask, depth=50, boundary=None)
    # to_delayed() returns a 2-D NumPy array holding one Delayed object per chunk
    delayed_chunks = mask_overlapped.to_delayed()
    
    # ravel() flattens the chunk grid, so a single loop visits every chunk
    delayed_frames = [delayed(get_data_frame)(chunk) for chunk in delayed_chunks.ravel()]
    
    ddf = dd.from_delayed(delayed_frames, meta={'label': 'int64', 'area': 'float64', 'eccentricity': 'float64'})
    
    ddf.compute()
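
If a plain pandas data frame is enough, the same per-chunk idea can be sketched without dask.dataframe: build one delayed task per chunk, run them with dask.compute, and concatenate the results. This is only a rough sketch under some assumptions. count_labels is a hypothetical stand-in for get_data_frame (it counts pixels per label with NumPy, so the example runs without scikit-image), and it uses the non-overlapping chunks, so every pixel is counted exactly once.

```python
import numpy as np
import pandas as pd
import dask
import dask.array as da


def count_labels(block):
    # Hypothetical stand-in for get_data_frame: pixel count per non-zero label
    labels, counts = np.unique(block, return_counts=True)
    keep = labels != 0
    return pd.DataFrame({"label": labels[keep], "area": counts[keep]})


mask = np.zeros((1000, 1000), dtype=int)
mask[100:200, 100:200] = 1
mask[300:400, 300:400] = 2
mask[500:600, 500:600] = 3
mask = da.from_array(mask, chunks=(200, 200))

# One delayed task per chunk; ravel() flattens the 2-D grid of Delayed objects
tasks = [dask.delayed(count_labels)(chunk) for chunk in mask.to_delayed().ravel()]

# dask.compute evaluates all tasks together and returns a tuple of DataFrames
frames = dask.compute(*tasks)
per_chunk = pd.concat(frames, ignore_index=True)

# A label that spans several chunks shows up in several rows, so sum per label
totals = per_chunk.groupby("label")["area"].sum()
print(totals)
```

Note that with the overlapped array, pixels in the shared margins belong to more than one chunk, so a label near a chunk edge appears in several rows and summing its area this way would overcount it.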