Search code examples
Tags: python, multithreading, dictionary, multiprocessing, concurrent.futures

How to get concurrent.futures ProcessPoolExecutor work with a dictionary?


I watched a Python multiprocessing tutorial on YouTube; here's the link: https://www.youtube.com/watch?v=fKl2JW_qrso&t=2316s&ab_channel=CoreySchafer

Then I tried to apply that method to my own code. Here's my code before applying multiprocessing:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import investpy
from pandas import Timestamp
import json
from pandas.io.json import json_normalize
import time

def getCurrency_data(timeout=10):
    """Scrape the current quote data for a fixed set of USD currency pairs.

    Each investing.com page is downloaded in turn and parsed with
    BeautifulSoup.  "Current" and "Change" come from the first two
    ``<span>`` elements inside the ``<div class="top bold inlineblock">``
    elements.  "Prev. Close", "Open" and "1 Year Change" come from the first
    three ``<span class="float_lang_base_2 bold">`` values in the
    ``overviewDataTable`` div.

    Args:
        timeout: Seconds to wait for each HTTP response (passed to
            ``requests.get``).  Without it, a stalled connection would block
            forever.

    Returns:
        A DataFrame with one row per currency pair and the columns
        'Currency', 'Current', 'Change', 'Prev. Close', 'Open' and
        '1 Year Change', holding the raw text scraped from each page.

    Raises:
        requests.HTTPError: If a page is served with an error status.
        ValueError: If a page does not contain the expected elements.
    """
    user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
    links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr", 
             "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
             "USD-CNY":"https://www.investing.com/currencies/usd-cny", 
             "USD-EUR":"https://www.investing.com/currencies/usd-eur",
             "USD-SGD":"https://www.investing.com/currencies/usd-sgd", 
             "USD-THB":"https://www.investing.com/currencies/usd-thb",
             "USD-MXN":"https://www.investing.com/currencies/usd-mxn", 
             "USD-MYR":"https://www.investing.com/currencies/usd-myr",
             "USD-KRW":"https://www.investing.com/currencies/usd-krw", 
             "USD-INR":"https://www.investing.com/currencies/usd-inr"}
    columns = ['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change']

    rows = []
    for key, value in links.items():
        data = requests.get(value, headers={'User-Agent': user_agent}, timeout=timeout)
        # Report the HTTP status instead of failing later with an obscure
        # IndexError when the site returns an error page.
        data.raise_for_status()
        soup = BeautifulSoup(data.content, 'html.parser')

        # Current quote and change: the first two <span>s of these divs.
        span_tag = []
        for div in soup.find_all('div', {'class': 'top bold inlineblock'}):
            for span in div.find_all('span'):
                span_tag.append(span.text)
        if len(span_tag) < 2:
            raise ValueError(f"{key}: current quote not found on {value}")

        # Prev. Close, Open and 1 Year Change: the first three overview values.
        tags2 = soup.find('div', {'class': 'clear overviewDataTable overviewDataTableWithTooltip'})
        if tags2 is None:
            raise ValueError(f"{key}: overview table not found on {value}")
        cur = []
        for a in tags2.find_all('div', {'class': 'first inlineblock'}):
            for b in a.find_all('span', {'class': 'float_lang_base_2 bold'}):
                cur.append(b.text)
        if len(cur) < 3:
            raise ValueError(f"{key}: overview values not found on {value}")

        rows.append((key, span_tag[0], span_tag[1], cur[0], cur[1], cur[2]))

    # Build the frame in one go from the per-pair rows instead of filling six
    # parallel lists and assigning them column by column.
    return pd.DataFrame(rows, columns=columns)

But I'm confused: in that video the input is a list, while I use a dictionary. Here's the code after applying multiprocessing:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import investpy
from pandas import Timestamp
import json
from pandas.io.json import json_normalize
import time
import concurrent.futures

t1 = time.perf_counter()

# Map "USD-XXX" labels to their investing.com quote pages.  The entries are
# generated from the quote-currency codes, in this order.
links = {
    f"USD-{code}": f"https://www.investing.com/currencies/usd-{code.lower()}"
    for code in ("IDR", "JPY", "CNY", "EUR", "SGD", "THB", "MXN", "MYR", "KRW", "INR")
}

