Tags: python-3.x, async-await, python-asyncio

Processing millions of records using asyncio causes memory error


I am processing a file with millions of lines using asyncio and aiohttp, and I get the error below:

Fatal Python error: Cannot recover from MemoryErrors while normalizing exceptions.

Current thread 0x0000ffff88de5010 (most recent call first):
  File "test.py", line 173 in wrap_get_fuzzy_match
  File "/usr/lib64/python3.7/asyncio/events.py", line 88 in _run
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 1786 in _run_once
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 541 in run_forever
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 574 in run_until_complete
  File "test.py", line 224 in <module>
Aborted

Here is the code:

import asyncio
from itertools import islice

from aiohttp import ClientSession

# get_best_match, LOGGER, and API are defined elsewhere in the script.


async def get_valuation(url, params, api_header, session, semaphore):
    # The semaphore limits the number of concurrent requests.
    async with semaphore:
        async with session.get(url, headers=api_header) as response:
            status_code = response.status
            try:
                if status_code != 200:
                    mmr = {params: 'not found, ' + str(status_code)}
                else:
                    asynch_response = await response.json()
                    mmr = await get_best_match(params, asynch_response, str(status_code))
                return mmr
            except Exception as ex:
                LOGGER.error(f"Error in get valuation and error was {ex}")
                return ex


async def wrap_get_fuzzy_match(func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except Exception as err:
        LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
        return err


async def main(headers, file):
    tasks = []
    sema = asyncio.Semaphore(500)
    BATCH_SIZE = 1000000
    async with ClientSession() as session:
        with open(file) as f:
            while True:
                batch = [line.strip('\n') for line in islice(f, BATCH_SIZE)]
                if not batch:
                    break
                for param in batch:
                    # One task per input line; nothing is awaited until the
                    # whole file has been read, so every task object (and
                    # later every response) is held in memory at once.
                    task = asyncio.ensure_future(wrap_get_fuzzy_match(
                        get_valuation,
                        url=API + param,
                        params=param,
                        api_header=headers,
                        session=session,
                        semaphore=sema,
                    ))
                    tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses

Solution

  • I solved the issue by splitting the input into chunks and calling the main coroutine once per chunk, so only one batch of tasks and responses is held in memory at a time (a sketch of the divide_chunks helper is shown after the code):

    import asyncio

    from aiohttp import ClientSession
    from async_timeout import timeout  # assumed source of the timeout() context manager

    # get_best_match, divide_chunks, LOGGER, API, INPUT, and hdr are defined
    # elsewhere in the script.


    async def get_valuation(url, params, api_header, session, semaphore):
        """
        Call fuzzy match api
        :param url:
        :param params:
        :param api_header:
        :param session:
        :param semaphore:
        :return:
        """
        async with semaphore:
            async with session.get(url, headers=api_header) as response:
                status_code = response.status
                try:
                    if status_code != 200:
                        mmr = {params: 'not found, ' + str(status_code)}
                    else:
                        asynch_response = await response.json()
                        mmr = await get_best_match(params, asynch_response, str(status_code))
                    return mmr
                except Exception as ex:
                    LOGGER.error(f"Error in get valuation and error was {ex}")
                    return ex


    async def wrap_get_fuzzy_match(func, *args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as err:
            LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
            return err


    async def main(params, headers):
        tasks = []
        sema = asyncio.Semaphore(100)
        async with ClientSession() as session:
            async with timeout(None):
                LOGGER.info(f"Number of urls to process: {len(params)}")
                for param in params:
                    task = asyncio.ensure_future(wrap_get_fuzzy_match(
                        get_valuation,
                        url=API,
                        params=param,
                        api_header=headers,
                        session=session,
                        semaphore=sema,
                    ))
                    tasks.append(task)
                responses = await asyncio.gather(*tasks)
                return responses


    if __name__ == '__main__':
        LOGGER.info("Start Processing")
        BATCH_SIZE = <size of each batch>
        loop = asyncio.get_event_loop()
        try:
            with open(INPUT) as file:
                inputs = file.readlines()
        except IOError:
            LOGGER.exception("Unable to read valuation input file")
            raise
        chunked_list = list(divide_chunks(big_list=inputs, chunk_size=BATCH_SIZE))
        LOGGER.info(f"Chunked size- {len(chunked_list)}")
        batch_counter = 0
        for params in chunked_list:
            batch_counter += 1
            LOGGER.info(
                f"Starting batch number [{batch_counter}] out of [{len(chunked_list)}]"
            )
            # Only this batch's tasks exist while the event loop runs; they
            # become garbage once run_until_complete returns.
            results = loop.run_until_complete(
                asyncio.ensure_future(main(params=params, headers=hdr))
            )

        LOGGER.info("Processing Completed!!")
        loop.close()
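
The divide_chunks helper called above was not included in the post. Here is a minimal sketch of what it could look like, assuming it simply yields successive fixed-size slices of the input list; the name and keyword arguments match the call site, but the body itself is an assumption:

    def divide_chunks(big_list, chunk_size):
        # Hypothetical implementation: yield successive chunk_size-sized
        # slices of big_list until it is exhausted.
        for i in range(0, len(big_list), chunk_size):
            yield big_list[i:i + chunk_size]

Chunking is what bounds memory here: only one batch of tasks and responses is alive at any moment, and each batch becomes eligible for garbage collection as soon as its run_until_complete call returns, instead of millions of pending tasks accumulating before a single gather.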