python, multithreading, multiprocessing

Thread & Process PoolExecutor with workers that keep state


For an FTP download problem at scale, I'd like to use Python's concurrent.futures to open multiple FTP connections and have each one download multiple files.

My question is: how can I make workers that keep their state, so that I don't have to reconnect to the FTP server for each file I download?

Ideally I'd do something like this:

from concurrent.futures import ProcessPoolExecutor as Executor

class FTP_Worker:
    def __init__(self, idn):
        self.idn = idn
        self.connection = ftplib.connect(server, password)
        
    def downloadfile(filename):
        self.connection.download(filename)
        
workers = [FTP_Worker(i) for i in range(12)]
filenames = filenames # list of 1000s remote files

Executor(max_workers=12).map(workers, filenames)

And ideally this works for both ThreadPoolExecutor and ProcessPoolExecutor.


Solution

  • Some comments:

    First, you did not provide a minimal, complete example, so in the code below I have commented out FTP-related calls and have instead added some print statements so that you can follow what is going on.

    Second, the ProcessPoolExecutor.map method takes a function as its first argument. You are instead passing it a list of FTP_Worker instances, which cannot work. Based on the code you posted, though, I see no need for such a class.
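
To illustrate the point about map, here is a minimal sketch of the call shape it expects: a function first, then one iterable per argument of that function. The `describe` function and the file names are hypothetical stand-ins, not part of the original question.

```python
from concurrent.futures import ThreadPoolExecutor

def describe(idn, filename):
    # Hypothetical stand-in for a download function: called once per (idn, filename) pair.
    return f"{idn}:{filename}"

filenames = ["a.txt", "b.txt", "c.txt"]
idns = range(1, len(filenames) + 1)

with ThreadPoolExecutor(max_workers=2) as executor:
    # map(func, *iterables) calls describe(1, "a.txt"), describe(2, "b.txt"), ...
    # Results come back in input order, regardless of which call finishes first.
    results = list(executor.map(describe, idns, filenames))
```

The same call shape applies to ProcessPoolExecutor, provided the function is defined at module level so that it can be pickled.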

    Third, you are using a multiprocessing pool. But since your code appears to require no intensive CPU processing, a multithreading pool should suffice, so I have opted to use concurrent.futures.ThreadPoolExecutor instead. You should be able to replace this with concurrent.futures.ProcessPoolExecutor and the code should still work.

    Fourth, "thread local" storage is used to hold one reusable FTP connection for each thread/process in your pool so that the 12 connections can be reused for your many downloads. But rather than keeping the connection in thread local storage, I am storing a special Connector class instance that wraps the connection so that when the threads/processes that constitute the pool are destroyed, the connections will be closed.
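
As a small illustration of the thread-local idea on its own, the sketch below counts how many resources are actually created when 100 tasks run on a 4-thread pool. The names `get_resource`, `task` and `created` are hypothetical, and a plain `object()` stands in for an FTP connection.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

thread_local = threading.local()
created = []                     # one entry per resource actually created
created_lock = threading.Lock()  # protects the shared list above

def get_resource():
    # Each thread sees its own set of attributes on thread_local.
    resource = getattr(thread_local, "resource", None)
    if resource is None:
        resource = object()      # hypothetical stand-in for an FTP connection
        thread_local.resource = resource
        with created_lock:
            created.append(resource)
    return resource

def task(n):
    # Report which resource this task ended up using.
    return id(get_resource())

with ThreadPoolExecutor(max_workers=4) as executor:
    used = list(executor.map(task, range(100)))
```

After this runs, `created` holds at most four objects even though 100 tasks were executed, because each worker thread creates its resource once and then reuses it.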

    You will, of course, need to supply the missing definitions and un-comment the code that does the FTP-related operations:

    from concurrent.futures import ThreadPoolExecutor as Executor
    import threading
    
    threadLocal = threading.local()
    
    class Connector:
        def __init__(self, server, password):
            raise Exception('You must call the get_connection class method instead of __init__')
    
        @classmethod
        def get_connection(cls):
            connector = getattr(threadLocal, 'connector', None)
            if connector is None:
                connector = cls.__new__(cls)
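                # ftplib, server and password are not defined, so for demo we comment
                # the following out and assign None to the connection.
                # (ftplib.connect is the question's placeholder; the real class is ftplib.FTP.
                # There is no self in a class method, so use the module-level names.)
                #connector.connection = ftplib.connect(server, password)
                connector.connection = None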
                # Just print out this for demo purposes
                print('creating new connector')
                threadLocal.connector = connector
            return connector.connection
    
        def __del__(self):
            # Comment out the following since for this demo there is no connection:
            #self.connection.close()
            print('closing connection')
    
    def downloadfile(idn, filename):
        connection = Connector.get_connection()
        # We don't have an actual connection, so:
        #connection.download(filename)
        # To emulate network download time:
        import time
        time.sleep(1)
        print(idn, filename)
    
    if __name__ == '__main__':
        filenames = [f'fn{i}' for i in range(1, 21)]
        idns = range(1, len(filenames) + 1)
    
        with Executor(max_workers=12) as executor:
            # list() consumes the results so that an exception raised in a worker is re-raised here
            list(executor.map(downloadfile, idns, filenames))
        # Now the pool will be shutdown and connections closed
        print('Downloads completed.')
    

    Prints:

    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    creating new connector
    11 fn11
    9 fn9
    8 fn8
    6 fn6
    4 fn4
    1 fn1
    10 fn10
    7 fn7
    2 fn2
    12 fn12
    3 fn3
    5 fn5
    closing connection
    closing connection
    closing connection
    closing connection
    18 fn18
    19 fn19
    14 fn14
    13 fn13
    closing connection
    closing connection
    closing connection
    closing connection
    15 fn15
    16 fn16
    closing connection
    closing connection
    17 fn17
    closing connection
    20 fn20
    closing connection
    Downloads completed.
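
An alternative to the lazy creation used above, not part of the original answer, is the `initializer` argument that both executors accept since Python 3.7. The sketch below assumes a hypothetical `FakeConnection` class and server name in place of a real FTP connection, and it does not close the connections at shutdown.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

worker_state = threading.local()

class FakeConnection:
    """Hypothetical stand-in for an FTP connection."""
    def __init__(self, server):
        self.server = server

    def download(self, filename):
        # A real implementation would fetch the file here.
        return f"{self.server}/{filename}"

def init_worker(server):
    # Runs once in each worker thread before that thread takes any task.
    worker_state.connection = FakeConnection(server)

def downloadfile(filename):
    # Reuses the connection created by init_worker for this thread.
    return worker_state.connection.download(filename)

def download_all(filenames, server="ftp.example.invalid", max_workers=4):
    with ThreadPoolExecutor(
        max_workers=max_workers,
        initializer=init_worker,
        initargs=(server,),
    ) as executor:
        return list(executor.map(downloadfile, filenames))
```

Calling `download_all([f'fn{i}' for i in range(1, 21)])` returns one result per file name, in input order. The trade-off is that the initializer runs whether or not the worker ever receives a task, while the Connector approach above creates a connection only on first use.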