Search code examples
elasticsearchapache-kafkaapache-kafka-connect

How can I change index name that created by kafka-connect with connect config


I want to change name of index that has been already created by kafka-connect as a whatever I want.

Here my kafka connector configuration ;

curl --location --request POST 'localhost:8084/connectors' --header 'Content-Type: application/json' --data-raw '
{
 "name": "PROXY_HTTP_TRACE",
   "config": {
     "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
     "tasks.max": "1",
     "key.ignore": "true",
     "schema.ignore": "true",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "key.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable": "false",
     "connection.url": "http://localhost:9200",
     "connection.username" :"elastic",
     "connection.password":"password",
     "type.name": "_doc",
     "name": "PROXY_HTTP_TRACE",
     "topics": "PROXY_HTTP_TRACE",
     "transforms":"dropPrefix,routeTS",
     "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
     "transforms.dropPrefix.regex":"(.*)",
     "transforms.dropPrefix.replacement":"$1",
     "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
     "transforms.routeTS.topic.format":"kafka-${topic}-${timestamp}",
     "transforms.routeTS.timestamp.format":"YYYYMM"
}
}'

Solution

  • If you can't change the connector configuration you can use elasticsearch ingest pipeline to manipulate the incoming data.

    #Create an ingest pipeline
    PUT _ingest/pipeline/update_index_name
    {
      "processors": [
        {
          "set": {
            "field": "_index",
            "value": "new_index_name"
          }
        }
      ]
    }
    
    #Put default pipeline to existing index
    #make sure that your index name is proxy_http_trace. 
    PUT proxy_http_trace/_settings
    {
      "settings": {
        "index.default_pipeline": "update_index_name"
      }
    }
    
    #Push the data
    POST proxy_http_trace/_doc
    {
      "field": "test"
    }
    

    enter image description here