java reactive-programming spring-data-elasticsearch

How can I wait until all items have been consumed?


My task is simple: I need to query an Elasticsearch rolling index in a reactive way.

Since @Document didn't support Spring EL in the index name, for example @Document(index = "indexName-#(new Date().format(yyyy-MM-dd))"), I am trying to call Elasticsearch with ReactiveElasticsearchTemplate, which lets me change the index name at runtime.

Because the data volume is larger than 10000, I need to use scroll and repeat the query until I have all the data.

I have finished the first query and the scroll query, and both return values.

But I need to combine all the results and then return them.

How can I do that? Right now an empty result is returned to the frontend while the consumer is still working. How can I make the thread wait until the consumer has finished and Elasticsearch has returned all the data? Thanks.

public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to)
    throws Exception {
  List<ELKModel> result = new ArrayList<ELKModel>();
  List<Long> total = new ArrayList<>();
  List<Long> currentSize = new ArrayList<>();
  List<String> scrollId = new ArrayList<>();

  NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
  sourceBuilder.withQuery(
      QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
  sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
  NativeSearchQuery query = sourceBuilder.build();
  elasticsearchSupport
      .scrollStart(query, ELKModel.class)
      .map(ELKModelWrapper::valueFrom)
      .subscribe(
          wrapper -> {
            total.add(wrapper.getTotal());
            currentSize.add(wrapper.getCurrentSize());
            result.addAll(wrapper.getResults());
            scrollId.add(wrapper.getScrollId());
          }).dispose();

  while (currentSize.size() == 1 && total.size() == 1 && currentSize.get(0) < total.get(0)) {
    elasticsearchSupport
        .scrollContinue(scrollId.get(0), ELKModel.class)
        .map(ELKModelWrapper::valueFrom)
        .subscribe(
            wrapper -> {
              currentSize.add(0, currentSize.get(0) + wrapper.getCurrentSize());
              result.addAll(wrapper.getResults());
              scrollId.add(0, wrapper.getScrollId());
            }).dispose();
  }

  return Flux.fromIterable(result);
}


Solution

  • You must be using a fairly outdated version of Spring Data Elasticsearch. SpEL support for index names in the @Document annotation was introduced 4 years ago; my post at https://www.sothawo.com/2020/07/how-to-provide-a-dynamic-index-name-in-spring-data-elasticsearch-using-spel/ shows how to use it.

    As for the reactive part: you must never block a thread in reactive code. The reactive code also does this scrolling under the hood, so I don't see why you would want to do it yourself.
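
To illustrate the point about not blocking: in the question's code, subscribe(...) only starts asynchronous work and dispose() then cancels it, so the list is most likely still empty when the method returns. The usual alternative is to chain each batch onto the previous one and return the asynchronous result itself. Below is a minimal sketch of that idea using only the JDK's CompletableFuture as a stand-in for the reactive types; it is not the Spring Data Elasticsearch or Reactor API, and Page, fetchPage, fetchAll and the sample data are all hypothetical names made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ScrollComposition {

    // Hypothetical stand-in for one scroll response: a batch of hits plus
    // the id needed to request the next batch (null when nothing is left).
    static final class Page {
        final List<String> hits;
        final String nextScrollId;

        Page(List<String> hits, String nextScrollId) {
            this.hits = hits;
            this.nextScrollId = nextScrollId;
        }
    }

    // Hypothetical data source with three batches.
    static final List<List<String>> BATCHES =
        List.of(List.of("a", "b"), List.of("c", "d"), List.of("e"));

    // Simulates an asynchronous scroll request; a null id means "first batch".
    static CompletableFuture<Page> fetchPage(String scrollId) {
        return CompletableFuture.supplyAsync(() -> {
            int index = scrollId == null ? 0 : Integer.parseInt(scrollId);
            String next = index + 1 < BATCHES.size() ? String.valueOf(index + 1) : null;
            return new Page(BATCHES.get(index), next);
        });
    }

    // Chains the next request onto the previous one instead of waiting for it.
    // The returned future completes only after the last batch has arrived.
    static CompletableFuture<List<String>> fetchAll(String scrollId, List<String> collected) {
        return fetchPage(scrollId).thenCompose(page -> {
            List<String> soFar = new ArrayList<>(collected);
            soFar.addAll(page.hits);
            if (page.nextScrollId == null) {
                return CompletableFuture.completedFuture(soFar);
            }
            return fetchAll(page.nextScrollId, soFar);
        });
    }

    public static void main(String[] args) {
        // join() is used only so this demo can print; a reactive endpoint
        // would return the asynchronous type and let the framework subscribe.
        List<String> all = fetchAll(null, List.of()).join();
        System.out.println(all); // prints [a, b, c, d, e]
    }
}
```

No thread waits in a loop here, and no shared list is mutated from a callback: the caller receives a single asynchronous value that completes once every batch has been appended.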