I'm doing a project to create an Image Semantic Search and one of the steps is pre-computing the images embeddings to do that I'm using an ImageBatchGenerator
class that should take a list of URLs and a batch size so I can later loop and compute the embeddings. I also wanted to add multiprocessing so this process goes faster therefore I've used Python's concurrent.futures
So far this is my implementation:
import requests
from io import BytesIO
from typing import Union
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image
def fetch_image(url: str) -> tuple[str, Image.Image]:
"""Fetch image from url
Parameters
----------
url : str
url of the image
Returns
-------
tuple[str, Image.Image]
tuple (url, image) where image is PIL image object and url is the url of the image
"""
try:
response = requests.get(url)
response.raise_for_status()
return url, Image.open(BytesIO(response.content))
except requests.exceptions.HTTPError as errh:
print(f"HTTP Error: {errh}")
except requests.exceptions.ConnectionError as errc:
print(f"Error Connecting: {errc}")
except requests.exceptions.Timeout as errt:
print(f"Timeout Error: {errt}")
except requests.exceptions.RequestException as err:
print(f"Something Else: {err}")
return None
class ImageBatchGenerator:
"""
A generator class that get's as arguments a list of URLs and batch size and generates batches of PIL images
that are obtained through GET requests to the URLs.
Parameters
----------
urls : list[str]
List of URLs to fetch images from
batch_size : int
The size of the batches to be generated
"""
def __init__(self, urls: list[str], batch_size: int=32) -> None:
self.urls = urls
self.batch_size = batch_size
self.executor = ThreadPoolExecutor()
def __len__(self) -> int:
return (len(self.urls) + self.batch_size - 1) // self.batch_size
def __iter__(self) -> ImageBatchGenerator:
self.futures = [self.executor.submit(fetch_image, url) for url in self.urls]
return self
def __next__(self) -> dict[str, Union[str, Image.Image]]:
images = []
urls = []
for future in as_completed(self.futures):
url, image = future.result()
if image is not None:
images.append(image)
urls.append(url)
if len(images) == self.batch_size:
break
if len(images) == 0:
self.executor.shutdown()
raise StopIteration
return {"images": images, "urls": urls}
The problem I'm facing is that I'm getting a lot of repetition for URLs (i.e. loading the same image more than once), though the list contains only unique URLs.
To test it I used this:
data = pd.read_csv("https://raw.githubusercontent.com/EduardoPach/Semantic-Image-Search/main/data.csv")
urls = data["path"].tolist()
batch_generator = utils.ImageBatchGenerator(urls)
counter = {}
for idx, batch_dict in enumerate(batch_generator):
images, urls = batch_dict.values()
print(f"Doing Batch {idx+1}")
for url in urls:
if url in counter:
print(f"\t found url ({url}) again")
counter[url] += 1
else:
counter[url] = 1
Any help would be appreciated.
The problem is that as a Future becomes "completed" you are not removing it from self.futures
. Each time __iter__
is called the Future instances are reiterated from the start of the collection and so the first completed Future will always be the one returned.
The simplest fix is to make self.futures
a set so that removal of a completed Future is efficient:
...
def __iter__(self) -> ImageBatchGenerator:
# Use a set:
self.futures = {self.executor.submit(fetch_image, url) for url in self.urls}
return self
def __next__(self) -> dict[str, Union[str, Image.Image]]:
images = []
urls = []
for future in as_completed(self.futures):
# Remove completed future:
self.futures.remove(future)
url, image = future.result()
... # etc.
Another Solution
You iterate the as_completed
iterator once in the following way:
def __iter__(self) -> ImageBatchGenerator:
self.futures = [self.executor.submit(fetch_image, url) for url in self.urls]
# Create the iterator once here:
self.as_completed = as_completed(self.futures)
return self
def __next__(self) -> dict[str, Union[str, Image.Image]]:
images = []
urls = []
try:
while True:
future = next(self.as_completed)
url, image = future.result()
if image is not None:
images.append(image)
urls.append(url)
if len(images) == self.batch_size:
return {"images": images, "urls": urls}
except StopIteration:
self.executor.shutdown()
if len(images) == 0:
raise
return {"images": images, "urls": urls}