def getCurrency_data(link):
    """Scrape every currency pair in the module-level ``links`` dict.

    NOTE(review): the ``link`` argument is never used.  Every call loops over
    the whole global ``links`` dict, so calling this once per key (as
    ``executor.map(getCurrency_data, links)`` does) downloads every page
    once per call instead of one page per call.

    Returns:
        A DataFrame with the columns 'Currency', 'Current', 'Change',
        'Prev. Close', 'Open' and '1 Year Change', one row per entry of
        ``links``, holding the raw text scraped from each page.
    """
    headers = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"}
    columns = ['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change']

    records = []
    for pair_name, url in links.items():
        page = requests.get(url, headers=headers)
        soup = BeautifulSoup(page.content, 'html.parser')

        # Current quote and change: first two <span>s of the bold top divs.
        header_spans = []
        for block in soup.find_all('div', {'class': 'top bold inlineblock'}):
            header_spans.extend(span.text for span in block.find_all('span'))
        latest, delta = header_spans[0], header_spans[1]

        # Prev. Close, Open, 1 Year Change: first three values of the overview table.
        overview = soup.find('div', {'class': 'clear overviewDataTable overviewDataTableWithTooltip'})
        stats = []
        for cell in overview.find_all('div', {'class': 'first inlineblock'}):
            stats.extend(value.text for value in cell.find_all('span', {'class': 'float_lang_base_2 bold'}))

        records.append((pair_name, latest, delta, stats[0], stats[1], stats[2]))

    return pd.DataFrame(records, columns=columns)

# Run getCurrency_data once per key of ``links``: iterating a dict yields its
# keys.  Leaving the with-block waits for all submitted calls to finish.
# NOTE(review): the iterator returned by map() is never consumed, so the
# DataFrames returned by the workers are discarded and no scraped data is
# printed.  There is also no ``if __name__ == '__main__':`` guard, which
# process pools need on platforms that spawn worker processes (e.g. Windows).
with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(getCurrency_data, links)

t2 = time.perf_counter()
print(f'Finished in {t2 - t1} secondes')

Is there something wrong with my code? I've tried it but nothing comes out. Thank you.


