Tags: python, elasticsearch, indexing, kibana, bulkinsert

Bulk Indexing Error in Elasticsearch Python


I am trying to ingest a simple Hello World example into a data stream using the bulk helper as shown in the documentation https://elasticsearch-py.readthedocs.io/en/v8.6.2/helpers.html#bulk-helpers, but I get this traceback:

Traceback (most recent call last):
  File "C:\Users\elastic\Documents\azure_repos\dataingestion\data-ingestion-test-function\data_stream\hello_world.py", line 117, in <module>
    bulk(client=client, index='test-data-stream', actions=data)
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 524, in bulk
    for ok, item in streaming_bulk(
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 438, in streaming_bulk
    for data, (ok, info) in zip(
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 355, in _process_bulk_chunk
    yield from gen
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 274, in _process_bulk_chunk_success
    raise BulkIndexError(f"{len(errors)} document(s) failed to index.", errors)
elasticsearch.helpers.BulkIndexError: 2 document(s) failed to index.

I have tried the same code for ingesting into a regular index and it worked.

This is the code for the client:

from elasticsearch import Elasticsearch

client = Elasticsearch(
    "https://xx.xx.x.xx:9200",
    basic_auth=("username", "password"),
    verify_certs=False)

I have created the ILM policy, index template, and component template as shown in this tutorial:
https://opster.com/guides/elasticsearch/data-architecture/elasticsearch-data-streams/

I created the data stream in Kibana, named it test-data-stream, and confirmed in the Kibana UI that it was created successfully.

I was able to successfully ingest the data into the data stream with API calls from Postman, but I am having trouble ingesting it from Python code.

This is what I want to ingest:

data = [{"message": "Hello World", "@timestamp": "2023-01-11T11:54:44Z"},
        {"message": "Hello World1", "@timestamp": "2023-01-11T11:54:44Z"}]

This is the code I used to ingest it:

from elasticsearch.helpers import bulk

# Recreate the data stream, then bulk-ingest the documents
client.indices.delete_data_stream(name='test-data-stream', error_trace=True)
client.indices.create_data_stream(name='test-data-stream', error_trace=True)

bulk(client=client, index='test-data-stream', actions=data)

If I point the index parameter at a regular index, the code works fine, but it does not work for the data stream.


Solution

  • When indexing into a data stream, you MUST use op_type: create, but the bulk helper uses op_type: index by default, so you need to specify it explicitly in your documents:

    data = [{"_op_type": "create", "message": "Hello World", "@timestamp": "2023-01-11T11:54:44Z"},
            {"_op_type": "create", "message": "Hello World1", "@timestamp": "2023-01-11T11:54:44Z"}]