Search code examples
Tags: apache-kafka, kafka-consumer-api, apache-kafka-connect, debezium

issue with kafka source connector


I would appreciate it if someone could help me figure out why my Kafka consumer displays the schema "fields" as shown below, instead of displaying the data types and column names as it should.

confirmuserreg    |   schema: {
confirmuserreg    |     type: 'struct',
confirmuserreg    |     fields: [ [Object], [Object], [Object], [Object], [Object], [Object] ],
confirmuserreg    |     optional: false,
confirmuserreg    |     name: 'smartdevdbserver1.signup_db.users.Envelope'
confirmuserreg    |   },
confirmuserreg    |   payload: {
confirmuserreg    |     before: null,
confirmuserreg    |     after: {
confirmuserreg    |       id: 44,
confirmuserreg    |       email: '[email protected]',
confirmuserreg    |       password: '$2a$10$lJ5ILqdiJMXoJhHBOLmFeOAF3gppc9ZNgPrzTRnzDU18kX4lxu19C',
confirmuserreg    |       User_status: 'INACTIVE',
confirmuserreg    |       auth_token: null
confirmuserreg    |     },
confirmuserreg    |     source: {
confirmuserreg    |       version: '1.9.5.Final',
confirmuserreg    |       connector: 'mysql',
confirmuserreg    |       name: 'smartdevdbserver1',
confirmuserreg    |       ts_ms: 1666790831000,
confirmuserreg    |       snapshot: 'false',
confirmuserreg    |       db: 'signup_db',
confirmuserreg    |       sequence: null,
confirmuserreg    |       table: 'users',
confirmuserreg    |       server_id: 1,
confirmuserreg    |       gtid: '4e390d46-53b4-11ed-b7c4-0242ac140003:33',
confirmuserreg    |       file: 'binlog.000008',
confirmuserreg    |       pos: 487,
confirmuserreg    |       row: 0,
confirmuserreg    |       thread: 41,
confirmuserreg    |       query: null
confirmuserreg    |     },
confirmuserreg    |     op: 'c',
confirmuserreg    |     ts_ms: 1666790832054,
confirmuserreg    |     transaction: null
confirmuserreg    |   }

It should be something like this instead:

{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}

This is my connector config:

{
  "name": "smartdevsignupconnector112",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",  
    "database.hostname": "mysql1",  
    "database.port": "3306",
    "database.user": "clusterAdmin",
    "database.password": "xxxxxxxxxx",
    "database.server.id": "184055",  
    "database.server.name": "smartdevdbserver1",  
    "database.include.list": "signup_db",
    "database.history.kafka.bootstrap.servers": "kafka1:9092",  
    "database.history.kafka.topic": "dbhistory.smartdevdbserver1",
    "include.schema.changes": "true",
    "table.whitelist": "signup_db.users",
    "column.blacklist": "signup_db.users.fullName, signup_db.users.address, signup_db.users.phoneNo, signup_db.users.gender, signup_db.users.userRole, signup_db.users.reason_for_inactive, signup_db.users.firstvisit, signup_db.users.last_changed_PW, signup_db.users.regDate",
    "snapshot.mode": "when_needed"
  }
}

I expect records with 5 columns (the primary key id, plus email, password, User_status and auth_token) to be displayed. Below is the table schema:

DROP TABLE IF EXISTS `users`;
CREATE TABLE IF NOT EXISTS `users` (
  `id` int NOT NULL AUTO_INCREMENT,
  `email` varchar(255) NOT NULL,
  `password` varchar(255) NOT NULL,
  `fullName` varchar(66),
  `address` varchar(77),
  `phoneNo` varchar(16),
  `gender` varchar(6),
  `userRole` enum('visitor','student','Admin') NOT NULL DEFAULT 'visitor',
  `User_status` enum('ACTIVE','INACTIVE') NOT NULL DEFAULT 'INACTIVE',
  `reason_for_inactive` enum('visitor','TERMINATED','SUSPENDED_FOR_VIOLATION') NOT NULL DEFAULT 'visitor',
  `firstvisit` varchar(3) DEFAULT NULL,
  `last_changed_PW` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `regDate` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `auth_token` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY (`email`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Solution

  • Quoting the question: "It should be something like this instead"

    Debezium has its own event format, so no, it shouldn't look like that.


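    It seems your confirmuserreg service is a JavaScript application, and you are simply seeing [Object], which is the default console.log() output for deeply nested JS objects. The schema fields are still present in the message; to print them in full, use for example console.log(JSON.stringify(message, null, 2)) or util.inspect(message, { depth: null }).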

    If you don't care about the Debezium metadata, then flatten the event (for example with Debezium's ExtractNewRecordState single message transform). That way, KSQL will read the payload.after field and be able to extract the fields within.