Solution

  • This problem could also be solved using multithreading rather than multiprocessing, since most of the time spent in getCurrency_data is waiting for data to come back from your requests.get call, and so there is little contention among the threads for the Global Interpreter Lock (GIL). But because BeautifulSoup does some CPU-intensive processing of the returned data, there will always be some contention for the GIL, and this suggests that:

    (1) multiprocessing will probably perform slightly better than multithreading, but only if you create as many processes as there are URLs to retrieve, reflecting the fact that your "worker" function spends most of its time waiting; and (2) you should use a requests.Session instance for retrieving the URLs, since all of them go to the same website and a session can reuse connections. (Note that with a process pool the session is pickled and copied to the workers along with each task, so the connection-reuse benefit mostly applies when you use threads.)

    To convert your program to multiprocessing or multithreading (try it both ways -- you only need to change ProcessPoolExecutor to ThreadPoolExecutor, but I found that multiprocessing was slightly more performant), function getCurrency_data should be processing only a single URL and returning back to the main process the data it has retrieved. It is the main process that should then accumulate the data returned by all the subprocesses and initialize the dataframe:

    import requests
    from bs4 import BeautifulSoup
    import pandas as pd
    #import investpy
    #from pandas import Timestamp
    #import json
    #from pandas.io.json import json_normalize
    import time
    import concurrent.futures
    from functools import partial
    
    def getCurrency_data(session, item, timeout=10):
        """Download and parse a single investing.com currency page.

        Args:
            session: ``requests.Session`` used for the request; its headers
                (e.g. the User-Agent configured by the caller) are sent along.
            item: ``(key, url)`` pair, as yielded by ``links.items()``.
            timeout: Seconds to wait for the server before giving up, so a
                stalled connection cannot hang a worker forever.

        Returns:
            ``(currency, current, change, prev_close, open, one_year_change)``
            holding the raw text scraped from the page.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            ValueError: If the page lacks the expected elements (for example,
                because the page layout changed).
        """
        key, url = item

        response = session.get(url, timeout=timeout)
        # Report the HTTP status instead of an obscure IndexError later on.
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Current quote and change: first two <span>s of these divs.
        span_tag = []
        for div in soup.find_all('div', {'class': 'top bold inlineblock'}):
            for span in div.find_all('span'):
                span_tag.append(span.text)
        if len(span_tag) < 2:
            raise ValueError(f'{key}: current quote not found on {url}')

        # Prev. Close, Open and 1 Year Change: first three overview values.
        table = soup.find('div', {'class': 'clear overviewDataTable overviewDataTableWithTooltip'})
        if table is None:
            raise ValueError(f'{key}: overview table not found on {url}')
        cur = []
        for div in table.find_all('div', {'class': 'first inlineblock'}):
            for span in div.find_all('span', {'class': 'float_lang_base_2 bold'}):
                cur.append(span.text)
        if len(cur) < 3:
            raise ValueError(f'{key}: overview values not found on {url}')

        return key, span_tag[0], span_tag[1], cur[0], cur[1], cur[2]
    
    
    def main():
        """Scrape every currency pair in parallel and print the results.

        Starts one worker process per URL and calls ``getCurrency_data`` for
        each ``(key, url)`` pair.  The returned tuples are collected into a
        DataFrame (one row per pair), which is printed together with the
        elapsed wall-clock time.
        """
        t1 = time.perf_counter()

        links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr",
                 "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
                 "USD-CNY":"https://www.investing.com/currencies/usd-cny",
                 "USD-EUR":"https://www.investing.com/currencies/usd-eur",
                 "USD-SGD":"https://www.investing.com/currencies/usd-sgd",
                 "USD-THB":"https://www.investing.com/currencies/usd-thb",
                 "USD-MXN":"https://www.investing.com/currencies/usd-mxn",
                 "USD-MYR":"https://www.investing.com/currencies/usd-myr",
                 "USD-KRW":"https://www.investing.com/currencies/usd-krw",
                 "USD-INR":"https://www.investing.com/currencies/usd-inr"}

        user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
        columns = ['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change']

        with requests.Session() as session:
            # update() adds the User-Agent while keeping requests' default
            # headers (e.g. Accept-Encoding); assigning a new dict drops them.
            session.headers.update({'User-Agent': user_agent})
            # NOTE: with a process pool the session is pickled and sent along
            # with each task, so every call works on its own copy of it.
            with concurrent.futures.ProcessPoolExecutor(max_workers=len(links)) as executor:
                # map() yields results in input order, so the rows follow ``links``.
                rows = list(executor.map(partial(getCurrency_data, session), links.items()))

        df_currency = pd.DataFrame(rows, columns=columns)

        t2 = time.perf_counter()

        print(f'Finished in {t2-t1} seconds')

        print(df_currency)
    
    # Required for Windows (and any platform whose multiprocessing start
    # method is 'spawn'): each worker process re-imports this module, and the
    # guard keeps those imports from calling main() and starting new pools.
    if __name__ == '__main__':
        main()
    

    Prints:

    Finished in 4.4468559 seconds
      Currency   Current   Change Prev. Close      Open 1 Year Change
    0  USD-IDR  14,452.5    +52.5      14,400    14,450       - 2.49%
    1  USD-JPY    109.81    +0.09      109.72    109.73         3.47%
    2  USD-CNY    6.5006  +0.0064      6.4942    6.4951       - 6.13%
    3  USD-EUR    0.8564   0.0001      0.8563    0.8565         1.33%
    4  USD-SGD    1.3628  -0.0014      1.3643     1.364       - 0.44%
    5  USD-THB    33.370   +0.020       33.35     33.34         6.54%
    6  USD-MXN   20.3829  +0.2309      20.152    20.152       - 8.88%
    7  USD-MYR    4.2375  +0.0005       4.237    4.2395         1.67%
    8  USD-KRW  1,182.31    +6.03    1,176.28  1,175.46       - 0.69%
    9  USD-INR    74.400   +0.030       74.37     74.38       - 0.62%
    

    The Best of Both Worlds

    Since there is a fair bit of overhead in creating processes, it can be most efficient to separate the work into what is primarily I/O (i.e. retrieving the URL) and what is primarily CPU (parsing and processing the retrieved HTML document), using multithreading for the former and multiprocessing for the latter. That way you do not create more processes than you actually need.

    As before, the multithreading pool size should be equal to the number of URLs needed to be retrieved (as long as that number is not unreasonably large; creating hundreds of threads should not be a problem) and the multiprocessing pool size should use at most the number of CPU cores you have. We therefore create both pools and pass the multiprocessing pool to our worker function, which retrieves the URL and then submits the data to the multiprocessing pool for parsing and processing that data.

    In the code below I create a multiprocessing pool whose size equals the number of physical cores I have (4). That is half the number of logical cores, which would otherwise be the default pool size if I did not specify one. To determine this value dynamically, you can install the psutil package from PyPI:

    import requests
    from bs4 import BeautifulSoup
    import pandas as pd
    #import investpy
    #from pandas import Timestamp
    #import json
    #from pandas.io.json import json_normalize
    import time
    import concurrent.futures
    from functools import partial
    import psutil
    
    def process_data(key, data):
        """Parse one downloaded investing.com currency page.

        Args:
            key: Currency-pair label (e.g. ``"USD-IDR"``), returned unchanged
                as the first field.
            data: The page's HTML as given to ``BeautifulSoup`` (the caller in
                this script passes ``response.content``, i.e. bytes).

        Returns:
            ``(currency, current, change, prev_close, open, one_year_change)``
            holding the raw text scraped from the page.

        Raises:
            ValueError: If the page lacks the expected elements (for example,
                because the page layout changed); this is clearer than the
                IndexError/AttributeError the raw lookups would produce.
        """
        soup = BeautifulSoup(data, 'html.parser')

        # Current quote and change: first two <span>s of these divs.
        span_tag = []
        for div in soup.find_all('div', {'class': 'top bold inlineblock'}):
            for span in div.find_all('span'):
                span_tag.append(span.text)
        if len(span_tag) < 2:
            raise ValueError(f'{key}: current quote not found')

        # Prev. Close, Open and 1 Year Change: first three overview values.
        table = soup.find('div', {'class': 'clear overviewDataTable overviewDataTableWithTooltip'})
        if table is None:
            raise ValueError(f'{key}: overview table not found')
        cur = []
        for div in table.find_all('div', {'class': 'first inlineblock'}):
            for span in div.find_all('span', {'class': 'float_lang_base_2 bold'}):
                cur.append(span.text)
        if len(cur) < 3:
            raise ValueError(f'{key}: overview values not found')

        return key, span_tag[0], span_tag[1], cur[0], cur[1], cur[2]
    
    
    def getCurrency_data(session, pool_executor, item, timeout=10):
        """Download one page in the calling thread and parse it in ``pool_executor``.

        Args:
            session: ``requests.Session`` used for the download.
            pool_executor: Executor that runs ``process_data`` (the HTML
                parsing) on the downloaded bytes.
            item: ``(key, url)`` pair, as yielded by ``links.items()``.
            timeout: Seconds to wait for the server before giving up, so a
                stalled connection cannot hang this thread forever.

        Returns:
            The tuple produced by ``process_data`` for this page.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            Any exception raised by ``process_data`` is re-raised by
            ``result()``.
        """
        key, url = item

        response = session.get(url, timeout=timeout)
        # Don't ship an error page to the parser; report the HTTP status instead.
        response.raise_for_status()
        # Only the key and raw bytes go to the pool; block this thread until
        # the parsed tuple comes back.
        return pool_executor.submit(process_data, key, response.content).result()
    
    def main():
        """Download all pages with threads, parse them in processes, print the table.

        One download thread is used per URL.  The HTML parsing is handed to a
        process pool sized to the number of physical cores reported by
        psutil.  The tuples returned for each ``(key, url)`` pair are
        collected into a DataFrame (one row per pair), which is printed
        together with the elapsed wall-clock time.
        """
        t1 = time.perf_counter()

        links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr",
                 "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
                 "USD-CNY":"https://www.investing.com/currencies/usd-cny",
                 "USD-EUR":"https://www.investing.com/currencies/usd-eur",
                 "USD-SGD":"https://www.investing.com/currencies/usd-sgd",
                 "USD-THB":"https://www.investing.com/currencies/usd-thb",
                 "USD-MXN":"https://www.investing.com/currencies/usd-mxn",
                 "USD-MYR":"https://www.investing.com/currencies/usd-myr",
                 "USD-KRW":"https://www.investing.com/currencies/usd-krw",
                 "USD-INR":"https://www.investing.com/currencies/usd-inr"}

        user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
        columns = ['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change']

        with requests.Session() as session:
            # update() adds the User-Agent while keeping requests' default
            # headers (e.g. Accept-Encoding); assigning a new dict drops them.
            session.headers.update({'User-Agent': user_agent})
            # psutil.cpu_count(logical=False) may return None when it cannot
            # tell; the process pool then falls back to its default size.
            with concurrent.futures.ProcessPoolExecutor(psutil.cpu_count(logical=False)) as pool_executor, \
                    concurrent.futures.ThreadPoolExecutor(max_workers=len(links)) as executor:
                # map() yields results in input order, so the rows follow ``links``.
                rows = list(executor.map(partial(getCurrency_data, session, pool_executor), links.items()))

        df_currency = pd.DataFrame(rows, columns=columns)

        t2 = time.perf_counter()

        print(f'Finished in {t2-t1} seconds')

        print(df_currency)
    
    # Required for Windows (and any platform whose multiprocessing start
    # method is 'spawn'): each worker process re-imports this module, and the
    # guard keeps those imports from calling main() and starting new pools.
    if __name__ == '__main__':
        main()
    

    Prints:

    Finished in 3.5800665 seconds
      Currency   Current   Change Prev. Close      Open 1 Year Change
    0  USD-IDR  14,452.5    +52.5      14,400    14,450       - 2.49%
    1  USD-JPY    109.81    +0.09      109.72    109.73         3.47%
    2  USD-CNY    6.5015  +0.0073      6.4942    6.4951       - 6.13%
    3  USD-EUR    0.8545  -0.0018      0.8563    0.8565         1.33%
    4  USD-SGD    1.3615  -0.0027      1.3643     1.364       - 0.44%
    5  USD-THB    33.360   +0.010       33.35     33.34         6.54%
    6  USD-MXN   20.4000  +0.2480      20.152    20.152       - 8.88%
    7  USD-MYR    4.2375  +0.0005       4.237    4.2395         1.67%
    8  USD-KRW  1,177.58    +1.30    1,176.28  1,175.46       - 0.69%
    9  USD-INR    74.352   -0.018       74.37     74.38       - 0.62%