Search code examples
pythonloopspytorchmultiprocessinggenerator

Generator class not working as expected with concurrency


I'm doing a project to create an Image Semantic Search and one of the steps is pre-computing the images embeddings to do that I'm using an ImageBatchGenerator class that should take a list of URLs and a batch size so I can later loop and compute the embeddings. I also wanted to add multiprocessing so this process goes faster therefore I've used Python's concurrent.futures

So far this is my implementation:

import requests
from io import BytesIO
from typing import Union
from concurrent.futures import ThreadPoolExecutor, as_completed

from PIL import Image

def fetch_image(url: str) -> tuple[str, Image.Image]:
    """Fetch image from url

    Parameters
    ----------
    url : str
        url of the image

    Returns
    -------
    tuple[str, Image.Image]
        tuple (url, image) where image is PIL image object and url is the url of the image
    """
    try:
        response = requests.get(url)
        response.raise_for_status()
        return url, Image.open(BytesIO(response.content))
    except requests.exceptions.HTTPError as errh:
        print(f"HTTP Error: {errh}")
    except requests.exceptions.ConnectionError as errc:
        print(f"Error Connecting: {errc}")
    except requests.exceptions.Timeout as errt:
        print(f"Timeout Error: {errt}")
    except requests.exceptions.RequestException as err:
        print(f"Something Else: {err}")
    return None

class ImageBatchGenerator:
    """
    A generator class that get's as arguments a list of URLs and batch size and generates batches of PIL images
    that are obtained through GET requests to the URLs.

    Parameters
    ----------
    urls : list[str]
        List of URLs to fetch images from
    batch_size : int
        The size of the batches to be generated
    """
    def __init__(self, urls: list[str], batch_size: int=32) -> None:
        self.urls = urls
        self.batch_size = batch_size
        self.executor = ThreadPoolExecutor()
    
    def __len__(self) -> int:
        return (len(self.urls) + self.batch_size - 1) // self.batch_size

    def __iter__(self) -> ImageBatchGenerator:
        self.futures = [self.executor.submit(fetch_image, url) for url in self.urls]
        return self

    def __next__(self) -> dict[str, Union[str, Image.Image]]:
        images = []
        urls = []
        for future in as_completed(self.futures):
            url, image = future.result()
            if image is not None:
                images.append(image)
                urls.append(url)
            if len(images) == self.batch_size:
                break
        if len(images) == 0:
            self.executor.shutdown()
            raise StopIteration
        return {"images": images, "urls": urls}

The problem I'm facing is that I'm getting a lot of repetition for URLs (i.e. loading the same image more than once), though the list contains only unique URLs.

To test it I used this:

data = pd.read_csv("https://raw.githubusercontent.com/EduardoPach/Semantic-Image-Search/main/data.csv")
urls = data["path"].tolist()
batch_generator = utils.ImageBatchGenerator(urls)
counter = {}
for idx, batch_dict in enumerate(batch_generator):
    images, urls = batch_dict.values()
    print(f"Doing Batch {idx+1}")
    for url in urls:
        if url in counter:
            print(f"\t found url ({url}) again")
            counter[url] += 1
        else:
            counter[url] = 1

Any help would be appreciated.


Solution

  • The problem is that as a Future becomes "completed" you are not removing it from self.futures. Each time __iter__ is called the Future instances are reiterated from the start of the collection and so the first completed Future will always be the one returned.

    The simplest fix is to make self.futures a set so that removal of a completed Future is efficient:

        ...
        def __iter__(self) -> ImageBatchGenerator:
            # Use a set:
            self.futures = {self.executor.submit(fetch_image, url) for url in self.urls}
            return self
    
        def __next__(self) -> dict[str, Union[str, Image.Image]]:
            images = []
            urls = []
            for future in as_completed(self.futures):
                # Remove completed future:
                self.futures.remove(future)
                url, image = future.result()
                ... # etc.
    

    Another Solution

    You iterate the as_completed iterator once in the following way:

        def __iter__(self) -> ImageBatchGenerator:
            self.futures = [self.executor.submit(fetch_image, url) for url in self.urls]
            # Create the iterator once here:
            self.as_completed = as_completed(self.futures)
            return self
    
        def __next__(self) -> dict[str, Union[str, Image.Image]]:
            images = []
            urls = []
            try:
                while True:
                    future = next(self.as_completed)
                    url, image = future.result()
                    if image is not None:
                        images.append(image)
                        urls.append(url)
                    if len(images) == self.batch_size:
                        return {"images": images, "urls": urls}
            except StopIteration:
                self.executor.shutdown()
                if len(images) == 0:
                    raise
                return {"images": images, "urls": urls}