Search code examples
pythonpython-3.xpython-asyncioexasol

python asynchronous data pull error: __aexit__/__enter__


I am trying to write code leveraging async functionality of Python. I have a DB connection class where I have code for (dis)connecting with DB and also for fetching the data. Now I want to asynchronously fetch data using fetch data method based on one identifier. Code is as shown below:

import pyexasol
import pandas as pd
import logging
from typing import Iterable
import asyncio
import tqdm


class Exa(object):
    def __init__(self, dsn: str = '1.2.3.4',
                 user: str = os.environ['UID'],
                 password: str = os.environ['PWD']):
        self.__dsn = dsn
        self.__user = user
        self.__password = password
        self.conn = None

    def __connect(self):
        if self.conn is None:
            try:
                self.conn = pyexasol.connect(dsn=self.__dsn, user=self.__user,
                    password=self.__password, encryption=True)
            except Exception as e:
                logging.error(f"Error in connecting with Exasol. Error is: {e}")

    def __disconnect(self):
        if self.conn is not None:
            try:
                self.conn.close()
            except Exception as e:
                logging.error(f"Exception in disconnecting DB. Error is {e}")
            self.conn = None

    def fetch(self, query: str, leave_connection_open: bool = False) -> pd.DataFrame:
        # connect and execute the query
        self.__connect()
        try:
            res = self.conn.export_to_pandas(query)
            res.columns = res.columns.str.lower()
        except Exception as e:
            self.__disconnect()
            return pd.DataFrame()
        if not leave_connection_open:
            self.__disconnect()
        return res

    def fetch_batch(self, pattern: str, replacement: Iterable,
                    query: str, batchsize: int = 5000) -> pd.DataFrame:
        res = asyncio.run(self._fetch_batch(pattern=pattern, replacement=replacement,
                                            query=query, batchsize=batchsize))
        return res

    async def _fetch_batch(self, pattern: str, replacement: Iterable,
                           query: str, batchsize: int = 5000) -> pd.DataFrame:

        replacement = list(replacement)
        # breaking into batches
        if any(isinstance(i, str) for i in replacement):
            batches = ["'" + "','".join(replacement[i:i + batchsize]) + "'"
                       for i in range(0, len(replacement), batchsize)]
        else:
            batches = [",".join(replacement[i:i + batchsize])
                       for i in range(0, len(replacement), batchsize)]
        # connecting and executing query in batches
        nbatches = len(batches)
        self.__connect()
        try:
            tasks = [self.__run_batch_query(query=query.replace(pattern, batches[i]),
                                            i=i, nbatches=nbatches) for i in range(nbatches)]
            # progress bar
            res = [await f for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))]
        except Exception as e:
            logging.error("Could not fetch batches of data. Error is: %s", e)
        '''finally:
            self.__disconnect()'''
        # dataframe concatenation
        res = pd.concat(res)
        res.columns = res.columns.str.lower()
        return res

    async def __run_batch_query(self, query: str,
                                i: int, nbatches: int) -> pd.DataFrame:
        logging.info("Fetching %d/%d", i + 1, nbatches)
        async with self.fetch(query=query, leave_connection_open=True) as resp:
            raw = await resp
        return raw

I am running this code with:

from foo import Exa
db = Exa()

ids = db.fetch('select id from application limit 100')
ids1 = db.fetch_batch(pattern='IDS',
                         replacement=ids['id'],
                         query='select id from application where id in (IDS)',
                         batchsize=25)

but then I get error like:

ERROR:root:Could not fetch batches of data. Error is: __aexit__
Traceback (most recent call last):
  File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
    async with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __aexit__

Also if I change __run_batch_query() method call to self.fetch() method without async then error changes to:

ERROR:root:Could not fetch batches of data. Error is: __enter__
Traceback (most recent call last):
  File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
    with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __enter__

please help by pointing out the mistake if any here?


Solution

  • pyexasol creator is here.

    Please note, asyncio will not provide any benefits for Exasol-related scenarios. Ayncio runs on single CPU and utilises single network connection, which prevents any meaningful scaling.

    The the most efficient ways to load data from Exasol server are:

    • export_to_pandas() or export_to_callback() for single Python process;
    • export_parallel() + http_transport() for multiple Python processes;

    Please check HTTP Transport (parallel) manual page for explanations and examples. This approach scales linearly, and you may even run the computation tasks on multiple servers.

    For simple scenarios, you may consider compression=True connection option if you transfer large amounts of data over slow (e.g. WiFi) network.