apache-kafka, apache-kafka-connect, confluent-platform, ksqldb

How to rename/transform nested fields in json object using ElasticsearchSinkConnector


I'm using ElasticsearchSinkConnector to store data from Kafka topics to Elasticsearch indices. This is an example of a Kafka message:

{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
 "Timestamp" : "2020-11-02T12:05:57.87639003Z",
 "Type" : "CREATION",
 "PlaceType" : "home",
 "Location" : {
        "Lat" : 43.7575119,
        "Lon" : 11.2921363
      },
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}

I would like to represent the Location object as a geo_point in Elasticsearch, but Lat/Lon must be lowercase (lat/lon) for Elasticsearch to treat the object as a geo_point. I am using the ReplaceField$Value transform to rename Location to location, but I cannot rename the nested fields Lat/Lon. This is my snippet for renaming Location, Lat and Lon:

transforms: 'RenameField'
transforms.RenameField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames: 'Location:location,location.Lat:lat,location.Lon:lon'

Renaming Location works, but Lat/Lon don't get renamed. In brief, I'd like to have the following result in ES:

{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
 "Timestamp" : "2020-11-02T12:05:57.87639003Z",
 "Type" : "CREATION",
 "PlaceType" : "home",
 "location" : {
        "lat" : 43.7575119,
        "lon" : 11.2921363
      },
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}
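For clarity, the transformation being asked for amounts to a recursive key rename. This is not something ReplaceField can do for nested fields; the following is just a minimal Python sketch illustrating the target shape (not a Connect SMT):

```python
# Hypothetical sketch: apply the desired renames to the sample event.
# ReplaceField$Value only renames top-level fields; this shows the
# recursive rename we actually want.

def rename_keys(obj, renames):
    """Recursively rename dict keys according to the renames mapping."""
    if isinstance(obj, dict):
        return {renames.get(k, k): rename_keys(v, renames) for k, v in obj.items()}
    return obj

event = {
    "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
    "PlaceType": "home",
    "Location": {"Lat": 43.7575119, "Lon": 11.2921363},
}

renamed = rename_keys(event, {"Location": "location", "Lat": "lat", "Lon": "lon"})
# renamed["location"] is {"lat": 43.7575119, "lon": 11.2921363}
```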

UPDATE

Awesome, thank you very much. There was a problem creating my target stream in ksql-cli.

ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS "location"
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS 'location'
>    FROM PLACES_EVENT;
line 3:64: mismatched input ''location'' expecting {'NO', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException

I also tried setting the STRUCT alias without quotes, but ksql throws the same error as the first attempt.

ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS GeoPointLocation
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'

Can you help me?


Solution

  • I hit this exact same problem - and I'm not aware of an existing Single Message Transform that can help. You have a couple of options:

    1. Write your own Single Message Transform to do this

    2. Use ksqlDB to wrangle the schema, which is the route I chose

       CREATE STREAM OUTPUT_STREAM AS
           SELECT *,
           STRUCT("lat" := LATITUDE, "lon":= LONGITUDE) AS "location"
         FROM SOURCE_STREAM
         EMIT CHANGES;
      

    You will also want to create a mapping template to prepare the Elasticsearch index, if you haven't already.


    To expand on the ksqlDB example:

    1. Populate source topic with the sample data:

      kafkacat -b localhost:9092 -P -t input_topic <<EOF
      { "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e", "Timestamp": "2020-11-02T12:05:57.87639003Z", "Type": "CREATION", "PlaceType": "home", "Location": { "Lat": 43.7575119, "Lon": 11.2921363 }, "Created": "2020-11-02T12:05:57.876390266Z", "LastUpdated": "2020-11-02T12:05:57.876390398Z" }
      EOF
      
    2. Taking input_topic as the source topic, declare the ksqlDB STREAM object (which is basically a Kafka topic with a schema overlaid):

      CREATE STREAM SOURCE_STREAM (ID VARCHAR,
                                  Timestamp VARCHAR,
                                  Type VARCHAR,
                                  PlaceType VARCHAR,
                                  Location STRUCT<Lat DOUBLE, Lon DOUBLE>,
                                  Created VARCHAR,
                                  LastUpdated VARCHAR)
              WITH (KAFKA_TOPIC='input_topic', 
                  VALUE_FORMAT='JSON');
      
    3. Confirm that the stream's schema is valid by selecting fields from the first message:

      ksql> SET 'auto.offset.reset' = 'earliest';
      >
      Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
      
      ksql> SELECT ID, PLACETYPE, LOCATION->LAT, LOCATION->LON FROM SOURCE_STREAM EMIT CHANGES LIMIT 1;
      +---------------------------------------+----------+-----------+-----------+
      |ID                                     |PLACETYPE |LAT        |LON        |
      +---------------------------------------+----------+-----------+-----------+
      |7d6203f4-3ae7-4daa-af03-71f98d619f7e   |home      |43.7575119 |11.2921363 |
      Limit Reached
      Query terminated
      
    4. Create a target stream, mapping the lat/lon fields to lower-case names. Here I'm also showing the alternative approach of concatenating them into a single "lat,lon" string, which Elasticsearch will also accept:

      CREATE STREAM TARGET_STREAM WITH (KAFKA_TOPIC='target_topic') AS
          SELECT *, 
              STRUCT("lat" := LOCATION->LAT, "lon":= LOCATION->LON) AS "location_example_01",
              CAST(LOCATION->LAT AS VARCHAR)  + ',' + CAST(LOCATION->LON AS VARCHAR) AS "location_example_02"
          FROM SOURCE_STREAM;
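The two columns above correspond to two of the geo_point representations Elasticsearch accepts: an object with lowercase lat/lon keys, and a "lat,lon" string. A quick Python sketch of what each column ends up containing (sample coordinates taken from the event above):

```python
# Two geo_point representations Elasticsearch accepts, matching the
# two ksqlDB columns: an object form and a comma-separated string form.
lat, lon = 43.7575119, 11.2921363

location_example_01 = {"lat": lat, "lon": lon}  # object form
location_example_02 = f"{lat},{lon}"            # string form "lat,lon"
```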
      
    5. Create an index template for Elasticsearch if the index does not already have the geo_point mapping declared. Here it'll match any index whose name begins with target.

      curl --silent --show-error -XPUT -H 'Content-Type: application/json' \
          http://localhost:9200/_index_template/rmoff_template01/ \
          -d'{
              "index_patterns": [ "target*" ],
              "template": {
                  "mappings": {
                      "properties": {
                          "location_example_01": {
                              "type": "geo_point"
                          },
                          "location_example_02": {
                              "type": "geo_point"
                          }
                      }
                  }
              } }'
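The "index_patterns" entry uses simple wildcard matching, so "target*" covers the target_topic index the connector will create. As a sanity check, Python's fnmatch behaves the same way for this kind of pattern:

```python
# Sketch: which index names would the "target*" template pattern cover?
# fnmatch's * wildcard matches the same way for this simple case.
from fnmatch import fnmatch

candidates = ["target_topic", "other_topic"]
matches = [name for name in candidates if fnmatch(name, "target*")]
# matches contains only "target_topic"
```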
      
    6. Stream the data from Kafka to Elasticsearch using Kafka Connect. You can configure this using the native Kafka Connect REST API, or do it directly from ksqlDB itself:

      CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
      'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
      'topics'                              = 'target_topic',
      'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
      'value.converter'                     = 'org.apache.kafka.connect.json.JsonConverter',
      'value.converter.schemas.enable'      = 'false',
      'connection.url'                      = 'http://elasticsearch:9200',
      'type.name'                           = '_doc',
      'key.ignore'                          = 'true',
      'schema.ignore'                       = 'true');
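If you'd rather use the Kafka Connect REST API directly, the same settings are posted as JSON to the /connectors endpoint in the shape {"name": ..., "config": {...}}. A sketch of building that payload (the Connect host/port, typically localhost:8083, is an assumption about your environment):

```python
import json

# Same configuration as the ksqlDB CREATE SINK CONNECTOR above,
# expressed as a Kafka Connect REST API payload.
payload = {
    "name": "SINK_ELASTIC_01",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "target_topic",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "key.ignore": "true",
        "schema.ignore": "true",
    },
}

# POST this body to http://localhost:8083/connectors (assumed Connect URL)
body = json.dumps(payload)
```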
      
    7. Inspect the mappings in the new Elasticsearch index

      curl -XGET --silent --show-error "http://localhost:9200/target_topic/_mappings" | jq '.'
      {
        "target_topic": {
          "mappings": {
            "properties": {
              "CREATED": {
                "type": "date"
              },
              "ID": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              },
              "LASTUPDATED": {
                "type": "date"
              },
              "LOCATION": {
                "properties": {
                  "LAT": {
                    "type": "float"
                  },
                  "LON": {
                    "type": "float"
                  }
                }
              },
              "PLACETYPE": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              },
              "TIMESTAMP": {
                "type": "date"
              },
              "TYPE": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              },
              "location_example_01": {
                "type": "geo_point"
              },
              "location_example_02": {
                "type": "geo_point"
              }
            }
          }
        }
      }
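Rather than eyeballing the _mappings response, you can also confirm the geo_point fields programmatically. A sketch parsing a trimmed copy of the response above (in practice the JSON would come from the curl call or an HTTP client):

```python
import json

# Trimmed version of the _mappings response shown above.
mappings_response = """
{
  "target_topic": {
    "mappings": {
      "properties": {
        "LOCATION": {"properties": {"LAT": {"type": "float"}, "LON": {"type": "float"}}},
        "location_example_01": {"type": "geo_point"},
        "location_example_02": {"type": "geo_point"}
      }
    }
  }
}
"""

props = json.loads(mappings_response)["target_topic"]["mappings"]["properties"]
geo_fields = sorted(k for k, v in props.items() if v.get("type") == "geo_point")
# geo_fields holds the two geo_point columns created by the target stream
```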
      
    8. View the data
