c# elasticsearch nest

How to bulk insert documents into Elasticsearch without updating documents that already exist


I'm using Elasticsearch with the NEST library. How can I bulk insert documents into Elasticsearch without updating documents that already exist?


Solution

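  • Here's an example of a bulk API call that performs create operations. A create operation fails for any document whose id already exists, so existing documents are left untouched: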

    using System;
    using Elasticsearch.Net;
    using Nest;

    public static class Program
    {
        private static void Main()
        {
            var defaultIndex = "documents";
            var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
            var settings = new ConnectionSettings(pool)
                .DefaultIndex(defaultIndex);

            var client = new ElasticClient(settings);

            // Start from a clean index so the example is repeatable.
            if (client.IndexExists(defaultIndex).Exists)
                client.DeleteIndex(defaultIndex);

            // Index a document with Id 1 first, so it already exists
            // when the bulk call runs.
            client.Index(new MyDocument(1)
            {
                Message = "new"
            }, i => i.Refresh(Refresh.WaitFor));

            var documents = new[]
            {
                new MyDocument(1) { Message = "updated" },
                new MyDocument(2) { Message = "updated" },
                new MyDocument(3) { Message = "updated" },
            };

            // Each "create" operation fails with a 409 if the id already exists.
            client.Bulk(b => b
                .CreateMany(documents)
                .Refresh(Refresh.WaitFor)
            );

            var getResponse = client.Get<MyDocument>(1);

            // Document 1 should still hold its original message.
            Console.WriteLine(getResponse.Source.Message == "new");
        }
    }

    public class MyDocument
    {
        public MyDocument(int id) => Id = id;

        public int Id { get; set; }

        public string Message { get; set; }
    }
    

    The output is True, meaning the document with Id 1 was not overwritten by the bulk call because it already exists. The bulk response is still an HTTP 200, similar to:

    {
      "took" : 1387,
      "errors" : true,
      "items" : [
        {
          "create" : {
            "_index" : "documents",
            "_type" : "mydocument",
            "_id" : "1",
            "status" : 409,
            "error" : {
              "type" : "version_conflict_engine_exception",
              "reason" : "[mydocument][1]: version conflict, document already exists (current version [1])",
              "index_uuid" : "DZIgGMZcSlWRycC1MGhJWQ",
              "shard" : "3",
              "index" : "documents"
            }
          }
        },
        {
          "create" : {
            "_index" : "documents",
            "_type" : "mydocument",
            "_id" : "2",
            "_version" : 1,
            "result" : "created",
            "_shards" : {
              "total" : 2,
              "successful" : 1,
              "failed" : 0
            },
            "_seq_no" : 0,
            "_primary_term" : 1,
            "status" : 201
          }
        },
        {
          "create" : {
            "_index" : "documents",
            "_type" : "mydocument",
            "_id" : "3",
            "_version" : 1,
            "result" : "created",
            "_shards" : {
              "total" : 2,
              "successful" : 1,
              "failed" : 0
            },
            "_seq_no" : 0,
            "_primary_term" : 1,
            "status" : 201
          }
        }
      ]
    }
    

    Importantly, "errors" is true, and the response for the first "create" operation shows why: a 409 version conflict, because the document already exists.
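
    Since "errors" is true even when the only problem is an expected conflict, calling code may want to separate "already exists" items from real failures. The following is a minimal sketch, not part of the original answer: the BulkCreateHelper class and its method names are hypothetical, and it assumes the conflict is reported with status 409 and error type "version_conflict_engine_exception", as in the response above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Nest;

// Hypothetical helper, named for illustration only.
public static class BulkCreateHelper
{
    // A 409 version conflict on a create operation means the document
    // already exists, which is the expected outcome here, not a failure.
    public static bool IsAlreadyExists(int status, string errorType) =>
        status == 409 && errorType == "version_conflict_engine_exception";

    // Bulk-creates the documents and returns the ids of items that failed
    // for any reason other than "already exists".
    public static IReadOnlyList<string> CreateMissing<T>(
        IElasticClient client, IEnumerable<T> documents) where T : class
    {
        var response = client.Bulk(b => b.CreateMany(documents));

        // ItemsWithErrors holds only the items whose operation did not succeed.
        return response.ItemsWithErrors
            .Where(item => !IsAlreadyExists(item.Status, item.Error?.Type))
            .Select(item => item.Id)
            .ToList();
    }
}
```

    With the documents from the example above, BulkCreateHelper.CreateMissing(client, documents) would be expected to return an empty list, because the only failing item is the conflict on Id 1. This sketch does not handle a request that fails before any items are returned (for example, an unreachable node).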

    An alternative to .CreateMany(...) is .UpdateMany(...) with an upsert. The upsert document is inserted when the id does not exist yet, and the script turns the update into a "no-op" when it does:

    client.Bulk(b => b
        .UpdateMany(documents, (d, document) => d
            .Upsert(document)
            .Script(s => s
                .Source("ctx.op = 'none'")
            )
        )
        .Refresh(Refresh.WaitFor)
    );
    

    The outcome is the same: the document with Id 1 is not overwritten. The response is slightly different, though:

    {
      "took" : 1307,
      "errors" : false,
      "items" : [
        {
          "update" : {
            "_index" : "documents",
            "_type" : "mydocument",
            "_id" : "1",
            "_version" : 1,
            "result" : "noop",
            "_shards" : {
              "total" : 2,
              "successful" : 1,
              "failed" : 0
            },
            "status" : 200
          }
        },
        {
          "update" : {
            "_index" : "documents",
            "_type" : "mydocument",
            "_id" : "2",
            "_version" : 1,
            "result" : "created",
            "_shards" : {
              "total" : 2,
              "successful" : 1,
              "failed" : 0
            },
            "_seq_no" : 0,
            "_primary_term" : 1,
            "status" : 201
          }
        },
        {
          "update" : {
            "_index" : "documents",
            "_type" : "mydocument",
            "_id" : "3",
            "_version" : 1,
            "result" : "created",
            "_shards" : {
              "total" : 2,
              "successful" : 1,
              "failed" : 0
            },
            "_seq_no" : 0,
            "_primary_term" : 1,
            "status" : 201
          }
        }
      ]
    }
    

    Note that "errors" is now false, and the result of the first "update" operation is "noop".
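
    Because a skipped document no longer counts as an error with this approach, any reported error is a genuine failure, and skipped documents can be counted from the item statuses instead. The sketch below is an illustration under assumptions rather than part of the original answer: the BulkUpsertSummary class and its method names are hypothetical, and it relies on the statuses shown in the response above (201 for a created document, 200 for a no-op).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Nest;

// Hypothetical helper, named for illustration only.
public static class BulkUpsertSummary
{
    // Counts bulk items per HTTP status, e.g. 201 (created) and 200 (noop).
    public static Dictionary<int, int> CountByStatus(IEnumerable<int> statuses) =>
        statuses.GroupBy(s => s).ToDictionary(g => g.Key, g => g.Count());

    // Inserts documents that do not exist yet, leaves existing ones untouched,
    // and returns how many items ended with each status.
    public static Dictionary<int, int> InsertIfMissing<T>(
        IElasticClient client, IEnumerable<T> documents) where T : class
    {
        var response = client.Bulk(b => b
            .UpdateMany(documents, (d, document) => d
                .Upsert(document)
                .Script(s => s.Source("ctx.op = 'none'"))));

        // With the no-op script, an invalid response indicates a real failure.
        if (!response.IsValid)
            throw new InvalidOperationException("Bulk upsert failed: " + response.DebugInformation);

        return CountByStatus(response.Items.Select(item => item.Status));
    }
}
```

    For the three documents in the example, the returned dictionary would be expected to map 200 to 1 (the skipped document) and 201 to 2 (the newly created ones).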