The description of the ReplaceField SMT says it can filter or rename fields within a Struct or Map.
However, I can't find any working example of replacing or renaming fields within a struct.
I've got data in a topic being written into Elasticsearch using the Kafka Connect Elasticsearch sink connector. For simplicity, assume the data looks like this:
{
  "ID": 22,
  "ITEM": "Shampoo",
  "USER": {
    "NAME": "jon",
    "AGE": 25
  }
}
So if I'm trying to rename or replace USER.NAME or USER.AGE, how would I configure that in the connector? (I've written everything in ksqlDB.) This is my current config, where I rename ITEM to product and ID to id:
CREATE SINK CONNECTOR ELASTIC_SINK WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = 'http://host.docker.internal:9200',
  'type.name' = '_doc',
  'topics' = 'ELASTIC_TOPIC',
  'key.ignore' = 'false',
  'schema.ignore' = 'true',
  'transforms' = 'RenameField',
  'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
  'transforms.RenameField.renames' = 'ITEM:product,ID:id'
);
Take a look at the existing SO question and answer: https://stackoverflow.com/a/56601093/4778022
You can provide the full path to the field you want to rename, with the parts of the path separated by periods (for example, USER.NAME):
CREATE SINK CONNECTOR ELASTIC_SINK WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = 'http://host.docker.internal:9200',
  'type.name' = '_doc',
  'topics' = 'ELASTIC_TOPIC',
  'key.ignore' = 'false',
  'schema.ignore' = 'true',
  'transforms' = 'RenameField',
  'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
  'transforms.RenameField.renames' = 'USER.NAME:name,ITEM:product,ID:id'
);
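To check what you should expect to see in Elasticsearch, here is a small Python sketch of the intended effect of that renames string on the sample record from the question. It is only an illustration, not the SMT's implementation: parse_renames and rename_fields are hypothetical helpers, and the sketch assumes a renamed nested field stays inside its parent struct. Whether your Connect version actually resolves dotted paths this way is worth verifying against a test topic.

```python
from copy import deepcopy


def parse_renames(spec):
    """Parse a 'SRC:dst,SRC2:dst2' string into a dict (hypothetical helper)."""
    return dict(pair.split(":", 1) for pair in spec.split(","))


def _find_parent(record, parents):
    """Walk down the nested dicts named in parents; return None if the path is missing."""
    node = record
    for key in parents:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node if isinstance(node, dict) else None


def rename_fields(record, renames):
    """Return a copy of record with each dotted source path renamed.

    Assumption: the renamed field keeps its position inside its parent struct.
    """
    result = deepcopy(record)
    for path, new_name in renames.items():
        *parents, leaf = path.split(".")
        parent = _find_parent(result, parents)
        if parent is not None and leaf in parent:
            parent[new_name] = parent.pop(leaf)
    return result


record = {"ID": 22, "ITEM": "Shampoo", "USER": {"NAME": "jon", "AGE": 25}}
renames = parse_renames("USER.NAME:name,ITEM:product,ID:id")
renamed = rename_fields(record, renames)
print(renamed)
```

If the document indexed in Elasticsearch still shows USER.NAME unchanged while ITEM and ID are renamed, that tells you the dotted path was not applied and only the top-level renames took effect.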