python, multithreading, beautifulsoup, python-requests, cron

Response ended prematurely while scraping web page inside cronjob


I created a cron job to execute the script every 24 hours. I noticed that this error only occurs when the code runs as part of the cron job; when running it on my local machine I did not notice the problem.

import os
import psycopg2
import re
import requests
import time
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
from loguru import logger
from psycopg2.extras import execute_values
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

PSQL_CONN_STRING = os.environ.get("DB_Postgresql","Some env keys")
conn = psycopg2.connect(PSQL_CONN_STRING)

session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))


def get_list_of_eans(conn) -> tuple[list, list, list]:
    # take data from the database and process it (body omitted)
    ...


def scrape_page(conn, ean: str, sku: str, supplier: str, sid: str, aid: str):
    search_query = ean if isinstance(ean, int) else ean.replace('"', '')
    if not str(search_query).startswith('PO_'):

        time.sleep(1)
        url = f'https://www.ebay.de/sch/i.html?_from=R42&_nkw={search_query}&_sacat=0&_sop=2&LH_ItemCondition=3&LH_BIN=1&_stpos=10719&_fcid=77&_fspt=1&LH_PrefLoc=99&rt=nc&_sadis=1000'
        res = session.get(url)

        if res.status_code != 200:
            return res.status_code

        soup = BeautifulSoup(res.text, 'html.parser')
        data = []
        # further code for obtaining information from the site: the parsing loop
        # (omitted here) appends each found offer inside a try/except, roughly:
        #     try:
        #         ...
        #         data.append(
        #             [offer_id, search_query, title, supplier, price, shipping, free_days])
        #     except Exception as ex:
        #         logger.error(f'search - {search_query}: {ex}')
        if data:

            try:
                with conn.cursor() as cur:
                    execute_values(cur, '''
                         -- actual INSERT ... ON CONFLICT ... DO UPDATE statement omitted
                    ''', data)
                    conn.commit()
            except psycopg2.IntegrityError as ie:
                conn.rollback()
                logger.error(f'IntegrityError on insert - {search_query}: {ie}')
            except psycopg2.DatabaseError as de:
                conn.rollback()
                logger.error(f'DatabaseError on insert - {search_query}: {de}')
            except Exception as e:
                conn.rollback()
                logger.error(f'Unexpected error on insert - {search_query}: {e}')

def main():
    list_of_eans, list_sid, list_aid = get_list_of_eans(conn)
    pool = ThreadPoolExecutor(10)
    future_threads = []

    d, dd = 0, 0

    expanded_list_sid = list_sid * (len(list_of_eans) // len(list_sid)) + list_sid[:len(list_of_eans) % len(list_sid)]
    expanded_list_aid = list_aid * (len(list_of_eans) // len(list_aid)) + list_aid[:len(list_of_eans) % len(list_aid)]

    for ean, sid, aid in zip(list_of_eans, expanded_list_sid, expanded_list_aid):
        future = pool.submit(scrape_page, conn, ean[0], ean[1], ean[2], sid, aid)
        future_threads.append(future)
        dd += 1

    for future in future_threads:
        result = future.result()
        if d % 100 == 0:
            logger.info(f'{d} / {dd}, status code: {result}')
        d += 1


if __name__ == '__main__':
    main()

The problem I have been struggling with for a long time, and for which I have not found an answer, is that the entire cron job then freezes and consumes more than 80 GB of memory on the server. How can I deal with such a problem?

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 820, in generate
    yield from self.raw.stream(chunk_size, decode_content=True)
  File "/usr/local/lib/python3.10/site-packages/urllib3/response.py", line 1057, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/usr/local/lib/python3.10/site-packages/urllib3/response.py", line 1206, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.10/site-packages/urllib3/response.py", line 1136, in _update_chunk_length
    raise ProtocolError("Response ended prematurely") from None
urllib3.exceptions.ProtocolError: Response ended prematurely
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/app/./main.py", line 192, in <module>
    main()
  File "/app/./main.py", line 185, in main
    result = future.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/app/./main.py", line 79, in scrape_page
    res = session.get(url)
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 602, in get
    return self.request("GET", url, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 746, in send
    r.content
  File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 902, in content
    self._content = b"".join(self.iter_content(CONTENT_CHUNK_SIZE)) or b""
  File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 822, in generate
    raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: Response ended prematurely

I have tried upgrading the library versions and hardening the code in various ways, including adding try/except blocks, but it is difficult for me to determine where the problem lies and what might be causing it.


Solution

  • I managed to solve the problem some time ago. I have now found the time to write a reply.

    Change the retries variable and delete the http:// mount.

    retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504], raise_on_status=False)
    session.mount('https://', HTTPAdapter(max_retries=retries))
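
    What raise_on_status=False changes: once the retries driven by status_forcelist are exhausted, the session hands back the last response instead of raising requests.exceptions.RetryError, so a plain status_code check still works. A minimal sketch of my own (not part of the original script) to illustrate:

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    def build_session(raise_on_status: bool = False) -> requests.Session:
        # retry transient server errors with exponential backoff
        retry = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            raise_on_status=raise_on_status,  # False: return the final 5xx response instead of raising
        )
        s = requests.Session()
        s.mount('https://', HTTPAdapter(max_retries=retry))
        return s

    # With raise_on_status=True (urllib3's default), an exhausted retry budget on a 503
    # surfaces as requests.exceptions.RetryError; with False, the last response is returned
    # and the `if res.status_code != 200` check in scrape_page can handle it.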
    

    I added 2 functions.

    from requests.exceptions import ChunkedEncodingError

    def make_request(url, retries=3):
        for attempt in range(retries):
            try:
                # wait at most 5 s to connect and 20 s for the response
                res = session.get(url, timeout=(5, 20))
                res.raise_for_status()
                return res
            except ChunkedEncodingError as e:
                if attempt < retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff before retrying
                    continue
                else:
                    raise e
            except Exception as e:
                # anything else is not retried
                raise e
    
    
    
    def scrape_page_with_retry(conn, ean: str, sku: str, supplier: str, sid: str, aid: str, retries=3):
        for attempt in range(retries):
            try:
                return scrape_page(conn, ean, sku, supplier, sid, aid)
            except ChunkedEncodingError:
                if attempt < retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff before retrying the whole page
                    continue
                else:
                    return 400  # give up on this EAN and report a failed status
            except Exception:
                # any other error: skip this EAN without retrying
                return None
    

    Function make_request

    It uses session.get(url, timeout=(5, 20)) to send the GET request, where timeout=(5, 20) means it waits at most 5 seconds to establish the connection and 20 seconds for the response.

    If all attempts fail with a ChunkedEncodingError, the exception is re-raised.
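
    For reference, a small illustration of my own (not from the original post) showing which requests exception each half of that timeout tuple maps to:

    import requests

    try:
        requests.get('https://www.ebay.de/', timeout=(5, 20))
    except requests.exceptions.ConnectTimeout:
        pass  # no connection could be established within 5 seconds
    except requests.exceptions.ReadTimeout:
        pass  # connected, but the server sent no data for 20 seconds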

    Function scrape_page_with_retry

    In its for loop it makes at most retries attempts to execute the scrape_page function. If a ChunkedEncodingError occurs, it retries after waiting a time that grows exponentially (2 ** attempt seconds: 1 s, then 2 s). If the error persists after all attempts, it returns 400. I call these functions in the following places:

    url = f'https://www.ebay.de/sch/i.html?_from=R42&_nkw={search_query}&_sacat=0&_sop=2&LH_ItemCondition=3&LH_BIN=1&_stpos=10719&_fcid=77&_fspt=1&LH_PrefLoc=99&rt=nc&_sadis=1000'
    res = make_request(url)
    

    and here

    for ean, sid, aid in zip(list_of_eans, expanded_list_sid, expanded_list_aid):
        future = pool.submit(scrape_page_with_retry, conn, ean[0], ean[1], ean[2], sid, aid)
        future_threads.append(future)
        dd += 1
    

    This solved my problem.