I need to obtain properties from a web service for a large list of products (~25,000), and this is a very time-sensitive operation (ideally I need this to execute in just a few seconds). I coded this first using a for loop as a proof of concept, but it's taking 1.25 hours. I'd like to vectorize this code and execute the HTTP requests in parallel using a GPU on Google Colab. I've removed many of the unnecessary details, but it's important to note that the products and their web service URLs are stored in a DataFrame.
Will this be faster to execute on a GPU? Or should I just use multiple threads on a CPU?
What is the best way to implement this? And how can I save the results from parallel processes to the results DataFrame (all_product_properties) without running into concurrency problems?
Each product has multiple properties (key-value pairs) that I'm obtaining from the JSON response, but the product_id is not included in the JSON response so I need to add the product_id to the DataFrame.
import json

import pandas as pd
import requests

# DataFrame containing a string column of URLs
urls = pd.DataFrame(["www.url1.com", "www.url2.com", ..., "www.url3.com"], columns=["url"])

# initialize an empty DataFrame to store properties for all products
all_product_properties = pd.DataFrame(columns=["product_id", "property_name", "property_value"])

for i in range(len(urls)):
    curr_url = urls.loc[i, "url"]

    try:
        http_response = requests.request("GET", curr_url)
        if http_response is not None:
            http_response_json = json.loads(http_response.text)

            # extract product properties from the JSON response
            product_properties_json = http_response_json['product_properties']
            curr_product_properties_df = pd.json_normalize(product_properties_json)

            # add the product id since it's not returned in the JSON
            curr_product_properties_df["product_id"] = i

            # save the current product's properties to the DataFrame containing all product properties
            all_product_properties = pd.concat([all_product_properties, curr_product_properties_df])

    except Exception as e:
        print(e)
GPUs probably will not help here since they are meant for accelerating numerical operations. However, since you are trying to parallelize HTTP requests, which are I/O-bound, you can use Python multithreading (part of the standard library) to reduce the time required.
In addition, concatenating pandas DataFrames in a loop is a very slow operation (see: Why does concatenation of DataFrames get exponentially slower?). Instead, append each result to a list and run a single concat after the loop has finished, for example:
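Here is that pattern in isolation; property_dfs is a hypothetical list standing in for the per-product DataFrames produced inside your loop:

import pandas as pd

# Slow: pd.concat inside the loop re-copies the accumulated data on every iteration
all_product_properties = pd.DataFrame()
for df in property_dfs:
    all_product_properties = pd.concat([all_product_properties, df])

# Faster: collect the pieces in a plain list and concatenate once at the end
results = []
for df in property_dfs:
    results.append(df)
all_product_properties = pd.concat(results, ignore_index=True)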
Here's how I would implement your code with multithreading (each URL's index is passed along with the URL so product_id can still be attached to the result):
import concurrent.futures
import json
import threading
import time

import pandas as pd
import requests

thread_local = threading.local()

def get_session():
    # one requests.Session per thread, since a Session should not be shared across threads
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def download_site(indexed_url):
    # each task receives its (product_id, url) pair so the id can be attached to the result
    product_id, url = indexed_url
    session = get_session()
    try:
        with session.get(url) as response:
            http_response_json = json.loads(response.text)

            # extract product properties from the JSON response
            product_properties_json = http_response_json['product_properties']
            curr_product_properties_df = pd.json_normalize(product_properties_json)

            # add the product id since it's not returned in the JSON
            curr_product_properties_df["product_id"] = product_id

            print(f"Read {len(response.content)} from {url}")
            return curr_product_properties_df
    except Exception as e:
        print(e)
        return None

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # each worker returns its own DataFrame, so nothing is shared between threads
        # and there are no concurrency problems to worry about
        results = executor.map(download_site, enumerate(sites))
        # drop failed requests (they returned None)
        return [df for df in results if df is not None]

if __name__ == "__main__":
    # Store URLs as a list, example below
    urls = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 10

    start_time = time.time()
    all_product_properties = download_all_sites(urls)
    all_product_properties = pd.concat(all_product_properties, ignore_index=True)
    duration = time.time() - start_time
    print(f"Downloaded {len(urls)} in {duration} seconds")
Reference: this RealPython article on multithreading and multiprocessing in Python: https://realpython.com/python-concurrency/