Search code examples
pythonmultithreadingpython-multithreading

Getting arguments to a function that passed or failed after running the function inside ThreadPoolExecutor


Objective:
Download images from a list of URLs using ThreadPoolExecutor.
If any URL returns 404, add the URL to the failed set; otherwise add it to the passed set.
Finally return all passed and failed URLs.

Code:

import requests
import concurrent.futures

# Unsplash photo URLs to download (hostnames are redacted in this excerpt).
# NOTE(review): download_image() assumes the photo id is the 4th "/"-separated
# segment of each URL — keep that shape if you edit this list.
img_urls = [
    "https://i...content-available-to-author-only...h.com/photo-1516117172878-fd2c41f4a758",
    "https://i...content-available-to-author-only...h.com/photo-1532009324734-20a7a5813719",
    "https://i...content-available-to-author-only...h.com/photo-1524429656589-6633a470097c",
    "https://i...content-available-to-author-only...h.com/photo-1530224264768-7ff8c1789d80",
    "https://i...content-available-to-author-only...h.com/photo-1564135624576-c5c88640f235",
]

def download_image(img_url):
    """Download one image and save it as '<photo-id>.jpg' in the current directory.

    Raises:
        requests.exceptions.RequestException: on connection errors, timeouts,
            or non-2xx responses (via raise_for_status, e.g. HTTPError for 404).
    """
    # A timeout keeps a stalled connection from hanging the worker forever;
    # Timeout is a RequestException subclass, so callers' handlers still apply.
    resp = requests.get(img_url, timeout=30)
    resp.raise_for_status()
    # Assumes the photo id is the 4th "/"-separated segment of the URL
    # (scheme, empty, host, id) — true for the img_urls list in this file.
    img_name = f'{img_url.split("/")[3]}.jpg'
    with open(img_name, "wb") as img_file:
        img_file.write(resp.content)
    # Report after the file handle is closed; no reason to print inside `with`.
    print(f"{img_name} was downloaded...")


def dl_sequential():
    """Download every image in img_urls one at a time.

    Returns:
        tuple[set, set]: (passed, failed) — URLs that downloaded successfully
        and URLs that raised a RequestException.
    """
    passed, failed = set(), set()

    for url in img_urls:
        try:
            download_image(url)
        except requests.exceptions.RequestException:
            failed.add(url)
        else:
            # Only count the URL as passed when no exception was raised.
            passed.add(url)

    return passed, failed


def dl_threads():
    """Download every image in img_urls concurrently (4 worker threads).

    Returns:
        tuple[set, set]: (passed, failed) URL sets — the same output shape as
        dl_sequential(), which is the stated objective. The original version
        returned bare counts and could not report *which* URL failed, because
        a plain list of futures loses the future-to-argument association.
    """
    passed = set()
    failed = set()

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Map each Future back to the URL it was submitted with, so the URL
        # can be recovered whether the future succeeds or raises.
        future_to_url = {
            executor.submit(download_image, img): img for img in img_urls
        }
        for future in concurrent.futures.as_completed(future_to_url):
            img_url = future_to_url[future]
            try:
                future.result()  # re-raises any exception from the worker
                passed.add(img_url)
            except requests.exceptions.RequestException:
                failed.add(img_url)

    return passed, failed

if __name__ == "__main__":
    # Run the sequential version first and report each URL on its own line.
    print("Starting sequential processing")
    seq_passed, seq_failed = dl_sequential()
    print("Passing URLs:", *seq_passed, sep="\n")
    print("Failing URLs:", *seq_failed, sep="\n")

    # Then the threaded version; print whatever it returns as-is.
    print("\nStarting parallel processing")
    thr_passed, thr_failed = dl_threads()
    print(thr_passed, thr_failed)

Basically, the output of dl_threads() should be the same as that of dl_sequential().


Solution

  • Very similar example is posted on official doc with a change that allows you to extract future argument (URL) in case of failure as well. https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # `futures` is a dict mapping each Future to the URL it was submitted
        # with — a dict (not a list) is required so futures[future] works below.
        futures = {executor.submit(download_image, img): img for img in img_urls}

        for future in concurrent.futures.as_completed(futures):
            img_url = futures[future]  # recover the original argument (URL)
            try:
                data = future.result()
                passed += 1
                passed_urls.add(img_url)
            except requests.exceptions.RequestException:
                failed += 1
                failed_urls.add(img_url)