For a FTP download problem at scale I'd like to use Python's concurrent futures to get multiple FTP connections and have each download multiple files.
My question is how can I make workers that can keep their state such that I don't have to reconnect the FTP for each file I download?
Ideally I'd do something like this:
from concurrent.futures import ProcessPoolExecutor as Executor
class FTP_Worker:
def __init__(self, idn):
self.idn = idn
self.connection = ftplib.connect(server, password)
def downloadfile(filename):
self.connection.download(filename)
workers = [FTP_Worker(i) for i in range(12)]
filenames = filenames # list of 1000s remote files
Executor(max_workers=12).map(workers, filenames)
And ideally this works for both ThreadPoolExecutor and ProcessPoolExecutor.
Some comments:
First, you did not provide a minimal, complete example, so in the code below I have commented out FTP-related calls and have instead added some print
statements so that you can follow what is going on.
Second, the ProcessPoolExecutor.map
method takes as a first argument a function. You are instead passing it a list of FTP_Worker
instances. This is clearly not going to work. But I see, based on the code you posted, no need for such a class.
Third, you are using a multiprocessing pool. But since your code appears to require no intensive CPU processing, a multithreading pool I have opted to use class concurrent.futures.ThreadPoolExecutor
instead. But you should be able to replace this with concurrent.futures.ProcessPoolExecutor
and the code should still work.
Fourth, "thread local" storage is used to hold one reusable FTP connection for each thread/process in your pool so that the 12 connections can be reused for your many downloads. But rather than keeping the connection in thread local storage, I am storing a special Connector
class instance that wraps the connection so that when the threads/processes that constitute the pool are destroyed, the connections will be closed.
You will, of course, need to supply the missing definitions and un-comment the code that does the FTP-related operations:
from concurrent.futures import ThreadPoolExecutor as Executor
import threading
threadLocal = threading.local()
class Connector:
def __init__(self, server, password):
raise Exception('You must call the get_connection class method instead of __init__')
@classmethod
def get_connection(cls):
connector = getattr(threadLocal, 'connector', None)
if connector is None:
connector = cls.__new__(cls)
# ftplib, server and password are not defined, so for demo we comment
# the following out and assign None to the connection:
#connector.connection = ftplib.connect(self._server, self._password)
connector.connection = None
# Just print out this for demo purposes
print('creating new connector')
threadLocal.connector = connector
return connector.connection
def __del__(self):
# Comment out the following since for this demo there is no connection:
#self._connection.close()
print('closing connection')
def downloadfile(idn, filename):
connection = Connector.get_connection()
# We don't have an actual connection, so:
#connection.download(filename)
# To emulate network download time:
import time
time.sleep(1)
print(idn, filename)
if __name__ == '__main__':
filenames = [f'fn{i}' for i in range(1, 21)]
idns = range(1, len(filenames) + 1)
with Executor(max_workers=12) as executor:
executor.map(downloadfile, idns, filenames)
# Now the pool will be shutdown and connections closed
print('Downloads completed.')
Prints:
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
creating new connector
11 fn11
9 fn9
8 fn8
6 fn6
4 fn4
1 fn1
10 fn10
7 fn7
2 fn2
12 fn12
3 fn3
5 fn5
closing connection
closing connection
closing connection
closing connection
18 fn18
19 fn19
14 fn14
13 fn13
closing connection
closing connection
closing connection
closing connection
15 fn15
16 fn16
closing connection
closing connection
17 fn17
closing connection
20 fn20
closing connection
Downloads completed.