Search code examples
nest

NEST ElasticClient C# bulk insert collection


I am trying to bulk insert a collection of data into Elasticsearch using the NEST ElasticClient library.

    // Connect to a single local Elasticsearch node.
    var pool = new SingleNodeConnectionPool(new Uri($"http://localhost:9200"));
    var settings = new ConnectionSettings(pool);

    var client = new ElasticClient(settings);

    var path = @"D:\data.xml";

    // Deserializes the whole XML file into one Entity object graph in memory.
    var serializer = new XmlSerializer(typeof(Entity));

    using (TextReader reader = new StreamReader(new FileStream(path, FileMode.Open))) {
        var collection = (Entity)serializer.Deserialize(reader);             

        // One wrapper per entry, carrying index/type/id metadata alongside the document itself.
        var elasticSearchEntities = new List<ElasticSearchEntity>();
        
        for (int i = 0; i < collection.Entity.Length; i++)
        {
            var elasticSearchEntity = new ElasticSearchEntity
            {
                _index = "entity",
                 _type = "entity",
                _id = collection.Entity[i].id.ToString(),
                Entity = collection.Entity[i],
            };
            elasticSearchEntities.Add(elasticSearchEntity);
     
        }
        // NOTE(review): each ElasticSearchEntity mixes bulk metadata (_index/_type/_id) and the
        // document in a single object. The bulk API expects an action object (e.g. {"index":{...}})
        // followed by a separate document object, so this request body is presumably rejected.
        // NOTE(review): indexResponse is never inspected; check its success flag and body for
        // per-item errors before assuming the documents were indexed.
        var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));
    }

I have put a breakpoint on the line below, and at that point elasticSearchEntities contains the data:

var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));

but after it runs, the index is not created.

If I use Index, it works, but it is slow because it inserts documents one by one. I need a bulk insert if possible.


Solution

  • The construction of the input to the bulk API doesn't look correct for the low-level client. Each bulk operation should consist of two objects:

    1. An object representing the bulk operation to perform e.g. index, and associated metadata
    2. An object representing the document

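    It looks like the example in the question combines both of these into one object, which probably results in an error; the bulk response will have more details. For example, each document should be sent as an action line such as {"index":{"_index":"entity","_id":"1"}}, followed by a separate line containing the document's JSON.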

    As asked in the comments, is there a particular reason you're using the low-level client? The high-level client has a BulkAll observable helper that makes it easier to index a large number of documents, which is especially useful when those documents come from another source such as a file or database.

    For example, here is how to index all questions and answers from Stack Overflow's Posts.xml archive:

    /// <summary>
    /// A question document. Questions are the parent side of the question/answer
    /// join relation stored in <see cref="Post.ParentId"/>.
    /// </summary>
    public class Question : Post
    {
        /// <summary>The question title.</summary>
        public string Title { get; set; }

        /// <summary>
        /// Completion-suggester input derived from the title; the index mapping
        /// excludes it from the stored _source.
        /// </summary>
        public CompletionField TitleSuggest { get; set; }

        /// <summary>Id of the accepted answer, or null when the row has none.</summary>
        public int? AcceptedAnswerId { get; set; }

        /// <summary>Number of views.</summary>
        public int ViewCount { get; set; }

        /// <summary>Display name of the last editor, if present.</summary>
        public string LastEditorDisplayName { get; set; }

        /// <summary>Tag names attached to the question.</summary>
        public List<string> Tags { get; set; }

        /// <summary>Number of answers.</summary>
        public int AnswerCount { get; set; }

        /// <summary>Number of favorites.</summary>
        public int FavoriteCount { get; set; }

        /// <summary>When the question became community-owned, if ever.</summary>
        public DateTimeOffset? CommunityOwnedDate { get; set; }

        /// <summary>Type discriminator; always "Question".</summary>
        public override string Type
        {
            get { return nameof(Question); }
        }
    }
    
    /// <summary>
    /// An answer document; the child side of the question/answer join relation.
    /// </summary>
    public class Answer : Post
    {
        /// <summary>Type discriminator; always "Answer".</summary>
        public override string Type
        {
            get { return nameof(Answer); }
        }
    }
    
    /// <summary>
    /// Base document shared by questions and answers, which are indexed
    /// together in a single index.
    /// </summary>
    public class Post
    {
        /// <summary>Post id, also used as the document id.</summary>
        public int Id { get; set; }

        /// <summary>
        /// Join field: marks a question as a root, or links an answer to its parent question.
        /// </summary>
        public JoinField ParentId { get; set; }

        /// <summary>When the post was created.</summary>
        public DateTimeOffset CreationDate { get; set; }

        /// <summary>Post score.</summary>
        public int Score { get; set; }

        /// <summary>Post body markup.</summary>
        public string Body { get; set; }

        /// <summary>Owner's user id, if present.</summary>
        public int? OwnerUserId { get; set; }

        /// <summary>Owner's display name, if present.</summary>
        public string OwnerDisplayName { get; set; }

        /// <summary>Last editor's user id, if present.</summary>
        public int? LastEditorUserId { get; set; }

        /// <summary>When the post was last edited, if ever.</summary>
        public DateTimeOffset? LastEditDate { get; set; }

        /// <summary>When the post last saw activity, if recorded.</summary>
        public DateTimeOffset? LastActivityDate { get; set; }

        /// <summary>Number of comments.</summary>
        public int CommentCount { get; set; }

        /// <summary>Type discriminator overridden by subclasses; null on the base type.</summary>
        public virtual string Type { get; }
    }
    
    /// <summary>
    /// Creates the "posts" index (when it does not exist yet) with analyzers and a
    /// question/answer join mapping, then streams every post yielded by
    /// GetQuestionsAndAnswers() into it with the BulkAll observable helper.
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown when creating the index fails.</exception>
    /// <remarks>Any exception reported by BulkAll is rethrown with its original stack trace.</remarks>
    void Main()
    {
        var indexName = "posts";
        var node = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
        var settings = new ConnectionSettings(node)
            .RequestTimeout(TimeSpan.FromMinutes(10))
            .DefaultMappingFor(new ClrTypeMapping[] {
                new ClrTypeMapping(typeof(Post)) { IndexName = indexName },
                new ClrTypeMapping(typeof(Question)) { IndexName = indexName, RelationName = "question" },
                new ClrTypeMapping(typeof(Answer)) { IndexName = indexName },
            })
            .OnRequestCompleted(response =>
            {
                if (response.Success)
                {
                    Console.WriteLine($"Status: {response.HttpStatusCode}");
                }
                else
                {
                    Console.WriteLine($"Error: {response.DebugInformation}");
                }
            });

        var client = new ElasticClient(settings);

        if (!client.Indices.Exists(indexName).Exists)
        {
            // Only needed when creating the index, so build it here.
            var characterFilterMappings = CreateCharacterFilterMappings();

            // The "standard" token filter was removed in Elasticsearch 7.0 (it was a no-op),
            // so listing it makes index creation fail; only lowercase + stop are applied.
            var createIndexResponse = client.Indices.Create(indexName, c => c
                .Settings(s => s
                    .NumberOfShards(3)
                    .NumberOfReplicas(0)
                    .Analysis(a => a
                        .CharFilters(cf => cf
                            .Mapping("programming_language", mca => mca
                                .Mappings(characterFilterMappings)
                            )
                        )
                        .Analyzers(an => an
                            .Custom("html", ca => ca
                                .CharFilters("html_strip", "programming_language")
                                .Tokenizer("standard")
                                .Filters("lowercase", "stop")
                            )
                            .Custom("expand", ca => ca
                                .CharFilters("programming_language")
                                .Tokenizer("standard")
                                .Filters("lowercase", "stop")
                            )
                        )
                    )
                )
                .Map<Post>(u => u
                    .RoutingField(r => r.Required())
                    .AutoMap<Question>()
                    .AutoMap<Answer>()
                    .SourceField(s => s
                        .Excludes(new[] { "titleSuggest" })
                    )
                    .Properties<Question>(p => p
                        .Join(j => j
                            .Name(f => f.ParentId)
                            .Relations(r => r
                                .Join<Question, Answer>()
                            )
                        )
                        .Text(s => s
                            .Name(n => n.Title)
                            .Analyzer("expand")
                            .Norms(false)
                            .Fields(f => f
                                .Keyword(ss => ss
                                    .Name("raw")
                                )
                            )
                        )
                        .Keyword(s => s
                            .Name(n => n.OwnerDisplayName)
                        )
                        .Keyword(s => s
                            .Name(n => n.LastEditorDisplayName)
                        )
                        .Keyword(s => s
                            .Name(n => n.Tags)
                        )
                        .Keyword(s => s
                            .Name(n => n.Type)
                        )
                        .Text(s => s
                            .Name(n => n.Body)
                            .Analyzer("html")
                            .SearchAnalyzer("expand")
                        )
                        .Completion(co => co
                            .Name(n => n.TitleSuggest)
                        )
                    )
                )
            );

            if (!createIndexResponse.IsValid)
            {
                // Continuing would most likely let Elasticsearch auto-create the index with
                // dynamic mappings (no join field, no custom analyzers), so stop here instead.
                throw new InvalidOperationException($"invalid response creating index. {createIndexResponse.DebugInformation}");
            }
        }

        var seenPages = 0;
        var size = 1000;
        ExceptionDispatchInfo exception = null;

        using (var handle = new ManualResetEvent(false))
        {
            var observableBulk = client.BulkAll<Post>(GetQuestionsAndAnswers(), f => f
                .MaxDegreeOfParallelism(16)
                .BackOffTime(TimeSpan.FromSeconds(10))
                .BackOffRetries(2)
                .Size(size)
                .BufferToBulk((bulk, posts) =>
                {
                    // Index each post under its concrete type so the Question/Answer
                    // mappings (relation name, join routing) are applied.
                    foreach (var post in posts)
                    {
                        if (post is Question question)
                        {
                            var item = new BulkIndexOperation<Question>(question);
                            bulk.AddOperation(item);
                        }
                        else
                        {
                            var answer = (Answer)post;
                            var item = new BulkIndexOperation<Answer>(answer);
                            bulk.AddOperation(item);
                        }
                    }
                })
                .RefreshOnCompleted()
                .Index(indexName)
            );

            var bulkObserver = new BulkAllObserver(
                onError: e =>
                {
                    // Capture so the exception can be rethrown on this thread with its stack trace.
                    exception = ExceptionDispatchInfo.Capture(e);
                    handle.Set();
                },
                onCompleted: () => handle.Set(),
                onNext: b =>
                {
                    // Use the value returned by Increment: with parallel bulk requests, re-reading
                    // seenPages could observe another thread's later increment.
                    var pages = Interlocked.Increment(ref seenPages);
                    Console.WriteLine($"indexed {pages} pages");
                }
            );

            observableBulk.Subscribe(bulkObserver);
            handle.WaitOne();
        }

        if (exception != null)
        {
            exception.Throw();
        }
    }
    
    /// <summary>
    /// Lazily streams questions (PostTypeId 1) and answers (PostTypeId 2) from
    /// stackoverflow_data\Posts.xml, materializing one &lt;row&gt; element at a time.
    /// Rows with any other PostTypeId are skipped.
    /// </summary>
    /// <returns>The questions and answers, in file order.</returns>
    /// <exception cref="InvalidDataException">A row lacks a required attribute.</exception>
    /// <exception cref="FormatException">A numeric or date attribute is malformed.</exception>
    public IEnumerable<Post> GetQuestionsAndAnswers()
    {
        using (var stream = File.OpenRead(@"stackoverflow_data\Posts.xml"))
        using (var reader = XmlReader.Create(stream))
        {
            // Without a <posts> root or any <row>, there is nothing to read; the reader
            // would otherwise be positioned on an unrelated node.
            if (!reader.ReadToDescendant("posts") || !reader.ReadToDescendant("row"))
            {
                yield break;
            }

            while (reader.NodeType == XmlNodeType.Element && reader.Name == "row")
            {
                var item = (XElement)XNode.ReadFrom(reader);
                var post = CreatePost(item);
                if (post != null)
                {
                    yield return post;
                }

                // XNode.ReadFrom leaves the reader on the node right after the row. When rows
                // are not separated by whitespace, that node is already the next <row>, and an
                // unconditional ReadToNextSibling would skip it.
                var onNextRow = reader.NodeType == XmlNodeType.Element && reader.Name == "row";
                if (!onNextRow && !reader.ReadToNextSibling("row"))
                {
                    break;
                }
            }
        }
    }

    // Maps one <row> element to a Question or Answer; returns null for other post types.
    private static Post CreatePost(XElement item)
    {
        var id = ParseRequiredInt(item, "Id");
        var postTypeId = ParseRequiredInt(item, "PostTypeId");
        var score = ParseRequiredInt(item, "Score");
        var body = item.Attribute("Body")?.Value;
        var creationDate = ParseDate(RequiredAttribute(item, "CreationDate"));
        var commentCount = ParseRequiredInt(item, "CommentCount");
        var ownerUserId = ParseOptionalInt(item, "OwnerUserId");
        var ownerDisplayName = item.Attribute("OwnerDisplayName")?.Value;
        var lastEditorUserId = ParseOptionalInt(item, "LastEditorUserId");
        var lastEditDate = ParseOptionalDate(item, "LastEditDate");
        var lastActivityDate = ParseOptionalDate(item, "LastActivityDate");

        switch (postTypeId)
        {
            case 1:
            {
                var title = item.Attribute("Title")?.Value;

                return new Question
                {
                    Id = id,
                    ParentId = JoinField.Root<Question>(),
                    AcceptedAnswerId = ParseOptionalInt(item, "AcceptedAnswerId"),
                    CreationDate = creationDate,
                    Score = score,
                    ViewCount = ParseRequiredInt(item, "ViewCount"),
                    Body = body,
                    OwnerUserId = ownerUserId,
                    OwnerDisplayName = ownerDisplayName,
                    LastEditorUserId = lastEditorUserId,
                    LastEditorDisplayName = item.Attribute("LastEditorDisplayName")?.Value,
                    LastEditDate = lastEditDate,
                    LastActivityDate = lastActivityDate,
                    Title = title,
                    TitleSuggest = new CompletionField
                    {
                        Input = new[] { title },
                        // Completion weights must not be negative.
                        Weight = score < 0 ? 0 : score
                    },
                    // Tags are encoded as "<tag1><tag2>...".
                    Tags = item.Attribute("Tags")?.Value
                        .Replace("<", string.Empty)
                        .Split(new[] { ">" }, StringSplitOptions.RemoveEmptyEntries)
                        .ToList(),
                    AnswerCount = ParseRequiredInt(item, "AnswerCount"),
                    CommentCount = commentCount,
                    FavoriteCount = ParseOptionalInt(item, "FavoriteCount") ?? 0,
                    CommunityOwnedDate = ParseOptionalDate(item, "CommunityOwnedDate")
                };
            }
            case 2:
                return new Answer
                {
                    Id = id,
                    ParentId = JoinField.Link<Answer>(ParseRequiredInt(item, "ParentId")),
                    CreationDate = creationDate,
                    // Answers have scores too; previously this was parsed but never assigned.
                    Score = score,
                    Body = body,
                    OwnerUserId = ownerUserId,
                    OwnerDisplayName = ownerDisplayName,
                    LastEditorUserId = lastEditorUserId,
                    LastEditDate = lastEditDate,
                    LastActivityDate = lastActivityDate,
                    CommentCount = commentCount,
                };
            default:
                return null;
        }
    }

    // Returns the value of a required attribute, failing with a descriptive error when it is absent.
    private static string RequiredAttribute(XElement item, string name) =>
        item.Attribute(name)?.Value
        ?? throw new InvalidDataException($"Row is missing required attribute '{name}'.");

    // Parses a required integer attribute using culture-independent number formatting.
    private static int ParseRequiredInt(XElement item, string name) =>
        int.Parse(RequiredAttribute(item, name), System.Globalization.CultureInfo.InvariantCulture);

    // Parses an optional integer attribute; null when the attribute is absent.
    private static int? ParseOptionalInt(XElement item, string name)
    {
        var value = item.Attribute(name)?.Value;
        return value == null
            ? (int?)null
            : int.Parse(value, System.Globalization.CultureInfo.InvariantCulture);
    }

    // Parses an optional date attribute; null when the attribute is absent.
    private static DateTimeOffset? ParseOptionalDate(XElement item, string name)
    {
        var value = item.Attribute(name)?.Value;
        return value == null ? (DateTimeOffset?)null : ParseDate(value);
    }

    // Dump timestamps carry no offset; they are treated as UTC rather than the machine's local
    // time zone. NOTE(review): assumes the data dump records UTC timestamps — confirm for your dump.
    private static DateTimeOffset ParseDate(string value) =>
        DateTimeOffset.Parse(
            value,
            System.Globalization.CultureInfo.InvariantCulture,
            System.Globalization.DateTimeStyles.AssumeUniversal);
    
    /*
    * Simple char filter mappings that rewrite programming
    * language names containing symbols into plain words,
    * e.g. c# => csharp, C++ => cplusplus
    */
    private IList<string> CreateCharacterFilterMappings()
    {
        var mappings = new List<string>();

        // "<letter>#" spellings: both the lower- and upper-case forms map to "<letter>sharp".
        foreach (var c in new[] { "c", "f", "m", "j", "s", "a", "k", "t" })
        {
            mappings.Add($"{c}# => {c}sharp");
            // These rules are machine-readable analyzer config, so casing must not depend
            // on the current culture.
            mappings.Add($"{c.ToUpperInvariant()}# => {c}sharp");
        }

        // "<letter>++" spellings: both the lower- and upper-case forms map to "<letter>plusplus".
        foreach (var c in new[] { "g", "m", "c", "s", "a", "d" })
        {
            mappings.Add($"{c}++ => {c}plusplus");
            mappings.Add($"{c.ToUpperInvariant()}++ => {c}plusplus");
        }

        return mappings;
    }
    

    IEnumerable<Post> GetQuestionsAndAnswers() lazily yields questions and answers from the large Posts.xml file (~50 GB, if I recall correctly) and feeds them to BulkAll, which concurrently makes up to 16 bulk requests at a time to Elasticsearch, each indexing 1,000 documents. See this GitHub repository for a more comprehensive example.