Tags: python, multithreading, threadpool, python-multithreading, threadpoolexecutor

`concurrent.futures.ThreadPoolExecutor` is behaving strangely


So, I am working on a project where I'm provided with some images and some Excel sheets containing information about those images. The images and Excel files are day-to-day data readings. They are organized in this folder structure:

[image: Dataset Organization]

Now the task at hand:

  • I need to go through those day-to-day image recordings, run semantic segmentation on the images, extract each object's height, width, and area, and then put them in their respective Excel sheets, like the picture below. The output should be organized like this: segmentation-results

I have solved this part. But my main problem is that image processing, segmentation, and feature extraction take around 26-32 minutes for each day's readings, so around 1.5-2 hours in total. The data is growing steadily, and we take three readings each week. So I wrote a multithreading script that starts processing all the days simultaneously. This cuts the total time down to 26-32 minutes for all three days of data.

The script works most of the time, but sometimes when I run it, futures.append(executor.submit(processAndsegmentImages, day=day)) does not seem to start threads for all three days. I checked, and this is really happening: sometimes it only starts threads for two days, or sometimes just one.

Here's my code:

import os
import time
import concurrent.futures

from roboflow import Roboflow

def processAndsegmentImages(day):
    # image processing, segmentation, and feature extraction for one day
    return '{} images processing and segmentation completed'.format(day)

if __name__ == "__main__":
    start = time.time()
    rf = Roboflow(api_key="my_api_key")
    project = rf.workspace().project("my_project")
    model = project.version(1).model
    print('model loaded\n')
    days = ['day1', 'day2', 'day3']
    # one worker per day; the cap mirrors the ThreadPoolExecutor default formula
    with concurrent.futures.ThreadPoolExecutor(max_workers=min(32, os.cpu_count() + 4)) as executor:
        futures = []
        for day in days:
            print('adding {} to executor'.format(day))
            futures.append(executor.submit(processAndsegmentImages, day=day))
        # results are printed as each day's job finishes, not in submission order
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    end = time.time()
    elapsed = end - start
    print('total time taken: {:.2f} minutes'.format(elapsed / 60))
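
A side note on this pattern: executor.submit() never raises in the submitting thread. If processAndsegmentImages fails inside a worker, the exception is stored on the future and only re-raised when result() is called, and a raise inside the as_completed loop aborts the remaining prints, which can look like days silently not running. Below is a minimal sketch (not from the original post) for surfacing per-day failures, assuming the same processAndsegmentImages function:

    import concurrent.futures

    days = ['day1', 'day2', 'day3']
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(days)) as executor:
        # map each future back to its day so failures can be reported per day
        futures = {executor.submit(processAndsegmentImages, day=day): day for day in days}
        for future in concurrent.futures.as_completed(futures):
            exc = future.exception()  # None if the call completed without raising
            if exc is not None:
                print('{} failed: {!r}'.format(futures[future], exc))
            else:
                print(future.result())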

The ideal output should look like this:

loading Roboflow workspace...
loading Roboflow project...
model loaded

adding day1 to executor
adding day2 to executor
adding day3 to executor
created root directory: /home/arrafi/potato/segmentation_results/day2_segmentation_results
created root directory: /home/arrafi/potato/segmentation_results/day1_segmentation_results
created root directory: /home/arrafi/potato/segmentation_results/day3_segmentation_results
starting day1 images processing from /home/arrafi/potato/segmentation_results/day1_segmentation_results/day1_raw_images/
starting day3 images processing from /home/arrafi/potato/segmentation_results/day3_segmentation_results/day3_raw_images/
starting day2 images processing from /home/arrafi/potato/segmentation_results/day2_segmentation_results/day2_raw_images/

192 day1_images proccessed and saved at:  /home/arrafi/potato/segmentation_results/day1_segmentation_results/day1_images
created pred_images and json directory for day1 images

starting potato segmentation ... of 192 day1_images

192 day2_images proccessed and saved at:  /home/arrafi/potato/segmentation_results/day2_segmentation_results/day2_images
created pred_images and json directory for day2 images

starting potato segmentation ... of 192 day2_images

192 day3_images proccessed and saved at:  /home/arrafi/potato/segmentation_results/day3_segmentation_results/day3_images
created pred_images and json directory for day3 images

starting potato segmentation ... of 192 day3_images

................and some more outputs......................

Most of the time the script runs successfully, but sometimes it only starts the process for one or two days. In the run below, it only started day1 and day3:

loading Roboflow workspace...
loading Roboflow project...
model loaded

adding day1 to executor
adding day2 to executor
adding day3 to executor
created root directory: /home/arrafi/potato/segmentation_results/day1_segmentation_results
created root directory: /home/arrafi/potato/segmentation_results/day3_segmentation_results
starting day1 images processing from /home/arrafi/potato/segmentation_results/day1_segmentation_results/day1_raw_images/
starting day3 images processing from /home/arrafi/potato/segmentation_results/day3_segmentation_results/day3_raw_images/


192 day1_images proccessed and saved at:  /home/arrafi/potato/segmentation_results/day1_segmentation_results/day1_images
created pred_images and json directory for day1 images

starting potato segmentation ... of 192 day1_images

192 day3_images proccessed and saved at:  /home/arrafi/potato/segmentation_results/day3_segmentation_results/day3_images
created pred_images and json directory for day3 images

starting potato segmentation ... of 192 day3_images

................and some more outputs......................

[image: buggy output]

Can anyone point out why this is happening? Am I doing something wrong in how I call the ThreadPoolExecutor? I have searched online for solutions but can't find out why, because the code never throws an error and the behavior is so random.
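
One way to verify whether each submitted task actually starts, independent of the Roboflow segmentation itself, is to log the worker thread name at the top of the function. A minimal sketch of that instrumentation (the logging setup is an assumption, not part of the original script):

    import logging
    import threading

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(threadName)s %(message)s')

    def processAndsegmentImages(day):
        # record which pool thread picked up which day
        logging.info('starting %s on %s', day, threading.current_thread().name)
        # ... image processing, segmentation, and feature extraction ...
        return '{} images processing and segmentation completed'.format(day)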


Solution

  • So, I found out that another running Python program, or a residual thread from a previous run, sometimes causes the issue. So I added a small snippet at the start of my multithreading script to restart the Python process. Here's how I solved it:

    import os
    import sys
    import psutil
    import logging

    def restart_program():
        """
        Restarts the current program, with file objects and descriptors cleanup
        """
        try:
            p = psutil.Process(os.getpid())
            # psutil renamed get_open_files() to open_files() in psutil 2.x;
            # close every file and socket descriptor the process holds
            for handler in p.open_files() + p.connections():
                os.close(handler.fd)
        except Exception as e:
            logging.error(e)

        # replace the current process with a fresh interpreter running the same script
        python = sys.executable
        os.execl(python, python, *sys.argv)
    

    The restart_program function restarts the current Python program: it closes the file objects and descriptors associated with the current process and then re-launches the program with os.execl. This can be useful when you need to reload the program with fresh state, such as new settings or configurations. I got the script from here: https://stackoverflow.com/a/33334183/13520498
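
    One caution (my own note, not from the linked answer): os.execl re-runs the script with the same arguments, so calling restart_program unconditionally at startup would restart forever. A sketch of a guard using an environment variable (the _ALREADY_RESTARTED name is illustrative, not from the original post):

    import os

    # restart at most once: the flag survives os.execl because the
    # replacement process inherits the current environment
    if os.environ.get('_ALREADY_RESTARTED') != '1':
        os.environ['_ALREADY_RESTARTED'] = '1'
        restart_program()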