I wrote a simple test to validate that duplicates do not exist, like this:
@Test
public void testSameDataNotPushedTwice() throws Exception {
    // Do some logic
    // index contains the ES index name
    // uncommenting this line makes the test fail
    // deleteOldData(esPersistence.getESClient(), index);
    esPersistence.insert(cdrData);
    esPersistence.insert(cdrData);
    SearchResponse searchResponse = getDataFromElastic(esPersistence.getESClient(), index);
    assertThat(searchResponse.getHits().getHits().length).isEqualTo(1);
}
As you can see, I push the same data to ES twice and check that the number of hits equals 1.
The test passes when the delete line is commented out.
Now I want to make sure there is no data left over from other tests, so I want to delete the index before the insert. The delete
method works, but the search response returns 0 hits after the insert.
The delete index method:
public static void deleteOldData(RestHighLevelClient client, String index) throws IOException {
    GetIndexRequest request = new GetIndexRequest(index);
    boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
    if (exists) {
        DeleteIndexRequest deleteRequest = new DeleteIndexRequest(index);
        client.indices().delete(deleteRequest, RequestOptions.DEFAULT);
    }
}
Highlights:
Bottom line: How can I perform delete index --> insert --> search and find the documents?
Adding the insert to ES and the GetSettingsRequest:
deleteOldData(esPersistence.getESClient(), index);
esPersistence.insert(testData);
GetSettingsRequest request = new GetSettingsRequest().indices(index);
GetSettingsResponse getSettingsResponse = esPersistence.getESClient().indices().getSettings(request, RequestOptions.DEFAULT);
esPersistence.insert(testData);
Insert methods:
public boolean insert(List<ProjectData> projDataList) {
    // Relevant lines
    BulkRequest bulkRequest = prepareBulkRequests(projDataList, esConfiguration.getCdrDataIndexName());
    return insertBulk(bulkRequest);
}

private BulkRequest prepareBulkRequests(List<ProjectData> data, String indexName) {
    BulkRequest bulkRequest = new BulkRequest();
    for (ProjectData projectData : data) {
        String json = jsonParser.parsePojo(projectData);
        // Using the document's own id means a second insert overwrites the first
        bulkRequest.add(new IndexRequest(indexName)
                .id(projectData.getId())
                .source(json, XContentType.JSON));
    }
    return bulkRequest;
}

private boolean insertBulk(BulkRequest bulkRequest) {
    try {
        BulkResponse bulkResponse = rhlClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulkResponse.hasFailures()) {
            logger.error(buildCustomBulkFailedMessage(bulkResponse));
            return false;
        }
    } catch (IOException e) {
        logger.warn("Failed to insert csv fields. Error: {}", e.getMessage());
        return false;
    }
    return true;
}
With special thanks to David Pilato (from the ES forum): newly indexed documents only become searchable after a refresh, so the index needs to be refreshed after the insert operation, like this:
client.indices().refresh(new RefreshRequest(index), RequestOptions.DEFAULT);
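To illustrate why the refresh matters, here is a minimal toy model in plain Java. It is not Elasticsearch code: ToyIndex, RefreshDemo and all their methods are hypothetical names made up for this sketch, and it only assumes the behaviour described above (writes become visible to search after a refresh, and a write with an existing id overwrites the earlier document).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT the Elasticsearch client API.
final class ToyIndex {
    // Documents that were written but are not yet visible to search.
    private final Map<String, String> pending = new LinkedHashMap<>();
    // Documents that search can see.
    private final Map<String, String> searchable = new LinkedHashMap<>();

    // Write a document; the same id overwrites the earlier version.
    void index(String id, String json) {
        pending.put(id, json);
    }

    // Make all pending writes visible to search.
    void refresh() {
        searchable.putAll(pending);
        pending.clear();
    }

    // Drop everything, as deleting the index would.
    void delete() {
        pending.clear();
        searchable.clear();
    }

    // Return only the documents that have been refreshed.
    List<String> search() {
        return new ArrayList<>(searchable.values());
    }
}

final class RefreshDemo {
    public static void main(String[] args) {
        ToyIndex index = new ToyIndex();
        index.delete();                          // start from a clean index
        index.index("42", "{\"caller\":\"a\"}"); // first insert
        index.index("42", "{\"caller\":\"a\"}"); // same id, overwrites
        System.out.println(index.search().size()); // prints 0: nothing refreshed yet
        index.refresh();
        System.out.println(index.search().size()); // prints 1: one document, no duplicate
    }
}
```

This would also explain why the test passed without the delete: the document from an earlier run had presumably already been refreshed, so the search found it. As far as I know, an alternative to the explicit refresh call is to set the refresh policy on the write itself, e.g. bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), but I have not tried it in this test.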