I am trying to implement zero-downtime reindexing, as advised in the Hibernate Search documentation:
https://docs.jboss.org/hibernate/search/6.2/reference/en-US/html_single/#backend-elasticsearch-indexlayout-strategy-simple.
Dependency details:
Below is the pseudo code that we have implemented. Everything works functionally.
However, we need some suggestions on how to initiate the MassIndexer (see method processIndex).
The approach we have chosen is to execute one MassIndexer per entity.
We tried to hold the MassIndexer in a class-level field, but that makes the application fail on startup.
Thanks in advance!!
Here is the pseudo code for MassIndexServiceImpl.java:
import MassIndexService;
import org.apache.commons.****;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.hibernate.search.backend.elasticsearch.***;
import org.hibernate.search.mapper.orm.***;
import org.hibernate.search.mapper.pojo.massindexing.***;
import org.slf4j.*;
import org.springframework.****;
import java.*****;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.IndexState;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import jakarta.persistence.EntityManager;
/**
 * Zero-downtime reindexing on top of Hibernate Search's alias-based Elasticsearch index layout.
 *
 * <p>For each indexed entity the service:
 * <ol>
 *   <li>looks up the concrete index currently behind the write alias,</li>
 *   <li>detaches the write alias from it and creates a new index that copies its mappings,
 *       analysis settings and aliases,</li>
 *   <li>runs a {@link MassIndexer} for the entity,</li>
 *   <li>on success, detaches the read alias from the old index and deletes it; on failure,
 *       deletes the new index and re-attaches the write alias to the old one.</li>
 * </ol>
 *
 * <p>One mass indexer is started per entity without blocking, so all selected entities are
 * reindexed in parallel; the public {@code reindex*} methods wait for all of them before
 * logging the summary.
 */
@Service
@Transactional
public class MassIndexServiceImpl implements MassIndexService {

    private static final Logger log = LoggerFactory.getLogger(MassIndexServiceImpl.class);

    private final EntityManager entityManager;
    private final MassIndexingMonitor monitor;
    private final ElasticsearchClient client;
    private final SearchSession searchSession;

    /**
     * Builds the service and its own Elasticsearch API client.
     *
     * @param entityManager entity manager used to obtain the Hibernate Search mapping and session
     */
    public MassIndexServiceImpl(EntityManager entityManager) {
        this.entityManager = entityManager;
        // NOTE(review): host and port are hard-coded and the low-level RestClient is never
        // closed; consider injecting a managed client bean instead.
        // Create the low-level client
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        // And create the API client
        client = new ElasticsearchClient(transport);
        monitor = new PojoMassIndexingLoggingMonitor(1000);
        searchSession = Search.session(entityManager);
    }

    /**
     * Reindexes every indexed entity with zero downtime and logs a summary once all mass
     * indexers have finished.
     */
    @Async
    @Override
    public void reindexAliases() {
        SearchMapping mapping = Search.mapping(entityManager.getEntityManagerFactory());
        reindexAndReport(mapping.allIndexedEntities());
    }

    /**
     * Reindexes only the indexed entities whose JPA name is listed in {@code types} and logs a
     * summary once all mass indexers have finished.
     *
     * @param types JPA entity names to reindex; names that match no indexed entity are ignored
     */
    @Async
    @Override
    public void reindexAliasList(List<String> types) {
        SearchMapping mapping = Search.mapping(entityManager.getEntityManagerFactory());
        List<SearchIndexedEntity<?>> selected = new ArrayList<>();
        for (SearchIndexedEntity<?> entity : mapping.allIndexedEntities()) {
            if (types.contains(entity.jpaName())) {
                selected.add(entity);
            }
        }
        reindexAndReport(selected);
    }

    /**
     * Starts the zero-downtime reindexing of a single entity and returns without waiting for the
     * mass indexer to finish; the counters are updated when it completes.
     *
     * @param successful counter incremented when the entity was reindexed and the old index retired
     * @param failures counter incremented when mass indexing or retiring the old index failed
     * @param entity the indexed entity to reindex
     * @throws RuntimeException if preparing the new index or starting the mass indexer fails
     */
    public void processIndex(AtomicInteger successful, AtomicInteger failures, SearchIndexedEntity<?> entity) {
        try {
            startReindex(successful, failures, entity);
        } catch (RuntimeException e) {
            // A trailing Throwable is logged as the exception, so it gets no placeholder.
            log.error("Mass indexing attempt failed!: {}", entity.javaClass().getName(), e);
            throw e;
        }
    }

    /** Starts reindexing for all given entities, waits for completion and logs the summary. */
    private void reindexAndReport(Collection<? extends SearchIndexedEntity<?>> entities) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        AtomicInteger successful = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();

        List<CompletableFuture<Void>> running = new ArrayList<>();
        for (SearchIndexedEntity<?> entity : entities) {
            try {
                running.add(startReindex(successful, failures, entity));
            } catch (RuntimeException e) {
                // One entity failing to start must not abort the remaining ones.
                failures.incrementAndGet();
                log.error("Mass indexing attempt failed!: {}", entity.javaClass().getName(), e);
            }
        }

        // MassIndexer#start does not block; wait here so the counters below are final.
        // The futures returned by startReindex never complete exceptionally.
        CompletableFuture.allOf(running.toArray(new CompletableFuture<?>[0])).join();

