I am new to concurrent.futures. Is there any problem with the following script?
def read_excel(self):
    '''
    read excel -> parse df -> store into sql
    '''
    file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
    tpool = ThreadPoolExecutor(max_workers=3)
    ppool = ProcessPoolExecutor(max_workers=3)
    tfutures = list()
    pfutures = list()
    write_futures = list()
    for file in file_list:
        path = os.path.join(self.config['his']['root'],file)
        if os.path.exists(path):
            tfutures.append(tpool.submit(self._read_excel,path,file))
    for tfuture in as_completed(tfutures):
        pfutures.append(ppool.submit(self._parse_df,tfuture.result()))
    for pfuture in as_completed(pfutures):
        result = pfuture.result()
        write_futures.append(tpool.submit(self.update_data,result[1],result[0]))
    for _ in as_completed(write_futures):pass
    tpool.shutdown()
    ppool.shutdown()
Will the for loops block the code? I mean, when the first read_excel finishes, will the first parse_df start working? What if the read_excel pool has not finished all of its tasks when the first parse_df finishes? Will the first update_data start working?
I hope the script will not be blocked. When a task finishes, the next step should immediately start.
As László Hunyadi has responded, your current code will not submit anything to the multiprocessing pool until all of the self._read_excel tasks have completed. But I would solve this problem differently.
Presumably the parse_df method involves CPU-intensive processing; otherwise you would not be executing it in a multiprocessing pool. Of course, if the processing is not sufficiently intensive, the time saved by running these tasks in parallel will not compensate for the additional overhead incurred by using multiprocessing. In that case you would be better off using multithreading for everything. But assuming parse_df does need to execute in a separate process, I would organize the code as follows, based on the idea that you can pass a multiprocessing pool to a multithreading pool worker function. This results in simpler, more easily followed code. In the following code a new method, worker, has been created, and the main thread only has to submit tasks to this worker, which does all the required processing:
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

class SomeClass:
    def worker(self, path, file, ppool):
        """This executes in a multithreading pool."""
        result = self._read_excel(path, file) # Simple function call
        # Run the CPU-intensive processing in the passed multiprocessing pool:
        future = ppool.submit(self._parse_df, result)
        result = future.result()
        # Finally, we continue executing in the worker thread:
        self.update_data(result[1], result[0])

    def read_excel(self):
        '''
        read excel -> parse df -> store into sql
        '''
        # ppool is listed first so that it is shut down last: worker threads
        # may still be submitting to it while tpool is shutting down.
        with ProcessPoolExecutor(max_workers=3) as ppool, \
             ThreadPoolExecutor(max_workers=3) as tpool:
            file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
            for file in file_list:
                path = os.path.join(self.config['his']['root'], file)
                if os.path.exists(path):
                    tpool.submit(self.worker, path, file, ppool)
            # The implicit calls to shutdown will wait for all
            # submitted tasks to the multithreading pool to complete.
        ...
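If parse_df turns out not to be CPU-intensive, the multithreading-only alternative mentioned earlier might look like the sketch below. This is only an illustration under assumptions: ThreadOnlyProcessor, the path_file_list parameter, the stored list, and the stub bodies of _read_excel, _parse_df and update_data are all hypothetical stand-ins for your real methods.

```python
from concurrent.futures import ThreadPoolExecutor


class ThreadOnlyProcessor:
    """Hypothetical stand-in class; the three step methods are stubs."""

    def __init__(self):
        self.stored = []  # stands in for the SQL table

    def worker(self, path, file):
        """Runs all three steps for one file inside a single pool thread."""
        df = self._read_excel(path, file)
        result = self._parse_df(df)  # plain call, no process pool involved
        self.update_data(result[1], result[0])

    def read_excel(self, path_file_list):
        with ThreadPoolExecutor(max_workers=3) as tpool:
            for path, file in path_file_list:
                tpool.submit(self.worker, path, file)
        # Leaving the with block waits for every submitted task.

    def _read_excel(self, path, file):
        return path + file  # stub: concatenate the arguments

    def _parse_df(self, path_file):
        return path_file, path_file[::-1]  # stub: argument and its reverse

    def update_data(self, s1, s2):
        # Stub: appending to a list from several threads is safe in CPython.
        self.stored.append((s1, s2))


if __name__ == '__main__':
    processor = ThreadOnlyProcessor()
    processor.read_excel([('path1', 'file1'), ('path2', 'file2')])
    print(sorted(processor.stored))
```

Each file still flows through its three steps without waiting for the other files; the only thing that changes is that the parsing shares the GIL with the other threads.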
Since your code is not a complete, minimal reproducible example, I have provided some of the missing methods for demo purposes so you can see how everything works in practice.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

class ExcelProcessor:
    def worker(self, path, file, ppool):
        """This executes in a multithreading pool."""
        result = self._read_excel(path, file) # Simple function call
        # Run the CPU-intensive processing in the passed multiprocessing pool:
        future = ppool.submit(self._parse_df, result)
        result = future.result()
        # Finally, we continue executing in the worker thread:
        self.update_data(result[1], result[0])

    def read_excel(self):
        '''
        read excel -> parse df -> store into sql
        '''
        # ppool is listed first so that it is shut down last: worker threads
        # may still be submitting to it while tpool is shutting down.
        with ProcessPoolExecutor(max_workers=3) as ppool, \
             ThreadPoolExecutor(max_workers=3) as tpool:
            path_file_list = [
                ('path1', 'file1'),
                ('path2', 'file2'),
                ('path3', 'file3'),
            ]
            for path, file in path_file_list:
                tpool.submit(self.worker, path, file, ppool)
            # The implicit calls to shutdown will wait for all
            # submitted tasks to the multithreading pool to complete.
        ...

    def _read_excel(self, path, file):
        return path + file # Just concatenate the arguments

    def _parse_df(self, path_file):
        """Return the argument and the argument reversed."""
        return path_file, path_file[::-1]

    def update_data(self, s1, s2):
        """Just output the results."""
        print(s1, s2, flush=True)

# Required for Windows:
if __name__ == '__main__':
    ExcelProcessor().read_excel()
Prints (the order of the lines may vary from run to run, since the workers run concurrently):
1elif1htap path1file1
2elif2htap path2file2
3elif3htap path3file3
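One caveat with the demo: the futures returned by tpool.submit are discarded, and an exception raised inside a submitted task is stored on its future rather than printed, so a failing file would go unnoticed. A minimal sketch of one way to surface such errors follows. It uses only a thread pool to stay short, and the module-level worker function, the run_all helper and the deliberately failing "file2" input are hypothetical, made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def worker(name):
    """Hypothetical task: fails for one input to show how errors surface."""
    if name == "file2":
        raise ValueError(f"cannot parse {name}")
    return name.upper()


def run_all(names):
    """Submit every task, then collect results and errors as tasks finish."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=3) as tpool:
        # Keep each future so that its outcome can be inspected later.
        futures = {tpool.submit(worker, name): name for name in names}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()  # re-raises the task's exception
            except ValueError as exc:
                errors[name] = str(exc)
    return results, errors


if __name__ == "__main__":
    results, errors = run_all(["file1", "file2", "file3"])
    print(results)
    print(errors)
```

The same pattern should carry over to the worker method above: keep the futures returned by tpool.submit and call result() on each one, so that a failure in any of the three steps is reported instead of silently dropped.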