What I would like to achieve:
What I have tried:
Using this code:
public class BulkInsertOfFluxUsingWebClientBulkRestApi {
public static void main(String[] args) throws InterruptedException {
WebClient client = WebClient.create("").mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().wiretap(true))).build();
Flux<String> createCommandFlux = Flux.interval(Duration.ofMillis(100))
.map(i -> {
try {
Foo onePojo = new Foo(LocalDateTime.now().toString(), String.valueOf(i));
String jsonStringOfOnePojo = new ObjectMapper().writeValueAsString(onePojo);
String bulkCreateCommande = "{ \"create\" : {} }\n" + jsonStringOfOnePojo + "\n";
return bulkCreateCommande;
} catch (Exception e) {
return "";
Disposable disposable = createCommandFlux
.flatMap(windowFlux -> client
.body(windowFlux, Foo.class)
.doOnNext(response -> System.out.println(response))
.flatMap(clientResponse -> clientResponse.bodyToMono(String.class)))
Unfortunately, this code is currently returning http 400 bad request.
WebClient elasticWebClient() {
return WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
@Autowired ObjectMapper mapper;
Mono<Map> write() throws IOException {
Function<Object, String> mapToString = new Function<Object, String>() {
public String apply(Object o) {
try {
return mapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
final Map<String, Map<String, String>> headingJson = Map.of("index", Map.of("_index", "new_index"));
final Map<String, String> value1 = Map.of("name", "name1", "description", "description1");
final Map<String, String> value2 = Map.of("name", "name2", "description", "description2");
final Map<String, String> value3 = Map.of("name", "name3", "description", "description3");
final Map<String, String> value4 = Map.of("name", "name4", "description", "description4");
final Map<String, String> value5 = Map.of("name", "name5", "description", "description5");
final Map<String, String> value6 = Map.of("name", "name6", "description", "description6");
final String requestBody = Stream.of(value1, value2, value3, value4, value5, value6)
.flatMap(val -> Stream.of(headingJson, val))
.collect(Collectors.joining("\n", "", "\n"));
return client
.uri(u -> u.path("_bulk").build())
.exchangeToMono(clientResponse -> clientResponse.toEntity(Map.class))