Tags: elasticsearch, asynchronous, python-asyncio, elasticsearch-py

Does the AsyncElasticsearch client use the same session for async actions?


Does the AsyncElasticsearch client open a new session for each async request?

AsyncElasticsearch (from elasticsearch-py) uses aiohttp under the hood. From what I understand, aiohttp recommends using a context manager for the aiohttp.ClientSession object, so as not to create a new session for each request:

async with aiohttp.ClientSession() as session:
    ...

I'm trying to speed up my bulk ingests.

  • How do I know whether the AsyncElasticsearch client is reusing the same session or setting up a new one per request?
  • Do I need the above async with ... pattern in my code snippet below? (A lifecycle sketch of what I've gathered from the docs so far follows this list.)
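
For reference, my current understanding from the elasticsearch-py docs is that the client holds its aiohttp session inside its transport and reuses it across requests, so you don't wrap the client in async with yourself; you just close it explicitly when finished. A minimal lifecycle sketch of that (the host URL is a placeholder):

import asyncio

from elasticsearch import AsyncElasticsearch


async def main():
    # One client; the transport keeps its aiohttp session alive
    # internally and reuses it for every request in this loop.
    es = AsyncElasticsearch(hosts=["http://localhost:9200"])
    try:
        print(await es.info())
    finally:
        # Explicitly frees the underlying aiohttp session(s)
        await es.close()


asyncio.run(main())
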
import asyncio
import os

import numpy as np
import pandas as pd
from elasticsearch import AsyncElasticsearch, helpers
from tqdm import tqdm

# %%------------------------------------------------------------------------------------
# Create async elastic client
async_es = AsyncElasticsearch(
    hosts=[os.getenv("ELASTIC_URL")],
    verify_certs=False,
    http_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PW")),
    timeout=60 * 60,
    ssl_show_warn=False,
)

# %%------------------------------------------------------------------------------------
# Upload csv to elastic
# Chunk files to keep memory low
# Read "username" as well, since it is used as the document _id below
with pd.read_csv(file, usecols=["username", "attributes"], chunksize=50_000) as reader:
    for df in reader:

        # Upload to elastic with username as id
        async def generate_actions(df_chunk):
            for index, record in df_chunk.iterrows():
                doc = record.replace({np.nan: None}).to_dict()
                doc.update({"_id": doc["username"], "_index": "users"})
                yield doc

        es_upl_chunk = 1000

        async def main():
            tasks = []
            for i in range(0, len(df), es_upl_chunk):
                tasks.append(
                    helpers.async_bulk(
                        client=async_es,
                        actions=generate_actions(df[i : i + es_upl_chunk]),
                        chunk_size=es_upl_chunk,
                    )
                )
            successes = 0
            errors = []
            print("Uploading to es...")
            progress = tqdm(unit=" docs", total=len(df))
            for task in asyncio.as_completed(tasks):
                resp = await task
                successes += resp[0]
                errors.extend(resp[1])
                progress.update(es_upl_chunk)
            return successes, errors

        responses = asyncio.run(main())
        print(f"Uploaded {responses[0]} documents from {file}")
        if len(responses[1]) > 0:
            # Bulk errors are dicts, so stringify them before joining
            print(
                f"WARNING: Encountered the following errors: "
                f"{', '.join(str(e) for e in responses[1])}"
            )
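
Worth noting: asyncio.run() creates and closes a fresh event loop on every call, so calling it once per CSV chunk tears the loop down repeatedly, and aiohttp sessions are bound to the loop they were created on. A single-loop rework that shares one client (and therefore one underlying session) across all chunks might look like this sketch (untested; the host URL and file name are placeholders, and the auth/TLS options are omitted for brevity):

import asyncio

import numpy as np
import pandas as pd
from elasticsearch import AsyncElasticsearch, helpers


async def ingest_file(file, es_upl_chunk=1000):
    # One client for the whole file: its connection pool (and aiohttp
    # session) is reused by every async_bulk call below.
    async_es = AsyncElasticsearch(hosts=["http://localhost:9200"])

    async def generate_actions(df_chunk):
        for _, record in df_chunk.iterrows():
            doc = record.replace({np.nan: None}).to_dict()
            doc.update({"_id": doc["username"], "_index": "users"})
            yield doc

    successes, errors = 0, []
    try:
        with pd.read_csv(file, usecols=["username", "attributes"], chunksize=50_000) as reader:
            for df in reader:
                tasks = [
                    helpers.async_bulk(
                        client=async_es,
                        actions=generate_actions(df[i : i + es_upl_chunk]),
                        chunk_size=es_upl_chunk,
                    )
                    for i in range(0, len(df), es_upl_chunk)
                ]
                for task in asyncio.as_completed(tasks):
                    ok, errs = await task
                    successes += ok
                    errors.extend(errs)
    finally:
        await async_es.close()
    return successes, errors


successes, errors = asyncio.run(ingest_file("users.csv"))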

Solution

  • It turns out AsyncElasticsearch was not the right client to speed up bulk ingests in this case. I used the helpers.parallel_bulk() function instead; a sketch of that approach follows.
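
A minimal sketch of the parallel_bulk() approach, assuming a synchronous Elasticsearch client (the host URL, thread_count, file name, and column names here are placeholders carried over from the question):

import numpy as np
import pandas as pd
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(hosts=["http://localhost:9200"])


def generate_actions(df_chunk):
    # Same action generator as in the question, just synchronous
    for _, record in df_chunk.iterrows():
        doc = record.replace({np.nan: None}).to_dict()
        doc.update({"_id": doc["username"], "_index": "users"})
        yield doc


with pd.read_csv("users.csv", usecols=["username", "attributes"], chunksize=50_000) as reader:
    for df in reader:
        # parallel_bulk() returns a lazy generator of (ok, item) tuples,
        # so it must be iterated for any indexing to actually happen.
        for ok, item in helpers.parallel_bulk(
            es, generate_actions(df), thread_count=4, chunk_size=1000
        ):
            if not ok:
                print(f"Bulk error: {item}")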