Load images into a Dask Dataframe


I have a Dask dataframe that contains image paths in a column called img_paths. As the next step, I want to load the images from those paths into another column called img_loaded, and then apply some pre-processing functions.

However, the loading (image reading) step gives me a different result with each approach: in one case the cells hold the delayed wrapper of the imread function, in another the images load correctly (I can see the arrays), and in all other cases I get a FileNotFoundError.

In addition to the examples below, I also tried the map_partitions function and ended up with similar outputs, except that I never got the arrays. In the end, I would rather use map_partitions than apply.

Here is my code, with a description of each problem:

import pandas as pd
import dask
import dask.dataframe as dd
from skimage.io import imread

imgs = ['https://cdn.sstatic.net/Sites/stackoverflow/company/img/logos/so/so-logo.png?v=9c558ec15d8a'] * 42

# create a pandas dataframe using image paths
df = pd.DataFrame({"img_paths": imgs})

# convert it into dask dataframe
ddf = dd.from_pandas(df, npartitions=2)

# wrap the imread function with dask.delayed
delayed_imread = dask.delayed(imread, pure=True)

First try: using a lambda function to apply the delayed imread to each cell

ddf["img_loaded"] = ddf.img_paths.apply(lambda x: delayed_imread(x))
ddf.compute()

Here, after calling compute(), the cells still hold the delayed wrapper of the imread function instead of the image arrays. I do not understand why.

Second try: without using a lambda function

ddf["img_loaded"] = ddf.img_paths.apply(delayed_imread)
ddf.compute()

This worked! At least, I can see the loaded images as arrays. But I do not understand why this behaves differently from the first attempt (the one using the lambda function).

Third try: with or without a lambda function, and without the delayed imread.

ddf["load"] = ddf.img_paths.apply(imread)  # or: lambda x: imread(x)
ddf.compute()

Here, just as an experiment, I used the plain skimage.io.imread function instead of the delayed imread, both with and without a lambda function. Each time I got a FileNotFoundError. Why can't the non-delayed imread function find the images, although the paths are correct?

Update, following Ronald's answer: this is how to use the map_partitions function:

import numpy as np

# keep the new column lazy; compute only once, at the end
ddf["img_loaded"] = ddf.map_partitions(lambda df: df.img_paths.apply(lambda x: imread(x)), meta=("img_loaded", np.uint8))
ddf.compute()
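
To make the difference between apply and map_partitions concrete, here is a toy sketch in plain Python with no Dask involved. It only illustrates the calling convention: the function passed to a partition-wise map receives a whole partition at once and returns one result per row. All names (split_into_partitions, map_partitions_toy, load_partition, fake_imread) are hypothetical, and fake_imread returns the path length instead of reading an image.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, npartitions):
    """Split a list into `npartitions` contiguous chunks of near-equal size."""
    size, extra = divmod(len(rows), npartitions)
    partitions, start = [], 0
    for i in range(npartitions):
        stop = start + size + (1 if i < extra else 0)
        partitions.append(rows[start:stop])
        start = stop
    return partitions


def fake_imread(path):
    """Hypothetical stand-in for an image reader: returns the path length."""
    return len(path)


def load_partition(paths):
    """Receives a whole partition and returns one result per row."""
    return [fake_imread(p) for p in paths]


def map_partitions_toy(func, partitions):
    """Call `func` once per partition, in parallel, keeping partition order."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, partitions))


paths = [f"img_{i}.png" for i in range(5)]
partitions = split_into_partitions(paths, npartitions=2)
results = map_partitions_toy(load_partition, partitions)

# Flatten the per-partition results back into one column of values.
flat = [value for part in results for value in part]
print(partitions)
print(flat)
```

The per-row function is called inside load_partition, so any per-partition setup (opening a connection, allocating a buffer) would happen once per partition rather than once per row.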

Solution

  • The solution

    import pandas as pd
    import dask
    import dask.dataframe as dd
    import numpy as np
    from skimage.io import imread
    
    imgs = ['https://cdn.sstatic.net/Sites/stackoverflow/company/img/logos/so/so-logo.png?v=9c558ec15d8a'] * 4
    
    # create a pandas dataframe using image paths
    df = pd.DataFrame({"img_paths": imgs})
    
    # convert it into dask dataframe
    ddf = dd.from_pandas(df, npartitions=2)
    
    # wrap the imread function with dask.delayed
    delayed_imread = dask.delayed(imread, pure=True)
    
    # give dask information about the function output type
    ddf['img_paths'].apply(imread, meta=('img_loaded', np.uint8)).compute()
    
    # OR wrap it with dask.delayed, which infers output type `object`
    ddf['img_paths'].apply(delayed_imread).compute()
    

    The explanation

    If you apply the print function without computing anything, you can see why ddf.img_paths.apply(imread).compute() raises a FileNotFoundError:

    ddf['img_paths'].apply(print)
    

    Output:

    > foo
    > foo
    

    When you add an apply call to the graph without a meta argument, Dask runs the function on the dummy string foo to infer the type of the output, so imread was trying to open a file named foo.

    To get a better understanding I encourage you to try:

    ddf.apply(print, axis=1)
    

    And try to predict what gets printed.
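
The inference step described above can be imitated in plain Python. The sketch below is a toy model of the idea, not Dask's actual implementation: lazy_apply, infer_output_type, fake_imread and KNOWN_FILES are hypothetical names, and the "file system" is just a dictionary. It shows that the loader is called on the placeholder foo when no meta is supplied, and that supplying meta skips that call.

```python
# Pretend file system: the only "file" that exists is logo.png.
KNOWN_FILES = {"logo.png": [[0, 255], [255, 0]]}


def fake_imread(path):
    """Hypothetical image reader that fails for unknown paths."""
    if path not in KNOWN_FILES:
        raise FileNotFoundError(f"No such file: '{path}'")
    return KNOWN_FILES[path]


def infer_output_type(func, placeholder="foo"):
    """Toy inference: run `func` on a dummy value and report the result type."""
    return type(func(placeholder)).__name__


def lazy_apply(func, values, meta=None):
    """Build a lazy 'apply'; infer the output type now unless `meta` is given."""
    output_type = meta if meta is not None else infer_output_type(func)
    return output_type, (lambda: [func(v) for v in values])


# With meta: no inference call is made, so the real paths are the only input.
output_type, compute = lazy_apply(fake_imread, ["logo.png"], meta="list")
loaded = compute()

# Without meta: the loader is called on "foo" while the graph is being built.
try:
    lazy_apply(fake_imread, ["logo.png"])
    error_name, error_message = None, None
except FileNotFoundError as exc:
    error_name, error_message = type(exc).__name__, str(exc)

print(output_type, loaded)
print(error_name, error_message)
```

In this toy the error is raised when the lazy operation is created, before any real path is touched, which is why the correct paths make no difference.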

    Delayed cells after .compute()

    The reason is that apply expects a function reference, which it then calls. By creating a lambda function that calls the delayed function, you are effectively double-delaying your function.
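
A minimal sketch of the double-delay effect, written in plain Python with a hypothetical Lazy class and delayed helper rather than Dask's own objects. It assumes only what is stated above: the outer apply is lazy, and the function it calls returns another lazy object. It does not model how Dask treats a delayed function passed directly to apply.

```python
class Lazy:
    """A recorded function call that has not been executed yet."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        return self.func(*self.args)


def delayed(func):
    """Return a wrapper that records calls to `func` instead of running them."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper


def fake_imread(path):
    """Hypothetical stand-in for an image reader: returns the path length."""
    return len(path)


def apply_column(func, values):
    """Outer lazy layer: maps `func` over `values` only when computed."""
    return Lazy(lambda: [func(v) for v in values])


delayed_imread = delayed(fake_imread)
paths = ["a.png", "bb.png"]

# Lazy apply around a function that is itself lazy: two layers of delay.
column = apply_column(lambda x: delayed_imread(x), paths)
cells = column.compute()                     # evaluates the outer layer only
values = [cell.compute() for cell in cells]  # a second compute is required

# Lazy apply around the plain function: one compute is enough.
direct = apply_column(fake_imread, paths).compute()

print(cells)
print(values, direct)
```

After the first compute the cells are still Lazy objects, which corresponds to the delayed wrappers seen in the first attempt.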