Tags: c#, elasticsearch, kibana, nest, elasticsearch-dsl

How to get more than 10K logs/results in Elasticsearch


How could I go about getting all the logs if, let's say, I have more than 10K logs/results, using the latest version of Elasticsearch (7.13)? I was reading up on scrolling search results, but the documentation opens with:

We no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging through more than 10,000 hits, use the search_after parameter with a point in time (PIT).

With search_after, the documentation says you can access more than 10,000 hits, but you first need to call the point in time (PIT) API to get a PIT ID and then pass that ID in your search requests (in the pit section) alongside the search_after parameter. In the Kibana console, running POST /YOUR-INDEX-PATTERN*/_pit?keep_alive=1m returns that PIT ID. But how would you go about running that command with the NEST .NET client?

This only tells you what to do if you already have the PIT ID; it does not show how to send the POST command that gets the PIT ID. Is there a way to do this without going into Kibana and running POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m from the console by hand (customer-simulation-es-app-logs is the name of my index)?


Before implementing Rob's sample, I had the following:

        [HttpGet("GetMonthlyLogs")]
        public async Task<List<EsSource>> GetLogsByDate()
        {

            string indexName = "customer-simulation-es-app-logs*";
            var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
            connectionSettings.DefaultIndex(indexName);
            connectionSettings.EnableDebugMode();
            _elasticClient = new ElasticClient(connectionSettings);

            // this will return the number of results in the index based on the criteria below:
            var responseHits = _elasticClient.Count<EsSource>(c => c
                 .Query(q => q
                     .Bool(b => b
                         .Should(
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Error")),
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Information")))
                         .Filter(f => f.DateRange(dr => dr
                         .Field("@timestamp")
                             .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                             .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                         .MinimumShouldMatch(1)))).Count;

            var response = await _elasticClient.SearchAsync<EsSource>(s => s
                  .Size(3000) // TODO: revisit; a single request returns at most 10,000 hits by default
                  .Source(src => src.Includes(i => i
                                    .Fields(f => f.timestamp,
                                            f => f.level,
                                            f => f.messageTemplate,
                                            f => f.message)))
                  .Index("customer-simulation-es-app-logs*")
                  .Query(q => q
                      .Bool(b => b
                          .Should(
                                m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Error")),
                                m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Information")))
                          .Filter(f => f.DateRange(dr => dr
                          .Field("@timestamp")
                             .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                             .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                          .MinimumShouldMatch(1))));


            return response?.Documents.ToList();
        }

public class EsSource
{
    [Date(Name = "@timestamp")]
    public DateTimeOffset timestamp { get; set; }
    public string level { get; set; }
    public string messageTemplate { get; set; }
    public string message { get; set; }
}

I gave Rob's sample implementation a stab; what I did is shown below. My first question: if I do not have the "Id" that is inside EsDocument, can I use timestamp instead? Is the foreach needed because it groups the results into batches of 1000? Can I also sort by timestamp, or does it strictly have to be the result ID? Since I do not have an ID, I was thinking of adding another searchResponse variable that uses the search API, plus a general variable named EsID, so I could loop through the hits, e.g. foreach (var item in searchResponse.Hits) { EsID = item.Id; }, and then use EsID in the batching foreach (batches.Select(x => EsID)) and for the sort. But that feels like repetitive code; correct me if I am wrong.

Please see my implementation here:

private IElasticClient _elasticClient;

        [HttpGet("GetMonthlyLogs")]
        public async Task<List<EsSource>> GetLogsByDate()
        {

            string indexName = "customer-simulation-es-app-logs*";
            var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
            connectionSettings.DefaultIndex(indexName);
            connectionSettings.EnableDebugMode();
            _elasticClient = new ElasticClient(connectionSettings);

            // this will return the number of results in the index based on the criteria below:
            var responseHits = _elasticClient.Count<EsSource>(c => c
                 .Query(q => q
                     .Bool(b => b
                         .Should(
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Error")),
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Information")))
                         .Filter(f => f.DateRange(dr => dr
                         .Field("@timestamp")
                             .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                             .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                         .MinimumShouldMatch(1)))).Count;

           

            foreach (var batches in Enumerable.Range(0, (int)responseHits).Batch(1000))
            {
                var bulk = await _elasticClient.IndexManyAsync(batches.Select(x => new EsSource { /* can I use timestamp?? */}));
            }

            await _elasticClient.Indices.RefreshAsync();

            var openPit = await _elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
            var pit = openPit.Id;

            var searchAfter = 0;
            // collects every page of documents so they can be returned after the loop
            var results = new List<EsSource>();

            try
            {
                while (true)
                {
                    var response = await _elasticClient.SearchAsync<EsSource>(s => s
                          .TrackTotalHits(false) // disable the tracking of total hits to speed up pagination
                          .Size(1000)
                          // pass pit id & extend lifetime of it by another minute
                          .PointInTime(pit, d => d.KeepAlive("1m"))
                          .Source(src => src.Includes(i => i
                                              .Fields(f => f.timestamp,
                                                      f => f.level,
                                                      f => f.messageTemplate,
                                                      f => f.message)))
                          .Query(q => q
                              .Bool(b => b
                                  .Should(
                                          m => m
                                          .Match(ma => ma
                                              .Field(fa => fa.level)
                                              .Query("Error")),
                                          m => m
                                          .Match(ma => ma
                                              .Field(fa => fa.level)
                                              .Query("Information")))
                                  .Filter(f => f.DateRange(dr => dr
                                  .Field("@timestamp")
                                      .GreaterThanOrEquals("2021-07-14T00:00:00.000-05:00")
                                      .LessThanOrEquals("2021-07-14T23:59:59.999-05:00")))
                                  .MinimumShouldMatch(1)))
                          // can I sort by timestamp, or does it have to be the result ID?
                          .Sort(srt => srt.Ascending(f => f.timestamp))
                          .SearchAfter(searchAfter));

                    if (response.Documents.Count == 0)
                    {
                        break;
                    }

                    results.AddRange(response.Documents);

                    // open question: searchAfter is never advanced here, so as written
                    // the loop keeps requesting the same first page
                    //searchAfter = response.Documents.LastOrDefault()?.timestamp ?? x;
                }
            }
            finally
            {
                // closing the pit
                var closePit = await _elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
            }

            return results;
        }


    public class EsSource
    {
        [Date(Name = "@timestamp")]
        public DateTimeOffset timestamp { get; set; }
        public string level { get; set; }
        public string messageTemplate { get; set; }
        public string message { get; set; }
    }

Solution

I've prepared a sample app with comments that demonstrates how to retrieve all documents from an index using a PIT and search_after.

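    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using MoreLinq; // provides the Batch() extension used below
    using Nest;

    class Program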
    {
        static async Task Main(string[] args)
        {
            string indexName = "test";
            var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
            connectionSettings.DefaultIndex(indexName);
            connectionSettings.EnableDebugMode();
            var elasticClient = new ElasticClient(connectionSettings);
    
            await elasticClient.Indices.DeleteAsync(indexName);
            var indexResponse = await elasticClient.Indices.CreateAsync(indexName);
    
            // index some test data
            // Batch coming from morelinq nuget
            Console.WriteLine($"Index some data into index");
            foreach (var batches in Enumerable.Range(0, 20000).Batch(1000))
            {
                var bulk = await elasticClient.IndexManyAsync(batches.Select(x => new EsDocument {Id = x }));
            }
    
            await elasticClient.Indices.RefreshAsync();
    
            var countResponse = await elasticClient.CountAsync<EsDocument>(d => d);
            Console.WriteLine($"Documents in index: {countResponse.Count}");
    
            Console.WriteLine($"Open new pit");
            var openPit = await elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
            var pit = openPit.Id;
    
            Console.WriteLine($"Read all docs from index ..");
            // we will start reading docs from the beginning
            var searchAfter = 0;
            try
            {
                while (true)
                {
                    var searchResponse = await elasticClient.SearchAsync<EsDocument>(s => s
                        // disable the tracking of total hits to speed up pagination.
                        .TrackTotalHits(false)
                        .Size(1000)
                        // pass pit id and extend lifetime of it by another minute
                        .PointInTime(pit, d => d.KeepAlive("1m"))
                        .Query(q => q.MatchAll())
                        // sort by the Id field so we can pass the last retrieved id to the next search
                        .Sort(sort => sort.Ascending(f => f.Id))
                        // pass last id we received from prev. search request so we can keep retrieving more documents
                        .SearchAfter(searchAfter));
    
                    // if we didn't receive any docs just stop processing
                    if (searchResponse.Documents.Count == 0)
                    {
                        break;
                    }
    
                    Console.WriteLine(
                        $"Id [{searchResponse.Documents.FirstOrDefault()?.Id}..{searchResponse.Documents.LastOrDefault()?.Id}]");
                    searchAfter = searchResponse.Documents.LastOrDefault()?.Id ?? 0;
                }
            }
            finally
            {
                Console.WriteLine($"Close pit");
                var closePit = await elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
            }
        }
    
        class EsDocument
        {
            public int Id { get; set; }
        }
    }
    

    Prints

    Index some data into index
    Documents in index: 20000
    Open new pit
    Read all docs from index ..
    Id [1..1000]
    Id [1001..2000]
    Id [2001..3000]
    Id [3001..4000]
    Id [4001..5000]
    Id [5001..6000]
    Id [6001..7000]
    Id [7001..8000]
    Id [8001..9000]
    Id [9001..10000]
    Id [10001..11000]
    Id [11001..12000]
    Id [12001..13000]
    Id [13001..14000]
    Id [14001..15000]
    Id [15001..16000]
    Id [16001..17000]
    Id [17001..18000]
    Id [18001..19000]
    Id [19001..19999]
    Close pit
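The same loop can be sketched for the question's logs, which have no numeric Id and are sorted by @timestamp. This is a minimal sketch under two assumptions: the first request is sent with no search_after at all (the sample above starts from 0, which is why Id 0 is missing from its output), and every later request passes the complete sort values of the last hit instead of a field copied from the document. As I understand the Elasticsearch search_after documentation, PIT searches on 7.12+ add an implicit _shard_doc tiebreaker, so the returned sort values can contain more than the timestamp; passing all of them back keeps documents with identical timestamps from being skipped or repeated. The names SearchAfterPager, Hit and ReadAllAsync are hypothetical, and the fetchPage delegate keeps the loop independent of NEST so it can run without a cluster (C# 9 or later, for records).

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: reads every page of a sorted search via search_after.
public static class SearchAfterPager
{
    // One hit: the document plus the sort values returned for it
    // (with NEST these would come from hit.Source and hit.Sorts).
    public sealed record Hit<T>(T Source, IReadOnlyList<object> Sorts);

    // fetchPage must run the same sorted query (against the same PIT) every time.
    // A null cursor means "first page": send no search_after at all.
    // Otherwise the cursor is passed as search_after.
    public static async Task<List<T>> ReadAllAsync<T>(
        Func<IReadOnlyList<object>?, Task<IReadOnlyList<Hit<T>>>> fetchPage)
    {
        var results = new List<T>();
        IReadOnlyList<object>? cursor = null;

        while (true)
        {
            IReadOnlyList<Hit<T>> page = await fetchPage(cursor);
            if (page.Count == 0)
            {
                break; // an empty page means every matching document has been read
            }

            results.AddRange(page.Select(hit => hit.Source));

            // Continue after the complete sort values of the last hit
            // (e.g. [@timestamp, _shard_doc]), not a single document field.
            cursor = page[page.Count - 1].Sorts;
        }

        return results;
    }
}
```

With NEST, fetchPage would wrap SearchAsync<EsSource> with .PointInTime(pit, ...), .Sort(...) and, only when the cursor is not null, .SearchAfter(...), then map each hit to new Hit<EsSource>(hit.Source, hit.Sorts.ToList()).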