apache-kafka, apache-kafka-connect, ksqldb

How to rename/replace a field within a struct with a Kafka Connect SMT?


The description of the ReplaceField SMT says it can "filter or rename fields within a Struct or Map". However, I can't find a working example of replacing or renaming fields within a struct.

I've got data in a topic that is being written to Elasticsearch using the Kafka Connect Elasticsearch Sink. For simplicity, assume the data looks like this:

{
  'ID': 22,
  'ITEM': 'Shampoo',
  'USER': {
    'NAME': 'jon',
    'AGE': 25
  }
}

If I want to rename or replace USER.NAME or USER.AGE, how do I configure that in the connector? (I've written everything in ksqlDB.) This is my current config, which renames ITEM to product and ID to id:

CREATE SINK CONNECTOR ELASTIC_SINK WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url' = 'http://host.docker.internal:9200',
    'type.name' = '_doc',
    'topics' = 'ELASTIC_TOPIC',
    'key.ignore' = 'false',
    'schema.ignore' = 'true',
    'transforms' = 'RenameField',
    'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.RenameField.renames' = 'ITEM:product,ID:id'
);
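
To make the effect of this config concrete, here is a minimal Python sketch of what a top-level renames mapping does to the sample record. It is an illustration only, not Kafka Connect's implementation; parse_renames and rename_top_level are hypothetical helper names, and the sketch assumes the renames value is a comma-separated list of old:new pairs, as in the config above.

```python
def parse_renames(spec: str) -> dict:
    """Parse a spec such as 'ITEM:product,ID:id' into {'ITEM': 'product', 'ID': 'id'}."""
    pairs = (item.split(":", 1) for item in spec.split(",") if item.strip())
    return {old.strip(): new.strip() for old, new in pairs}


def rename_top_level(record: dict, renames: dict) -> dict:
    """Rename top-level keys only; nested values are passed through unchanged."""
    return {renames.get(key, key): value for key, value in record.items()}


# Sample record from the question.
record = {"ID": 22, "ITEM": "Shampoo", "USER": {"NAME": "jon", "AGE": 25}}

renamed = rename_top_level(record, parse_renames("ITEM:product,ID:id"))
print(renamed)
# {'id': 22, 'product': 'Shampoo', 'USER': {'NAME': 'jon', 'AGE': 25}}
```

ID and ITEM are renamed, but the fields inside USER keep their original names, which is the gap this question is about.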

Solution

  • Take a look at this existing Stack Overflow answer: https://stackoverflow.com/a/56601093/4778022

    In the renames setting, you can give the path to the field you want to rename, with its parts separated by periods (USER.NAME in the config below).

    CREATE SINK CONNECTOR ELASTIC_SINK WITH (
        'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
        'connection.url' = 'http://host.docker.internal:9200',
        'type.name' = '_doc',
        'topics' = 'ELASTIC_TOPIC',
        'key.ignore' = 'false',
        'schema.ignore' = 'true',
        'transforms' = 'RenameField',
        'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
        'transforms.RenameField.renames' = 'USER.NAME:name,ITEM:product,ID:id'
    );