Search code examples
apache-kafkaapache-kafka-connect

Extracting Key and Appending It to the Message Value in Kafka Connect


I want to extract key and append it to the value by Kafka Connect. I read SMT and test some SMT but I can not do it.

I send this record value :

{"name":"ali"}

by this key :

Person

and I want this JSON store to the openserach (or elastic search)

{"name":"ali","key":"Person"}

this is my connector and I know this is wrong but I don't know how can I fix it :

{
    "name": "aaa",
    "config": {
        "name": "aaa",
        "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
        "tasks.max": "1",
        "topics": "test_x",
        "transforms": "InsertKeyToValue",
        "transforms.InsertKeyToValue.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertKeyToValue.static.field": "MessageSource",
        "transforms.InsertKeyToValue.static.value": "key",
        // "transforms.InsertKeyToValue.field": "key",
        // "transforms.InsertKeyToValue.key.field": "key",
        "key.ignore": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.url": "https://localhost:9200",
        "connection.username": "test",
        "connection.password": "test",
        "batch.size": "10000",
        "linger.ms": "1000",
        "errors.tolerance": "all",
        "errors.log.include.messages": "true",
        "errors.log.enable": "true"
    }
}

Solution

  • @Akshay Bande thanks.

    I've created a simple transformation for this task.