python, multiprocessing, process-pool

Working with ProcessPoolExecutor (Windows): where do all the classes and functions go that have nothing to do with multiprocessing?


+++ I edited my question to reflect what I now think is the correct structure, so that unnecessary things don't get loaded in the child workers. Thanks to @booboo for the help. +++

Where should all my classes and functions that have nothing to do with multiprocessing go, so that they are not loaded into every single process: inside main, or in other files?

Should all my multiprocessing functions go into a separate file, to avoid loading unnecessary classes and functions into the worker processes? That seems the most logical to me.

None of the written tutorials or videos I have found talk about this.

import myMulti

if __name__ == '__main__':
    import os
    from time import sleep, time
    import concurrent.futures
    import myClasses

    # Raw string, so the backslashes are not treated as escape sequences.
    folder = r'G:\+++CODING+++\Thin-Plate-Spline-Motion-Model-Windows-main\assets\test28'

    # some classes
    start_time = time()
    resultlist = []

    def create_filepaths(folder):
        # DirEntry.path already contains the folder, so no os.path.join is needed.
        return sorted(entry.path for entry in os.scandir(folder))


    def update_chunks_to_gui(future_object, local_last_chunk_index, processing_abort=False):
        if processing_abort:  # If the user changes directory, thumbnail creation is cancelled.
            for image in future_object:
                image.cancel()
        else:
            # Collect results in submission order and stop at the first unfinished job.
            for image in future_object[local_last_chunk_index:]:
                if not image.done():
                    break
                resultlist.append(image.result())  # resultlist is just for the demo; update the GUI with the image here
                local_last_chunk_index += 1
        return local_last_chunk_index  # returned in both branches so the caller always gets an int


    def start_job(filepaths):
        print('...starting jobs')
        pool = concurrent.futures.ProcessPoolExecutor(max_workers=12)
        return [pool.submit(myMulti.process_file, file) for file in filepaths]

    def main():
        filepaths = create_filepaths(folder)
        last_chunk_index = 0

        future = start_job(filepaths)

        while True:  # GUI and general logic Main Loop
            # do stuff
            # do stuff
            print('+')
            sleep(0.5)
            if last_chunk_index != len(future):
                last_chunk_index = update_chunks_to_gui(future, last_chunk_index)  # pass processing_abort=True if the jobs need to be cancelled, for example when the user changes folder
            else:
                print("...all done in:", time() - start_time)
                break  # leave the main loop; main() then returns normally


if __name__ == '__main__':
    main()
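
The polling logic in update_chunks_to_gui can be tried out without a process pool. The following is only a sketch: it uses bare concurrent.futures.Future objects completed by hand, and collect_ready is a hypothetical name for illustration, not part of the program above.

```python
from concurrent.futures import Future


def collect_ready(futures, next_index, results):
    """Append results of consecutive finished futures, starting at next_index.

    Stops at the first unfinished future so results stay in submission order,
    and returns the index of the next future still to be collected.
    """
    for future in futures[next_index:]:
        if not future.done():
            break
        results.append(future.result())
        next_index += 1
    return next_index


# Three hand-made futures standing in for pool.submit(...) results.
futures = [Future() for _ in range(3)]
results = []

futures[0].set_result('a')
futures[2].set_result('c')          # finished out of order
first_index = collect_ready(futures, 0, results)   # stops at futures[1]

futures[1].set_result('b')
final_index = collect_ready(futures, first_index, results)
```

Because collection stops at the first unfinished job, a slow file near the front delays the display of later thumbnails that are already done; that is the price of keeping them in order.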

What is the right approach so that I don't load unnecessary methods, modules, classes and functions into the worker processes?

What I have now tried is moving the worker code into a separate file that I import into my main program.

myMulti.py (the module imported above):

import PIL.Image
import io
import base64


def convert_to_bytes(file_or_bytes, resize=None, fill_blanc_color=None):
    if isinstance(file_or_bytes, str):
        img = PIL.Image.open(file_or_bytes)
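        if resize and file_or_bytes.endswith(".jpg"):
            # draft() lets the JPEG decoder pick a smaller scale, which speeds up thumbnailing.
            img.draft('RGB', resize)
    else:
        try:
            # First assume the data is base64-encoded...
            img = PIL.Image.open(io.BytesIO(base64.b64decode(file_or_bytes)))
        except Exception:
            # ...and fall back to treating it as raw image bytes.
            dataBytesIO = io.BytesIO(file_or_bytes)
            img = PIL.Image.open(dataBytesIO)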
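    if resize:
        # One scale factor for both axes keeps the aspect ratio and fits the image inside `resize`.
        scale = min(resize[0] / img.size[0], resize[1] / img.size[1])
        img = img.resize((int(img.size[0] * scale), int(img.size[1] * scale)),
                         PIL.Image.HAMMING).convert("RGBA")
    if fill_blanc_color:
        r, g, b = fill_blanc_color
        img = img.convert("RGBA")  # paste() below uses the alpha channel as its mask
        newimg = PIL.Image.new("RGB", resize if resize else img.size, (r, g, b))
        # Centre the image on the background instead of assuming a 100x100 canvas.
        newimg.paste(img, ((newimg.width - img.width) // 2, (newimg.height - img.height) // 2), img)
        img = newimg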
    with io.BytesIO() as data:
        img.save(data, format="PNG")
        #del newimg
        del img
        return data.getvalue()


def process_file(filepath):
    return convert_to_bytes(filepath, resize=(100, 100), fill_blanc_color=(90, 105, 121))
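
The resize-and-centre arithmetic in convert_to_bytes can be checked on its own, without Pillow. This is a minimal sketch; fit_within is a hypothetical helper name, and it assumes the intent is to fit the image inside the target box while keeping its aspect ratio.

```python
def fit_within(size, box):
    """Scale size (w, h) to fit inside box (w, h), keeping the aspect ratio.

    Returns the scaled size and the top-left offset that centres it in the box.
    """
    # The smaller of the two ratios is the largest scale that still fits both axes.
    scale = min(box[0] / size[0], box[1] / size[1])
    new_size = (int(size[0] * scale), int(size[1] * scale))
    offset = ((box[0] - new_size[0]) // 2, (box[1] - new_size[1]) // 2)
    return new_size, offset


# A 400x200 image placed on a 100x100 thumbnail canvas.
thumb_size, paste_at = fit_within((400, 200), (100, 100))
```

For the 400x200 input the scale is 0.25, so the thumbnail is 100x50 and is pasted 25 pixels down from the top of the canvas.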

BTW, here is a picture of what I am working on. This is my first Python program, but it is pretty fun to learn all the concepts needed as I go along. [screenshot]


Solution

  • You haven't stated your platform, as the guidelines for Python multiprocessing request. I will assume you are running under a platform such as Windows that uses the spawn method to create new processes.

    In that case every new process starts off with essentially uninitialized memory, into which a new Python interpreter is started. That interpreter re-reads the source program and initializes memory by processing everything at global scope: import statements, function definitions, global variable declarations and any executable code. The exception is whatever is contained within an if __name__ == '__main__': block; the if statement will be executed in the child, but the test will fail, so the body is skipped.

    So, to save some execution time, put whatever the child processes do not require within such a block. For instance:

    if __name__ == '__main__':
        # Needed only by the main process; spawned workers skip this block.
        import os
        import concurrent.futures
        import myClasses
        import time
    
    def process_file(filepath):  # Multiprocessing: runs in the worker processes
        return f'Processed {filepath}'  # placeholder for the real work
    
    
    def main():
        # myClasses in main()???
        # def some functions
        folder = '.'  # placeholder: the directory to scan
    
        def create_filepaths(folder):
            return sorted(entry.path for entry in os.scandir(folder))
    
        def update_chunks_to_GUI(future_object):
            pass  # update chunks to GUI
    
        filepaths = create_filepaths(folder)
        last_chunk_index = 0
    
        with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:
            future = [pool.submit(process_file, file) for file in filepaths]
    
            while True:              # GUI and general logic Main Loop
                # do stuff
                update_chunks_to_GUI(future)
                if all(f.done() for f in future):
                    break  # demo exit; a real GUI loop runs until the window closes
    
    if __name__ == '__main__':
        main()
    

    If, for example, process_file required a function from the time module, you would not put import time in such a block, because the child processes need it too. It has to stay at global scope:

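    import time  # needed by process_file, so it stays at global scope
    
    if __name__ == '__main__':
        # Needed only by the main process; spawned workers skip this block.
        import os
        import concurrent.futures
        import myClasses
    
    def process_file(filepath):  # Multiprocessing: runs in the worker processes
        time.sleep(0.01)  # stands in for work that uses the time module
        return f'Processed {filepath}'  # placeholder for the real work
    
    
    def main():
        # myClasses in main()???
        # def some functions
        folder = '.'  # placeholder: the directory to scan
    
        def create_filepaths(folder):
            return sorted(entry.path for entry in os.scandir(folder))
    
        def update_chunks_to_GUI(future_object):
            pass  # update chunks to GUI
    
        filepaths = create_filepaths(folder)
        last_chunk_index = 0
    
        with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:
            future = [pool.submit(process_file, file) for file in filepaths]
    
            while True:              # GUI and general logic Main Loop
                # do stuff
                update_chunks_to_GUI(future)
                if all(f.done() for f in future):
                    break  # demo exit; a real GUI loop runs until the window closes
    
    if __name__ == '__main__':
        main()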
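
The effect of the guard can be observed without starting any worker processes. The sketch below rests on one assumption: under the spawn start method, multiprocessing re-runs the main script in each child under the module name __mp_main__. It executes a small stand-in script under both names with runpy and compares which global names end up defined. SCRIPT, names_defined and the choice of io and json as example imports are purely illustrative.

```python
import os
import runpy
import tempfile
import textwrap

# Hypothetical stand-in for a main script laid out as described above.
SCRIPT = textwrap.dedent('''
    import io                      # used by the worker function: global scope

    if __name__ == '__main__':
        import json                # used only by the main process: guarded

    def process_file(filepath):
        return io.StringIO(filepath).getvalue()
''')


def names_defined(run_name):
    """Execute SCRIPT with __name__ set to run_name; return the global names it defines."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'demo_main.py')
        with open(path, 'w') as fh:
            fh.write(SCRIPT)
        module_globals = runpy.run_path(path, run_name=run_name)
    # Drop dunder entries such as __name__ and __file__.
    return {name for name in module_globals if not name.startswith('__')}


as_main = names_defined('__main__')        # how the parent process sees the script
as_worker = names_defined('__mp_main__')   # how a spawned worker would see it

print('main only:', sorted(as_main - as_worker))
print('shared   :', sorted(as_main & as_worker))
```

Only the guarded import is missing from the worker's view; the worker function and the import it depends on are still there. Moving the worker function into its own module, as in the edited question, gives the same separation without relying on the guard.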