What I would like to achieve:
/_bulk
api.https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
What I have tried:
Using this code:
public class BulkInsertOfFluxUsingWebClientBulkRestApi {
public static void main(String[] args) throws InterruptedException {
WebClient client = WebClient.create("http://127.0.0.1:9200/").mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().wiretap(true))).build();
Flux<String> createCommandFlux = Flux.interval(Duration.ofMillis(100))
.map(i -> {
try {
Foo onePojo = new Foo(LocalDateTime.now().toString(), String.valueOf(i));
String jsonStringOfOnePojo = new ObjectMapper().writeValueAsString(onePojo);
String bulkCreateCommande = "{ \"create\" : {} }\n" + jsonStringOfOnePojo + "\n";
return bulkCreateCommande;
} catch (Exception e) {
e.printStackTrace();
return "";
}
});
Disposable disposable = createCommandFlux
.window(100)
.flatMap(windowFlux -> client
.post()
.uri("my_index/_bulk")
.contentType(MediaType.APPLICATION_NDJSON)
.body(windowFlux, Foo.class)
.exchange()
.doOnNext(response -> System.out.println(response))
.flatMap(clientResponse -> clientResponse.bodyToMono(String.class)))
.subscribe();
Thread.sleep(1000000);
disposable.dispose();
}
Note:
/_bulk
api.Issue:
Unfortunately, this code is currently returning http 400 bad request.
@Bean
WebClient elasticWebClient() {
return WebClient.builder()
.baseUrl("http://localhost:9200")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
@Autowired ObjectMapper mapper;
@GetMapping("write")
Mono<Map> write() throws IOException {
Function<Object, String> mapToString = new Function<Object, String>() {
@Override
public String apply(Object o) {
try {
return mapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
};
final Map<String, Map<String, String>> headingJson = Map.of("index", Map.of("_index", "new_index"));
final Map<String, String> value1 = Map.of("name", "name1", "description", "description1");
final Map<String, String> value2 = Map.of("name", "name2", "description", "description2");
final Map<String, String> value3 = Map.of("name", "name3", "description", "description3");
final Map<String, String> value4 = Map.of("name", "name4", "description", "description4");
final Map<String, String> value5 = Map.of("name", "name5", "description", "description5");
final Map<String, String> value6 = Map.of("name", "name6", "description", "description6");
final String requestBody = Stream.of(value1, value2, value3, value4, value5, value6)
.flatMap(val -> Stream.of(headingJson, val))
.map(mapToString)
.collect(Collectors.joining("\n", "", "\n"));
return client
.post()
.uri(u -> u.path("_bulk").build())
.bodyValue(requestBody)
.exchangeToMono(clientResponse -> clientResponse.toEntity(Map.class))
.map(HttpEntity::getBody);