Search code examples
Tags: java, elasticsearch

How to use scrolling in Elasticsearch with the Java API Client (Elasticsearch 8.x)


I need to process a whole collection (index) in Elasticsearch 8. Atomicity is not required, so please do not suggest "search after".

I have used scrolls before, but I am struggling to write the equivalent code with the latest Java API client.

        // The initial search opens the scroll context AND already returns the first page of hits,
        // so those hits must be processed before the first scroll() call.
        final Time scrollKeepAlive = Time.of(t -> t.time("1m"));
        var search = client.search(q -> q.index("addressbook")
                .scroll(scrollKeepAlive), AddressBookRecord.class);
        String scrollId = search.scrollId();
        var hits = search.hits().hits();

        try {
            // Stop on the first empty page. hits().total() is the number of matches for the whole
            // query, not the size of the current page, so it cannot be used as the loop condition.
            while (!hits.isEmpty()) {
                System.out.println(hits.size());
                for (var hit : hits) {
                    System.out.println(hit.source()); // process each document here
                }

                // Always continue from the scroll id returned by the latest response.
                // The copy is needed because lambdas can only capture effectively final locals.
                final String currentScrollId = scrollId;
                ScrollResponse<AddressBookRecord> scroll = client.scroll(
                        s -> s.scrollId(currentScrollId).scroll(scrollKeepAlive),
                        AddressBookRecord.class);
                scrollId = scroll.scrollId();
                hits = scroll.hits().hits();
            }
        } finally {
            // Release the server-side scroll context instead of waiting for the keep-alive to expire.
            final String lastScrollId = scrollId;
            if (lastScrollId != null) {
                client.clearScroll(c -> c.scrollId(lastScrollId));
            }
        }

`scroll.hits().hits()` is always empty, even on the first call.

At the same time, `scroll.hits().total().value()` reports the correct number of documents.

As a result, I cannot process any documents, because `.hits()` is empty.


Solution

  • Whether you like it or not, scrolling over large result sets is no longer recommended in Elasticsearch 8; the recommended approach is search_after together with a point in time (PIT):

    import static org.assertj.core.api.Assertions.assertThat;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import co.elastic.clients.elasticsearch.ElasticsearchClient;
    import co.elastic.clients.elasticsearch._types.FieldValue;
    import co.elastic.clients.elasticsearch._types.Time;
    import co.elastic.clients.elasticsearch._types.query_dsl.Query;
    import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
    import co.elastic.clients.elasticsearch.core.ClosePointInTimeResponse;
    import co.elastic.clients.elasticsearch.core.OpenPointInTimeResponse;
    import co.elastic.clients.elasticsearch.core.SearchRequest;
    import co.elastic.clients.elasticsearch.core.SearchResponse;
    import co.elastic.clients.elasticsearch.core.search.Hit;
    import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import lombok.ToString;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * Pages through a whole result set with a point in time (PIT) plus {@code search_after},
     * the paging approach Elasticsearch 8 recommends instead of scrolling.
     */
    @Slf4j
    @SpringBootTest
    public class PagingTest {
    
        /** Number of hits requested per page. */
        private static final int PAGE_SIZE = 100;
    
        @Autowired
        private ElasticsearchClient elasticsearchClient;
    
        /**
         * Reads every "books" document matching the query, page by page, until an empty page is
         * returned, then closes the point in time (also when paging fails).
         */
        @Test
        public void testPointInTime() throws IOException {
    
            // Open a point-in-time that all pages are read from
            final Time keepAlive = new Time.Builder().time("1m").build();
            final OpenPointInTimeResponse pitResp = elasticsearchClient.openPointInTime(req -> req.index("books").keepAlive(keepAlive));
            String pitId = pitResp.id();
    
            final Query query = QueryBuilders.match().field("author").query("Shakespeare").build()._toQuery();
    
            int pageCount = 0;
            long hitCount = 0;
            ClosePointInTimeResponse pitCloseResp;
            try {
                // Sort values of the last hit of the previous page; null for the first page
                List<FieldValue> searchAfter = null;
                while (true) {
                    final SearchResponse<Book> searchResponse =
                            elasticsearchClient.search(buildPageRequest(pitId, keepAlive, query, searchAfter), Book.class);
    
                    // The PIT id may change between requests; always continue with the most recent one
                    if (searchResponse.pitId() != null) {
                        pitId = searchResponse.pitId();
                    }
    
                    final HitsMetadata<Book> hitsMetadata = searchResponse.hits();
                    final List<Hit<Book>> hits = hitsMetadata.hits();
                    if (hits.isEmpty()) {
                        break; // no more pages: the whole result set has been read
                    }
    
                    for (Hit<Book> hit : hits) {
                        log.info("{} | {} ", pageCount, hit.source());
                    }
    
                    hitCount += hits.size();
                    // Resume after the sort values of the last hit rather than a _source field,
                    // which may be absent or filtered out of the response
                    searchAfter = hits.get(hits.size() - 1).sort();
                    pageCount++;
                }
            } finally {
                // Close your point-in-time to save resources, even if paging threw
                final String lastPitId = pitId;
                pitCloseResp = elasticsearchClient.closePointInTime(req -> req.id(lastPitId));
            }
    
            assertThat(pitCloseResp.succeeded()).isTrue();
            log.info("Read {} hits in {} pages", hitCount, pageCount);
            assertThat(hitCount).isPositive();
        }
    
        /**
         * Builds the request for one page of results read from the given point in time.
         *
         * @param pitId       id of the point in time to search; no index is set because the PIT defines it
         * @param keepAlive   how long this request extends the point in time
         * @param query       query selecting the documents
         * @param searchAfter sort values of the previous page's last hit, or {@code null} for the first page
         * @return the request for the next page
         */
        private static SearchRequest buildPageRequest(String pitId, Time keepAlive, Query query, List<FieldValue> searchAfter) {
            final SearchRequest.Builder searchRequest = new SearchRequest.Builder()
                    .pit(pit -> pit.id(pitId).keepAlive(keepAlive))
                    .size(PAGE_SIZE)
                    .query(query)
                    .source(source -> source.filter(filter -> filter.includes("id", "title")))
                    // NOTE(review): "id" must be sortable (e.g. a keyword field) in the "books" mapping — confirm
                    .sort(sort -> sort.field(field -> field.field("id")))
                    .timeout("10s");
    
            // Continue after the last hit of the previous page
            if (searchAfter != null && !searchAfter.isEmpty()) {
                searchRequest.searchAfter(searchAfter);
            }
            return searchRequest.build();
        }
    
        @Getter
        @Setter
        @AllArgsConstructor
        @NoArgsConstructor
        @ToString
        private static class Book {
    
            private String id;
            private String title;
    
        }
    }