Debezium SQL Server connector outputs encoded values for numeric/decimal fields

I have this table in SQL Server:

CREATE TABLE [dbo].[blast_info](
    [blast_id] [int] NOT NULL,
    [tnt_amount_kg] [decimal](18, 2) NOT NULL,
    [time_blasted] [datetime] NOT NULL,
    [hole_deep_ft] [numeric](9, 2) NULL,
    [hole_coord_n] [numeric](18, 6) NOT NULL,
    [hole_coord_e] [numeric](18, 6) NULL
)

Debezium is configured to run as a Confluent (Kafka Connect) plugin. Data is posted into Kafka, but when I read it, either via Python or the console consumer, I see encoded values for the numeric and decimal columns:

Processed a total of 38 messages

Why is this, and what would be the fix? Thanks.


  • Possible quick fix (not at all recommended):

    Change the column types to real in SQL Server instead of numeric or decimal, since Debezium stores real as a float.

    Long-term fix:

    As mentioned in the Debezium SQL Server connector documentation, Debezium stores decimal and numeric values in a binary form represented by the class org.apache.kafka.connect.data.Decimal; with the JSON converter those bytes appear as a base64-encoded string.

    You can retrieve this information from the message itself, but for that you need to enable schemas in messages. You can do this by setting key.converter.schemas.enable=true (for the message key) and value.converter.schemas.enable=true (for the message value).
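    As a sketch, the relevant converter settings in the Connect worker (or connector) configuration would look roughly like this, assuming the default JSON converter is in use:

    ```properties
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    ```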

    After changing the above properties, your messages will include schema information. Refer to this example:

    Table Schema:

    CREATE TABLE [dbo].[kafka_datatype](
        [id] [int] IDENTITY(1,1) PRIMARY KEY,
        [col_value] [varchar](10) NULL,
        [create_date] [datetime] NULL,
        [col_decimal] [decimal](33, 18) NULL,
        [col_double] [real] NULL,
        [comments] [varchar](5000) NULL
    )

    Kafka Message:

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "col_value"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_date"
          },
          {
            "type": "bytes",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Decimal",
            "version": 1,
            "parameters": {
              "scale": "18",
              "connect.decimal.precision": "33"
            },
            "field": "col_decimal"
          },
          {
            "type": "float",
            "optional": true,
            "field": "col_double"
          },
          {
            "type": "string",
            "optional": true,
            "field": "comments"
          }
        ],
        "optional": true,
        "name": "TEST.dbo.kafka_datatype.Value"
      },
      "payload": {
        "id": 15,
        "col_value": "test",
        "create_date": 1586335960297,
        "col_decimal": "AKg/JYrONaAA",
        "col_double": 12.12345,
        "comments": null
      }
    }

    Please read the Debezium SQL Server connector documentation to learn how Debezium handles data types.

    Now, coming to the consumer part: use sink connectors (the JDBC Sink Connector, for example) as per your need. If you want to use Python or the console consumer, you'll need to write your own deserializer.
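    As a minimal sketch of such a deserializer in Python: a Connect Decimal is the big-endian, two's-complement bytes of the unscaled integer, and the scale comes from the field's "parameters" in the message schema (e.g. "scale": "18" for col_decimal above). The helper name here is just illustrative:

    ```python
    import base64
    from decimal import Decimal

    def decode_connect_decimal(encoded: str, scale: int) -> Decimal:
        """Decode a Kafka Connect Decimal: base64-encoded bytes holding a
        big-endian, signed (two's-complement) unscaled integer."""
        unscaled = int.from_bytes(base64.b64decode(encoded),
                                  byteorder="big", signed=True)
        # Shift the decimal point left by `scale` digits.
        return Decimal(unscaled).scaleb(-scale)

    # Example with a scale of 2: b"\x04\xbc" is 1212, so the value is 12.12.
    value = decode_connect_decimal("BLw=", scale=2)  # Decimal("12.12")
    ```

    In practice you would read the scale for each decimal field from the schema section of the consumed message rather than hard-coding it.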


    One issue that may arise over time: topic size will grow, because the schema is stored in every message. To avoid persisting the schema in each message, you can use the Avro converter and Schema Registry provided by Confluent.
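    The converter settings for that setup would look roughly like this (the Schema Registry URL is a placeholder for your own deployment):

    ```properties
    key.converter=io.confluent.connect.avro.AvroConverter
    value.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter.schema.registry.url=http://localhost:8081
    ```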

    Hope this helps!