Tags: python, pandas, google-colaboratory, python-multithreading

Parallel web requests with GPU on Google Colab


I need to obtain properties from a web service for a large list of products (~25,000), and this is a very time-sensitive operation (ideally it should execute in just a few seconds). I coded this first with a for loop as a proof of concept, but it takes 1.25 hours. I'd like to vectorize this code and execute the HTTP requests in parallel using a GPU on Google Colab. I've removed many of the unnecessary details, but it's important to note that the products and their web service URLs are stored in a DataFrame.

Will this be faster to execute on a GPU? Or should I just use multiple threads on a CPU?

What is the best way to implement this? And how can I save the results from the parallel processes to the results DataFrame (all_product_properties) without running into concurrency problems?

Each product has multiple properties (key-value pairs) that I'm obtaining from the JSON response, but the product_id is not included in the response, so I need to add it to the DataFrame myself.

import json

import pandas as pd
import requests

# DataFrame containing a string column of URLs
urls = pd.DataFrame(["www.url1.com", "www.url2.com", ..., "www.url3.com"], columns=["url"])

# initialize an empty DataFrame to store properties for all products
all_product_properties = pd.DataFrame(columns=["product_id", "property_name", "property_value"])

# loop over every row of the urls DataFrame
for i in range(len(urls)):

  curr_url = urls.loc[i, "url"]

  try:
    http_response = requests.request("GET", curr_url)

    if http_response is not None:

      http_response_json = json.loads(http_response.text)

      #extract product properties from JSON response
      product_properties_json = http_response_json['product_properties']
      curr_product_properties_df = pd.json_normalize(product_properties_json)

      #add product id since it's not returned in the JSON
      curr_product_properties_df["product_id"] = i

      #save current product properties to DataFrame containing all product properties
      all_product_properties = pd.concat([all_product_properties, curr_product_properties_df ])

  except Exception as e:
    print(e)

Solution

  • GPUs probably will not help here, since they are meant for accelerating numerical operations. What you are trying to parallelize is HTTP requests, which are I/O-bound: the program spends most of its time waiting on the network, so Python multithreading (part of the standard library) can overlap that waiting and sharply reduce the total time.

    In addition, concatenating pandas DataFrames in a loop is a very slow operation, since each concat re-copies everything accumulated so far (see: Why does concatenation of DataFrames get exponentially slower?). Instead, append your output to a list and run a single concat after the loop has concluded, as in the sketch below.
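
    A minimal sketch of the difference (here chunks is just a stand-in name for any iterable of DataFrames):

    import pandas as pd

    chunks = [pd.DataFrame({"x": [i]}) for i in range(1000)]

    # Slow: concat inside the loop re-copies the accumulated frame on every pass
    out = pd.DataFrame()
    for chunk in chunks:
        out = pd.concat([out, chunk])

    # Fast: collect the pieces in a list (appends are O(1)), then copy once
    pieces = []
    for chunk in chunks:
        pieces.append(chunk)
    out = pd.concat(pieces, ignore_index=True)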

    Here's how I would implement your code with multithreading:

    import concurrent.futures
    import json
    import threading
    import time

    import pandas as pd
    import requests

    # Give each thread its own Session: requests.Session is not thread-safe,
    # but reusing one per thread keeps connections alive between requests
    thread_local = threading.local()

    def get_session():
        if not hasattr(thread_local, "session"):
            thread_local.session = requests.Session()
        return thread_local.session


    def download_site(indexed_url):
        # Receive the product id together with the URL, since the JSON
        # response does not contain it
        product_id, url = indexed_url
        session = get_session()

        try:
            with session.get(url) as response:
                response.raise_for_status()
                http_response_json = json.loads(response.text)
                product_properties_json = http_response_json['product_properties']
                curr_product_properties_df = pd.json_normalize(product_properties_json)

                # add product id since it's not returned in the JSON
                curr_product_properties_df["product_id"] = product_id

                print(f"Read {len(response.content)} from {url}")
                return curr_product_properties_df
        except Exception as e:
            print(e)
            return None


    def download_all_sites(sites):
        # Each worker returns its own DataFrame and executor.map collects the
        # return values in input order, so no shared state is mutated and no
        # locks are needed
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            return list(executor.map(download_site, enumerate(sites)))


    if __name__ == "__main__":
        # Store URLs as a list, example below
        urls = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 10
        start_time = time.time()
        all_product_properties = download_all_sites(urls)
        # pd.concat silently drops the None entries left by failed requests
        all_product_properties = pd.concat(all_product_properties, ignore_index=True)

        duration = time.time() - start_time
        print(f"Downloaded {len(urls)} in {duration} seconds")
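
    Note on the concurrency question: each worker returns its own small DataFrame, and the single pd.concat happens on the main thread after all workers have finished, so no thread ever writes to a shared DataFrame and there is nothing to lock. For ~25,000 URLs you will likely want more than 5 workers; how far you can raise max_workers depends mostly on the web service's rate limits, so treat it as a tuning knob.

    If you would rather process results as responses arrive instead of waiting for map to return them in input order, concurrent.futures.as_completed works too. A minimal sketch reusing download_site from above; since every result carries its own product_id, arrival order does not matter:

    def download_all_sites_unordered(sites):
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(download_site, pair) for pair in enumerate(sites)]
            for n, future in enumerate(concurrent.futures.as_completed(futures), 1):
                # download_site catches its own exceptions, so result() simply
                # returns the DataFrame (or None for a failed request)
                results.append(future.result())  # only the main thread touches the list
                print(f"{n}/{len(sites)} responses processed")
        return results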

    Reference: this RealPython article on multithreading and multiprocessing in Python: https://realpython.com/python-concurrency/