I am using the Jest client for Elasticsearch to iterate over an index of documents and update one field. My workflow is to run an empty query with paging and check whether I can compute the extra field. If I can, I update the relevant documents in one bulk update.
Pseudo-code:
private void process() {
    int from = 0
    int size = this.properties.batchSize
    boolean moreResults = true
    while (moreResults) {
        moreResults = handleBatch(from, size)
        from += size
    }
}
private boolean handleBatch(int from, int size) {
    log.info("Processing records $from to " + (from + size))
    def result = search(from, size)
    if (result.isSucceeded()) {
        // Check each element and perform an upgrade
    }
    // Return true if the query returned at least one item
    return !result.getHits(Map).isEmpty()
}
private SearchResult search(int from, int size) {
    String query = '{ "from": ' + from + ', "size": ' + size + ' }'
    Search search = new Search.Builder(query)
            .addIndex("my-index")
            .addType("my-document")
            .build()
    return jestClient.execute(search)
}
I don't get any errors, but when I run the batch several times, it keeps finding "new" documents to upgrade even though the total number of documents hasn't changed. I suspected that an updated document was being processed several times, which I confirmed by checking the processed IDs.
How can I run a query so that the original documents are the ones processed and any update wouldn't interfere with it?
Instead of running a normal search (i.e. using from/size paging), you need to run a scroll search. The main difference is that a scroll freezes a snapshot of the documents (at the time of the initial query) and iterates over that snapshot. Whatever changes happen after the first scroll request won't be considered.
Using Jest, you need to modify your code to look more like this:
// 1. Initiate the scrolled search request (an empty/match_all query, as in your batch).
// Note: size and scroll are set on this initial request; its response already
// contains the first page of hits.
String query = '{ "query": { "match_all": {} } }';
Search search = new Search.Builder(query)
        .addIndex("my-index")
        .addType("my-document")
        .addSort(new Sort("_doc"))
        .setParameter(Parameters.SIZE, size)
        .setParameter(Parameters.SCROLL, "5m")
        .build();
JestResult result = jestClient.execute(search);

// 2. Get the scroll_id to use in subsequent requests
String scrollId = result.getJsonObject().get("_scroll_id").getAsString();

// 3. Issue scroll requests until no more hits are returned
boolean moreResults = true;
while (moreResults) {
    SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m").build();
    result = jestClient.execute(scroll);
    JsonArray hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
    // Each response returns a new scroll_id to use for the next request
    scrollId = result.getJsonObject().get("_scroll_id").getAsString();
    moreResults = hits.size() > 0;
}
You need to modify your process and handleBatch methods to use the above code. It should be straightforward; let me know if not.
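To make the mapping concrete, here is a sketch in the same pseudo-code style as your question, showing how process and handleBatch could be rewired around the scroll. It assumes the same jestClient field, and processHits is a hypothetical helper standing in for your existing check-and-bulk-update logic:

```groovy
private void process() {
    // The initial request opens the scroll and returns the first page of hits
    Search search = new Search.Builder('{ "query": { "match_all": {} } }')
            .addIndex("my-index")
            .addType("my-document")
            .addSort(new Sort("_doc"))
            .setParameter(Parameters.SIZE, this.properties.batchSize)
            .setParameter(Parameters.SCROLL, "5m")
            .build()
    JestResult result = jestClient.execute(search)
    String scrollId = result.getJsonObject().get("_scroll_id").getAsString()

    boolean moreResults = handleBatch(result)
    while (moreResults) {
        SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m").build()
        result = jestClient.execute(scroll)
        // Refresh the scroll_id from each response
        scrollId = result.getJsonObject().get("_scroll_id").getAsString()
        moreResults = handleBatch(result)
    }
}

private boolean handleBatch(JestResult result) {
    def hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits")
    // Check each element and perform the upgrade (bulk update as before)
    processHits(hits)
    // Keep scrolling as long as the last page was non-empty
    return hits.size() > 0
}
```

Note that handleBatch no longer takes from/size: the scroll cursor tracks the position server-side, so updating documents mid-run can't shift the paging window.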