
Kafka source connector encoding issue | Oracle JDBC Connector


I am trying to connect to an Oracle server using the Kafka Connect JDBC source connector. Here is my connector config:

CREATE SOURCE CONNECTOR hsr_source_connector2
WITH (
  'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.user' = '{User}',
  'connection.password' = '{PASS}',
  'mode' = 'bulk',
  'query' = 'select * from <Tablename>',
  'topic.prefix' = 'newuser_',
  'characterEncoding'='UTF-8',
  'transforms' = 'createKey,extractInt',
  'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'USER_ID',
  'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractInt.field' = 'USER_ID'
);

The topic gets created and the connector reads the data, but the values come through with junk characters:

rowtime: 2024/04/18 17:08:51.028 Z, key: VN185082, value: ☺►VN185082►VN185082♠FSC☻☻W
rowtime: 2024/04/18 17:08:51.028 Z, key: MD250626, value: ☺►MD250626►MD250626♠SME☻☻R
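Before changing any encoding setting, it can help to check what is actually stored in the topic. The statements below are standard ksqlDB commands, using the connector and topic names from the config above. DESCRIBE CONNECTOR shows the connector's state and tasks, and PRINT reports the key and value formats it detects. This is a diagnostic sketch, not part of the original question.

```sql
-- Show the connector's state, its tasks, and any task errors
DESCRIBE CONNECTOR hsr_source_connector2;

-- Dump the first two records of the raw topic; PRINT also reports
-- the key and value formats it thinks the topic uses
PRINT 'newuser_' FROM BEGINNING LIMIT 2;
```

The control characters in front of each field (► before each 8-character ID, ♠ before "FSC", ☻ before "W") are consistent with a length-prefixed binary encoding such as Avro, which would point at the converter rather than the character set.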

Stream Definition:

CREATE STREAM hsr_users
 (
 user_id varchar,
 user_name varchar,
 pswd varchar,
 user_role varchar,
 access_level varchar
 ) WITH (kafka_topic='newuser_', value_format='JSON', partitions=1);
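To test the stream itself, a push query like the following could be used (standard ksqlDB, with column names from the stream definition above). Setting auto.offset.reset to earliest makes the query read records that are already in the topic, not only new ones.

```sql
-- Read the topic from the start instead of only new records
SET 'auto.offset.reset' = 'earliest';

-- With value_format='JSON', records that are not valid JSON cannot be
-- deserialized, so they will not show up as rows here
SELECT user_id, user_role, access_level
FROM hsr_users
EMIT CHANGES
LIMIT 2;
```

ksqlDB typically records such deserialization failures in its processing log and skips the record rather than failing the query.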

Can you suggest how to apply the encoding, and point me to the right documentation for this? I was referring to this document: https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

Thanks in advance!


Solution

  • Got the answer: this can be corrected by adding these properties:

    CREATE SOURCE CONNECTOR hsr_source_connector3
    WITH (
      'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
      'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
      'connection.user' = '{User}',
      'connection.password' = '{PASS}',
      'mode' = 'bulk',
      'query' = 'select * from <Tablename>',
      'topic.prefix' = 'newEncode_',
      'characterEncoding' = 'AL32UTF8', -- Changed from 'UTF-8'; not listed in the JDBC source connector options
      'transforms' = 'createKey,extractInt',
      'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
      'transforms.createKey.fields' = 'USER_ID',
      'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
      'transforms.extractInt.field' = 'USER_ID',
      'key.converter' = 'org.apache.kafka.connect.json.JsonConverter', -- Added
      'value.converter' = 'org.apache.kafka.connect.json.JsonConverter', -- Added
      'key.converter.schemas.enable' = 'false',
      'value.converter.schemas.enable' = 'false',
      'errors.tolerance' = 'all',
      'errors.log.enable' = 'true',
      'errors.log.include.messages' = 'true'
    );
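    Since the fix comes down to the converters, a minimal version of the original connector with only the converter properties added might look like this. The connector name hsr_source_connector_min and the topic prefix newJson_ are hypothetical; the other values are the placeholders from the question. This is an untested sketch, not a verified config.

```sql
-- Hypothetical connector name and topic prefix; other values are the question's placeholders
CREATE SOURCE CONNECTOR hsr_source_connector_min
WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
  'connection.user' = '{User}',
  'connection.password' = '{PASS}',
  'mode' = 'bulk',
  'query' = 'select * from <Tablename>',
  'topic.prefix' = 'newJson_',
  -- Copy USER_ID from the value into the key, then unwrap it to a plain string
  'transforms' = 'createKey,extractInt',
  'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'USER_ID',
  'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractInt.field' = 'USER_ID',
  -- Override the worker's default converters so records are written as JSON text
  'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  -- Plain JSON, without the schema/payload envelope
  'key.converter.schemas.enable' = 'false',
  'value.converter.schemas.enable' = 'false'
);
```

    errors.tolerance = 'all' is left out on purpose: with it, records that fail conversion are skipped instead of stopping the task, which can hide a misconfiguration while testing.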
    

    This will give data in this format:

    Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 2024/04/19 11:08:19.755 Z, key: "HG185035", value: {"USER_ID":"HG185035","USER_NAME":"HG185035","PSWD":null,"USER_ROLE":"SME","ACCESS_LEVEL":"R"}
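    The original stream points at newuser_, but the fixed connector writes to newEncode_ (with a custom query, topic.prefix is the full topic name). A stream over the new topic could be declared as below; the stream name hsr_users_json is hypothetical, and partitions is omitted because the connector has already created the topic.

```sql
-- Hypothetical stream name over the topic written by hsr_source_connector3
CREATE STREAM hsr_users_json (
  user_id VARCHAR,
  user_name VARCHAR,
  pswd VARCHAR,
  user_role VARCHAR,
  access_level VARCHAR
) WITH (kafka_topic='newEncode_', value_format='JSON');
```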
    

    Basically, I was missing the key.converter and value.converter properties. Without them the connector used the Connect worker's default converters, so the records were not written as JSON. I found the right properties in this Confluent blog post: https://www.confluent.io/en-gb/blog/kafka-connect-deep-dive-converters-serialization-explained/