I need to process a whole collection in Elasticsearch 8. Atomicity is not required, so please do not suggest "search after".
I have used scrolls before, but I am struggling to construct the code properly for the latest Java client.
var search = client.search(q -> q.index("addressbook")
        .scroll(Time.of(t -> t.time("1m"))), AddressBookRecord.class);
String scrollId = search.scrollId();
ScrollResponse scroll = null;
do {
    scroll = client.scroll(q -> q.scrollId(scrollId), AddressBookRecord.class);
    System.out.println(scroll.hits().total().value());
    System.out.println(scroll.hits().hits().size());
} while (scroll.hits().total().value() > 0L);
scroll.hits().hits() is always empty, even on the first call.
At the same time, scroll.hits().total().value() contains the right number of documents.
As a result, I cannot process the documents, because .hits() is empty.
Whether you like it or not, scrolling over large result sets is no longer recommended in Elasticsearch 8, so search-after combined with a point-in-time (PIT) is the way to go:
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch.core.ClosePointInTimeResponse;
import co.elastic.clients.elasticsearch.core.OpenPointInTimeResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@SpringBootTest
public class PagingTest {

    @Autowired
    private ElasticsearchClient elasticsearchClient;

    @Test
    public void testPointInTime() throws IOException {
        // Open a point-in-time
        final Time keepAlive = new Time.Builder().time("1m").build();
        final OpenPointInTimeResponse pitResp = elasticsearchClient.openPointInTime(req -> req.index("books").keepAlive(keepAlive));
        final String pitId = pitResp.id();
        final Query query = QueryBuilders.match().field("author").query("Shakespeare").build()._toQuery();

        // Run your paginated queries
        String lastId = null;
        for (int i = 0; i < 3; i++) {
            SearchRequest.Builder searchRequest = new SearchRequest.Builder()
                    .pit(pit -> pit.id(pitId).keepAlive(keepAlive)) // It would be better to use the pit from the last result
                    .size(100)
                    .query(query)
                    .source(source -> source.filter(filter -> filter.includes("id", "title")))
                    .sort(sort -> sort.field(field -> field.field("id"))).timeout("10s");

            // Continue after last id
            if (lastId != null) {
                searchRequest = searchRequest.searchAfter(FieldValue.of(lastId));
            }

            final SearchResponse<Book> searchResponse = elasticsearchClient.search(searchRequest.build(), Book.class);
            final HitsMetadata<Book> hitsMetadata = searchResponse.hits();
            assertThat(hitsMetadata.hits()).isNotEmpty();

            for (Hit<Book> hit : hitsMetadata.hits()) {
                final Book book = hit.source();
                log.info("{} | {} ", i, book);
                lastId = book.getId(); // remember the last retrieved id
            }
        }

        // Close your point-in-time to save resources
        final ClosePointInTimeResponse pitCloseResp = elasticsearchClient.closePointInTime(req -> req.id(pitId));
        assertThat(pitCloseResp.succeeded()).isTrue();
    }

    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    private static class Book {
        private String id;
        private String title;
    }
}
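
The test above fetches a fixed three pages and asserts that each one is non-empty. To process the whole collection, the loop would instead run until a page comes back empty. Below is a minimal, library-free sketch of that control flow only. It makes no Elasticsearch calls: the class name SearchAfterSketch and the methods fetchPage and processAll are hypothetical, and a sorted in-memory list of ids stands in for the index. With the real client, fetchPage would roughly correspond to the elasticsearchClient.search(...) call with searchAfter set, and the stop condition to searchResponse.hits().hits().isEmpty().

```java
import java.util.ArrayList;
import java.util.List;

public class SearchAfterSketch {

    /**
     * Hypothetical stand-in for one search request. Returns up to `size` ids
     * that sort strictly after `lastId` (or from the start when lastId is null).
     * Assumes `sortedIds` is already sorted, just as the real query sorts on "id".
     */
    public static List<String> fetchPage(List<String> sortedIds, String lastId, int size) {
        List<String> page = new ArrayList<>();
        for (String id : sortedIds) {
            if (lastId != null && id.compareTo(lastId) <= 0) {
                continue; // already returned on an earlier page
            }
            page.add(id);
            if (page.size() == size) {
                break;
            }
        }
        return page;
    }

    /** Pages through everything and returns the ids in the order they were processed. */
    public static List<String> processAll(List<String> sortedIds, int size) {
        List<String> processed = new ArrayList<>();
        String lastId = null;
        while (true) {
            List<String> page = fetchPage(sortedIds, lastId, size);
            if (page.isEmpty()) {
                break; // an empty page means the result set has been consumed
            }
            processed.addAll(page);               // "process" each hit
            lastId = page.get(page.size() - 1);   // cursor for the next request
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> index = List.of("a", "b", "c", "d", "e");
        System.out.println(processAll(index, 2));
    }
}
```

The design point is that the cursor is taken from the last hit of the page that was just processed, and the loop stops on the first empty page rather than on the total hit count, which stays constant across pages.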