
Elasticsearch Scroll API: asynchronous execution


I'm running an Elasticsearch 5.6 cluster that indexes about 70 GB per day. At the end of each day we have to produce hourly summaries for the last 7 days. We are using the Java High Level REST Client, and given the number of documents each query returns, scrolling through the results is essential.

To take advantage of the CPUs we have and reduce the read time, we were thinking about using the asynchronous version of search scroll, but we are missing an example, or at least the logic behind it, to move forward.

We already checked the related Elastic documentation, but it's too vague:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high-search-scroll.html#java-rest-high-search-scroll-async

We also asked in the Elastic discussion forum, as they suggest, but it looks like nobody can answer there:

https://discuss.elastic.co/t/no-code-for-example-of-using-scrollasync-with-the-java-high-level-rest-client/165126

Any help on this would be much appreciated; I'm surely not the only one with this requirement.


Solution

  • Here is the example code:

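    // Imports assume a 6.x/7.x High Level REST Client; the RequestOptions
    // overloads used below were added in 6.4 and are not available in 5.6.
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.search.ClearScrollRequest;
    import org.elasticsearch.action.search.ClearScrollResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchScrollRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.TimeValue;
    
    public class App {
    
      public static void main(String[] args) throws IOException, InterruptedException {
        RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(HttpHost.create("http://localhost:9200")));
    
        // Recreate the "test" index with 100 documents.
        // Note: the delete call throws if the index does not exist yet.
        client.indices().delete(new DeleteIndexRequest("test"), RequestOptions.DEFAULT);
        for (int i = 0; i < 100; i++) {
          client.index(new IndexRequest("test", "_doc").source("foo", "bar"), RequestOptions.DEFAULT);
        }
        client.indices().refresh(new RefreshRequest("test"), RequestOptions.DEFAULT);
    
        // The initial (synchronous) search opens the scroll context and returns the first page.
        SearchRequest searchRequest = new SearchRequest("test").scroll(TimeValue.timeValueSeconds(30L));
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
    
        System.out.println("response = " + searchResponse);
    
        SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
            .scroll(TimeValue.timeValueSeconds(30));
    
        // The missing piece: wait for the asynchronous result before moving on.
        final CountDownLatch scrollLatch = new CountDownLatch(1);
    
        client.scrollAsync(scrollRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
          @Override
          public void onResponse(SearchResponse scrollResponse) {
            System.out.println("response async = " + scrollResponse);
            scrollLatch.countDown(); // release the waiting main thread
          }
    
          @Override
          public void onFailure(Exception e) {
            e.printStackTrace();
            scrollLatch.countDown(); // release the main thread on failure too
          }
        });
    
        // Block here until one of the callbacks above has run.
        scrollLatch.await();
    
        // Clear the scroll if we finish before its keep-alive expires.
        // Otherwise it is cleared automatically once the keep-alive is reached.
        ClearScrollRequest request = new ClearScrollRequest();
        request.addScrollId(scrollId);
    
        final CountDownLatch clearLatch = new CountDownLatch(1);
    
        client.clearScrollAsync(request, RequestOptions.DEFAULT, new ActionListener<ClearScrollResponse>() {
          @Override
          public void onResponse(ClearScrollResponse clearScrollResponse) {
            clearLatch.countDown();
          }
    
          @Override
          public void onFailure(Exception e) {
            e.printStackTrace();
            clearLatch.countDown();
          }
        });
    
        // Wait for the clear request to finish before closing the client.
        clearLatch.await();
        client.close();
      }
    }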
    

    Thanks to David Pilato on the Elastic discussion forum.
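The example above requests only one additional page. To read a whole result set, each callback has to request the next page until an empty page comes back. Below is a minimal sketch of that chaining logic using only the JDK, so it runs without a cluster. `Page`, `drain`, and `fetchNext` are hypothetical names that are not part of the Elasticsearch client: `Page` stands in for a `SearchResponse`, and `fetchNext` stands in for a small wrapper that would call `client.scrollAsync(...)` and complete a `CompletableFuture` from the `ActionListener` callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

public class AsyncScrollSketch {

  /** Hypothetical stand-in for a SearchResponse: a scroll id plus one page of hits. */
  public static final class Page {
    final String scrollId;
    final List<String> hits;

    public Page(String scrollId, List<String> hits) {
      this.scrollId = scrollId;
      this.hits = hits;
    }
  }

  /**
   * Processes the first page, then keeps asking fetchNext for more pages
   * until an empty page arrives. The returned future completes with the
   * total number of hits handled, or exceptionally if a fetch fails.
   */
  public static CompletableFuture<Integer> drain(
      Page first,
      Function<String, CompletableFuture<Page>> fetchNext,
      Consumer<List<String>> handler) {
    CompletableFuture<Integer> done = new CompletableFuture<>();
    step(first, 0, fetchNext, handler, done);
    return done;
  }

  private static void step(
      Page page, int seen,
      Function<String, CompletableFuture<Page>> fetchNext,
      Consumer<List<String>> handler,
      CompletableFuture<Integer> done) {
    try {
      if (page.hits.isEmpty()) {
        done.complete(seen); // an empty page means the scroll is exhausted
        return;
      }
      handler.accept(page.hits);
      int total = seen + page.hits.size();
      // whenCompleteAsync runs the continuation as a new task, so the call
      // stack does not grow even if the next page is already available.
      fetchNext.apply(page.scrollId).whenCompleteAsync((next, error) -> {
        if (error != null) {
          done.completeExceptionally(error);
        } else {
          step(next, total, fetchNext, handler, done);
        }
      });
    } catch (RuntimeException e) {
      done.completeExceptionally(e); // a failing handler or fetchNext ends the loop
    }
  }

  public static void main(String[] args) {
    // Fake data source: 25 documents served 10 per page; the "scroll id"
    // here is simply the offset of the next page.
    List<String> docs = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      docs.add("doc-" + i);
    }
    int pageSize = 10;

    Function<String, CompletableFuture<Page>> fakeFetch = scrollId ->
        CompletableFuture.supplyAsync(() -> {
          int from = Integer.parseInt(scrollId);
          int to = Math.min(from + pageSize, docs.size());
          return new Page(String.valueOf(to), new ArrayList<>(docs.subList(from, to)));
        });

    Page first = new Page("10", new ArrayList<>(docs.subList(0, 10)));
    int total = drain(first, fakeFetch,
        hits -> System.out.println("page of " + hits.size())).join();
    System.out.println("total = " + total);
  }
}
```

With a real client, the `join()` in `main` plays the same role as the `CountDownLatch` in the answer: it blocks the calling thread until the last callback has fired. In this sketch the pages of a single scroll are still fetched one after another, since each request needs the scroll id returned by the previous one.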