Search code examples
javaelasticsearchelasticsearch-client

ElasticSearch Java API Client - Send already serialized data and avoid serialization


I have a Kafka Topic wit JSON data. Now im trying to send those JSON strings to an ES topic using the new "Java API Client" (https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/index.html), but im running into a parser exception:

co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/index] failed: [mapper_parsing_exception] failed to parse
at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:281)
at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147)
at co.elastic.clients.elasticsearch.ElasticsearchClient.index(ElasticsearchClient.java:953)

This exception occurs in the last line of the following code:

final IndexRequest<String> request =
          new IndexRequest.Builder<String>()
              .index("myIndex")
              .id(String.valueOf(UUID.randomUUID()))
              .document(consumerRecord.value()) //already serialized json data
              .build();
elasticsearchClient.index(request);

As far as I understand this exception occurs, because the ES client tries to serialize the data im providing, which is already serialized, resulting in a malformed JSON string.

Is there anyway to get around this and just send simple JSON strings? Also I believe this was possible with the earlier "Low Level Java Library", right? And yes, I know there are ways to allow communication between Kafka and ES without writing a Consumer.

Thanks for any hints.


Solution

  • If you use a JacksonJsonpMapper when creating your ElasticsearchTransport, you can use a custom PreserializedJson class to send already-serialized JSON.

    ElasticsearchTransport transport = new RestClientTransport(
        createLowLevelRestClient(), // supply your own!
        new JacksonJsonpMapper()
    );
    
    ElasticsearchClient client = new ElasticsearchClient(transport);
    
    IndexResponse response = client.index(indexReq -> indexReq
        .index("my-index")
        .id("docId")
        .document(new PreserializedJson("{\"foo\":\"bar\"}"))
    );
    System.out.println(response);
    

    Here is the source for PreserializedJson:

    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.SerializerProvider;
    import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    import com.fasterxml.jackson.databind.ser.std.StdSerializer;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    import static java.util.Objects.requireNonNull;
    
    @JsonSerialize(using = PreserializedJson.Serializer.class)
    public class PreserializedJson {
      private final String value;
    
      public PreserializedJson(String value) {
        this.value = requireNonNull(value);
      }
    
      public PreserializedJson(byte[] value) {
        this(new String(value, StandardCharsets.UTF_8));
      }
    
      public static class Serializer extends StdSerializer<PreserializedJson> {
        public Serializer() {
          super(PreserializedJson.class);
        }
    
        @Override
        public void serialize(PreserializedJson value, JsonGenerator gen, SerializerProvider provider) throws IOException {
          gen.writeRaw(value.value);
        }
      }
    }