Search code examples
javaelasticsearchfluxspring-webclient

Spring WebClient to send Flux to ElasticSearch using /_bulk api


What I would like to achieve:

  • Use Spring WebClient to send a Flux to ElasticSearch leveraging the /_bulk api.

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

What I have tried:

Using this code:

public class BulkInsertOfFluxUsingWebClientBulkRestApi {

    public static void main(String[] args) throws InterruptedException {
        WebClient client = WebClient.create("http://127.0.0.1:9200/").mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().wiretap(true))).build();
        Flux<String> createCommandFlux = Flux.interval(Duration.ofMillis(100))
                .map(i -> {
                    try {
                        Foo onePojo = new Foo(LocalDateTime.now().toString(), String.valueOf(i));
                        String jsonStringOfOnePojo = new ObjectMapper().writeValueAsString(onePojo);
                        String bulkCreateCommande = "{ \"create\" : {} }\n" + jsonStringOfOnePojo + "\n";
                        return bulkCreateCommande;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return "";
                    }
                });
        
        Disposable disposable = createCommandFlux
                .window(100) 
                .flatMap(windowFlux -> client
                        .post()
                        .uri("my_index/_bulk")
                        .contentType(MediaType.APPLICATION_NDJSON)
                        .body(windowFlux, Foo.class)
                        .exchange()
                        .doOnNext(response -> System.out.println(response))
                        .flatMap(clientResponse -> clientResponse.bodyToMono(String.class)))
                .subscribe();
        Thread.sleep(1000000);
        disposable.dispose();
    }

Note:

  • This is just using reactive Spring WebClient, not another http client, not the ElsaticSearch java client etc.
  • It is trying to save a Flux (which can be infinite) inside ElasticSearch
  • I would like to avoid making one http request per object, therefore, "grouping" them and sending them as bulk leveraging the /_bulk api.

Issue:

Unfortunately, this code is currently returning http 400 bad request.


Solution

  • 
    
        @Bean
        WebClient elasticWebClient() {
            return WebClient.builder()
                    .baseUrl("http://localhost:9200")
                    .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                    .build();
        }
    
    
      @Autowired ObjectMapper mapper;
        @GetMapping("write")
        Mono<Map> write() throws IOException {
    
            Function<Object, String> mapToString = new Function<Object, String>() {
                @Override
                public String apply(Object o) {
                    try {
                        return mapper.writeValueAsString(o);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
    
            final Map<String, Map<String, String>> headingJson = Map.of("index", Map.of("_index", "new_index"));
            final Map<String, String> value1 = Map.of("name", "name1", "description", "description1");
            final Map<String, String> value2 = Map.of("name", "name2", "description", "description2");
            final Map<String, String> value3 = Map.of("name", "name3", "description", "description3");
            final Map<String, String> value4 = Map.of("name", "name4", "description", "description4");
            final Map<String, String> value5 = Map.of("name", "name5", "description", "description5");
            final Map<String, String> value6 = Map.of("name", "name6", "description", "description6");
    
            final String requestBody = Stream.of(value1, value2, value3, value4, value5, value6)
                    .flatMap(val -> Stream.of(headingJson, val))
                    .map(mapToString)
                    .collect(Collectors.joining("\n", "", "\n"));
    
    
            return client
                    .post()
                    .uri(u -> u.path("_bulk").build())
                    .bodyValue(requestBody)
                    .exchangeToMono(clientResponse -> clientResponse.toEntity(Map.class))
                    .map(HttpEntity::getBody);