Search code examples
elasticsearchapache-kafka-connect

Elasticsearch doesn't except json array string provided by Kafka connector


I'm using the elasticsearch sink connector to feed elasticsearch from kafka topic messages. My topic includes json array and when I post the request I'm getting that error as below. Other connectors which don't content array values are running.

elasticsearch: 7.17.13

kafka: 2.6.0

elasticsearch library file : https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch

My connector config;

curl --location --request POST 'localhost:8084/connectors' --header 'Content-Type: application/json' --data-raw '
{
  "name": "TEST",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "TEST",
    "key.ignore": "true",
    "schema.ignore": "true",
    "connection.compression": "true",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connection.url": "http://localhost:9200",
    "connection.username" :"elastic",
    "connection.password":"password",
    "type.name": "_doc",
    "name": "TEST"}
  }
}'

Exception:

    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
    at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:443)
    at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)
    at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
    at org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)
    at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
    at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:216)
    ... 5 more
    Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes' after 6 attempt(s)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:490)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:210)
        ... 5 more
    Caused by: org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
        at org.elasticsearch.common.compress.CompressorFactory.compressor(CompressorFactory.java:42)
        at org.elasticsearch.common.xcontent.XContentHelper.createParser(XContentHelper.java:76)
        at org.elasticsearch.client.RequestConverters.bulk(RequestConverters.java:226)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2167)
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
        at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:212)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)

Solution

  • I changed the "value.converter" parameter in the config file as follows and it worked.

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",