Search code examples
pythonweb-scrapingbeautifulsouppython-requestsmultiprocessing

Optimize Python Web Scraping Script Using concurrent.futures to Reduce Execution Time


I'm currently working on a web scraping script in Python that extracts table data from multiple pages of a website using urllib, BeautifulSoup, and pandas. The script is designed to handle content encoding like gzip and brotli, and it retries on certain HTTP errors such as 429 (Too Many Requests) with exponential backoff.

I've implemented concurrent processing with ProcessPoolExecutor to speed up the process. However, the script still takes a significant amount of time to run, around 395 seconds. I believe there is room for vast optimization.

Below is the full script I'm using:

import urllib.request
from bs4 import BeautifulSoup
import pandas as pd
import gzip
import brotli
import io
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging

# Setup logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
log_stream = io.StringIO()
handler = logging.StreamHandler(log_stream)
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

def get_page_content(url):
    req = urllib.request.Request(url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    })
    response = urllib.request.urlopen(req)
    if response.info().get('Content-Encoding') == 'gzip':
        buf = io.BytesIO(response.read())
        data = gzip.GzipFile(fileobj=buf).read()
    elif response.info().get('Content-Encoding') == 'br':
        data = brotli.decompress(response.read())
    else:
        data = response.read()
    return data

def extract_table_data(page_url, page_number):
    try:
        webpage = get_page_content(page_url)
        soup = BeautifulSoup(webpage, 'html.parser')
        div_element = soup.find('div', class_='tw-mb-6 lg:tw-mb-12')
        if div_element:
            html_table = div_element.find('table')
            if html_table:
                df = pd.read_html(io.StringIO(str(html_table)))[0]
                df = df.loc[:, df.columns[1:-1]]
                df['Page Number'] = page_number
                return df
            else:
                logger.info(f"No table found in the specified div for URL: {page_url}")
        else:
            logger.info(f"Specified div element not found for URL: {page_url}")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            logger.info(f"HTTP Error 404 on page {page_number}. Stopping scraping.")
            raise e
        logger.error(f"HTTP Error on page {page_number}: {str(e)}")
        traceback.print_exc()
    except Exception as e:
        logger.error(f"An error occurred for URL {page_url}: {str(e)}")
        traceback.print_exc()
    return None

def process_page(page):
    logger.info(f"Starting to process page {page}")
    try:
        url = base_url + str(page)
        logger.info(f"Fetching URL: {url}")
        retries = 0
        while retries < max_retries:
            try:
                df = extract_table_data(url, page)
                if df is not None:
                    return df
                else:
                    logger.info(f"No data found on page {page}, stopping.")
                    return None
            except urllib.error.HTTPError as e:
                if e.code == 404:
                    raise e
                elif e.code == 429:
                    logger.warning(f"HTTP Error 429 on page {page}: Too Many Requests. Retrying after delay...")
                    retries += 1
                    time.sleep(retry_delay * retries)
                else:
                    logger.info(f"HTTP Error on page {page}: {e.code}. Retrying...")
                    retries += 1
                    time.sleep(retry_delay)
            except Exception as e:
                logger.error(f"An error occurred on page {page}: {str(e)}. Retrying...")
                traceback.print_exc()
                retries += 1
                time.sleep(retry_delay)
    except Exception as e:
        logger.error(f"Failed to process page {page}: {str(e)}")
        traceback.print_exc()
    logger.info(f"Finished processing page {page}")
    return None

base_url = 'https://www.coingecko.com/en/coins/1/markets/spot?page='
all_data = pd.DataFrame()
start_page = 1
max_retries = 3
retry_delay = 5
max_consecutive_errors = 5
start_time = time.time()

with ProcessPoolExecutor(max_workers=2) as executor:
    futures = {}
    consecutive_errors = 0
    current_page = start_page

    while True:
        try:
            future = executor.submit(process_page, current_page)
            futures[future] = current_page
            current_page += 1
            completed_futures = [future for future in as_completed(futures) if future.done()]
            for future in completed_futures:
                page = futures.pop(future)
                try:
                    df = future.result()
                    if df is not None:
                        all_data = pd.concat([all_data, df], ignore_index=True)
                        consecutive_errors = 0
                    else:
                        consecutive_errors += 1
                except urllib.error.HTTPError as e:
                    if e.code == 404:
                        logger.info("Reached a page that does not exist. Stopping.")
                        break
                    consecutive_errors += 1
                except Exception as e:
                    logger.error(f"An error occurred while processing page {page}: {str(e)}")
                    consecutive_errors += 1
                if consecutive_errors >= max_consecutive_errors:
                    logger.info(f"Stopping due to {max_consecutive_errors} consecutive errors.")
                    break
            if consecutive_errors >= max_consecutive_errors or 'HTTP Error 404' in log_stream.getvalue():
                break
        except Exception as e:
            logger.error(f"Process pool encountered an error: {str(e)}")
            break

end_time = time.time()
duration = end_time - start_time
logger.info(f"Total time taken: {duration:.2f} seconds")
print(f"Total time taken: {duration:.2f} seconds")

save_path = r'C:\Users\hamid\Downloads\Crypto_Data_Table.csv'
all_data.to_csv(save_path, index=False)
logger.info(f"All data saved to '{save_path}'")

Are there any specific adjustments or optimizations that could significantly speed up the execution time? Would using another concurrency method, like ThreadPoolExecutor or a different library, help? Or is there a way to optimize the data fetching and processing in a way that reduces the overall time? Any suggestions for reducing the execution time would be greatly appreciated.


Solution

  • Don't scrape. (The fact that you're worried about "429 polling too fast" is instructive.)

    There's a free API for this. Use it.