        stopWatch.stop();
        log.info(
            "Parallel reIndexing complete - in {} with {} successful and {} failed indexes of the Total: {}",
            stopWatch.formatTime(),
            successful.get(),
            failures.get(),
            entities.size()
        );
    }

    /**
     * Prepares the new index for {@code entity} and starts its mass indexer.
     *
     * @return a future that completes (never exceptionally) once indexing and the follow-up
     *     alias/index clean-up have finished
     */
    private CompletableFuture<Void> startReindex(
        AtomicInteger successful,
        AtomicInteger failures,
        SearchIndexedEntity<?> entity
    ) {
        // Alias names come from the entity's Elasticsearch index descriptor.
        ElasticsearchIndexDescriptor descriptor = entity.indexManager().unwrap(ElasticsearchIndexManager.class).descriptor();
        String readName = descriptor.readName();
        String writeName = descriptor.writeName();
        String entityName = entity.javaClass().getName();

        try {
            // Resolve the concrete index currently behind the write alias.
            Optional<String> currentIndex = MapUtils
                .emptyIfNull(client.indices().getAlias(b -> b.name(writeName)).result())
                .keySet()
                .stream()
                .findAny();
            if (currentIndex.isEmpty()) {
                log.warn("No index found behind alias {}; skipping reindexing of {}", writeName, entityName);
                return CompletableFuture.completedFuture(null);
            }
            String origIndexName = currentIndex.get();
            // Strip only the trailing "-<suffix>" so base names containing '-' stay intact.
            String newIndexName = StringUtils.substringBeforeLast(origIndexName, "-") + "-" + Instant.now().getEpochSecond();
            // Captured before the write alias is detached, so it still lists every alias.
            IndexState indexState = client.indices().get(g -> g.index(origIndexName)).get(origIndexName);

            detachAlias(origIndexName, writeName);
            try {
                createIndexLike(newIndexName, indexState);
                log.debug("indexing for: {}", entity.javaClass());

                MassIndexer massIndexer = searchSession.massIndexer(entity.javaClass());
                return massIndexer
                    .monitor(monitor)
                    .batchSizeToLoadObjects(500)
                    .threadsToLoadObjects(2)
                    .typesToIndexInParallel(1)
                    .start()
                    .<Void>handle((ignored, failure) -> {
                        if (failure == null) {
                            retireOldIndex(successful, failures, entityName, origIndexName, newIndexName, readName);
                        } else {
                            failures.incrementAndGet();
                            log.error("Mass indexing failed!: {}", entityName, failure);
                            rollback(origIndexName, newIndexName, writeName);
                        }
                        return null;
                    })
                    .toCompletableFuture();
            } catch (IOException | RuntimeException e) {
                // The write alias is already detached: put things back before reporting.
                rollback(origIndexName, newIndexName, writeName);
                throw e;
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Mass indexing attempt failed for " + entityName, e);
        }
    }

    /** Creates {@code newIndexName} with the mappings, analysis settings and aliases of {@code template}. */
    private void createIndexLike(String newIndexName, IndexState template) throws IOException {
        // NOTE(review): the copied aliases include the read alias, so searches presumably also
        // hit the new index while it is still being populated - confirm this is intended.
        client
            .indices()
            .create(ci ->
                ci
                    .index(newIndexName)
                    .mappings(template.mappings())
                    .settings(sb ->
                        sb
                            .analysis(template.settings().index().analysis())
                            .mapping(m ->
                                m
                                    .totalFields(t -> t.limit(200000))
                                    .depth(t -> t.limit(20))
                                    .nestedFields(t -> t.limit(200000))
                                    .nestedObjects(t -> t.limit(20000))
                            )
                    )
                    .aliases(template.aliases())
            );
    }

    /**
     * After successful mass indexing: detaches the read alias from the old index and deletes it.
     * A failure here is counted and logged, but the freshly populated new index is kept.
     */
    private void retireOldIndex(
        AtomicInteger successful,
        AtomicInteger failures,
        String entityName,
        String origIndexName,
        String newIndexName,
        String readName
    ) {
        try {
            detachAlias(origIndexName, readName);
            client.indices().delete(d -> d.index(List.of(origIndexName)));
            successful.incrementAndGet();
            log.debug("Indexing successful for: {}", entityName);
        } catch (IOException | RuntimeException e) {
            failures.incrementAndGet();
            log.error(
                "Mass indexing of {} succeeded but retiring old index {} failed; new index {} is kept",
                entityName,
                origIndexName,
                newIndexName,
                e
            );
        }
    }

    /**
     * Best-effort rollback: deletes the new index and re-attaches the write alias to the old
     * index. The two steps are independent so a failed delete does not prevent the alias restore.
     */
    private void rollback(String origIndexName, String newIndexName, String writeName) {
        try {
            client.indices().delete(d -> d.index(List.of(newIndexName)));
        } catch (IOException | RuntimeException e) {
            log.warn("Could not delete index {} during rollback", newIndexName, e);
        }
        try {
            attachAlias(origIndexName, writeName);
        } catch (IOException | RuntimeException e) {
            log.error("Could not restore alias {} on index {}", writeName, origIndexName, e);
        }
    }

    /** Removes {@code alias} from {@code indexName}. */
    private void detachAlias(String indexName, String alias) throws IOException {
        client.indices().updateAliases(ub -> ub.actions(ab -> ab.remove(rb -> rb.index(indexName).alias(alias))));
    }

    /** Adds {@code alias} to {@code indexName}. */
    private void attachAlias(String indexName, String alias) throws IOException {
        client.indices().updateAliases(ub -> ub.actions(ab -> ab.add(rb -> rb.index(indexName).alias(alias))));
    }
}
This will effectively reindex all your types in parallel, in the background, since MassIndexer#start
does not block your thread. You're starting as many mass indexers in parallel as you have types, so yes, this will lead to heavily parallel reindexing, regardless of typesToIndexInParallel().
If you want to index one type at a time, use MassIndexer#startAndWait().
If you want to index N types at a time, in parallel, then split your code differently:
start a single mass indexer for all types (.massIndexer(Object.class)) with typesToIndexInParallel(N).