Search code examples
apache-kafkaapache-kafka-connectksqldb

ksql - CREATE TABLE results in table with null values even though kafka topic is populated


Using ksqlDB, I have created a JDBC connector with a custom query. Then, from the resulting kafka topic, I have created a table. However, selecting from the table returns data only for the PRIMARY KEY, while returning null for all other values. The postgres database that I am connecting to has its sales table constantly updated with new data, which I am trying to stream using ksql.

ksql> CREATE SOURCE CONNECTOR con WITH (
  'connector.class'      ='io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'       = '....',
  'topic.prefix'         = 'sales',
  ...
  'key'                  = 'id',
  'query'                = 'SELECT id, time, price FROM sales');

Message

Created connector CON

ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":alse,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased

ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');

Message

Table created

ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
+-----+-----+-----+
|ID   |TIME |PRICE|
+-----+-----+-----+
|123  |null |null |
Limit Reached
Query terminated

As you can see, the kafka topic has entries with proper values in the time and price fields. However, when a table is created over that topic, selecting from the table yields null time and price fields. Only the id (which is the PRIMARY KEY column) is printed correctly.

Any idea why this is happening?


Solution

  • You're using the org.apache.kafka.connect.json.JsonConverter converter in your connector with schemas.enable=true, so your schema is not (id VARCHAR PRIMARY KEY, time INT, price DOUBLE), and thus you get NULL values.

    Better is to use io.confluent.connect.avro.AvroConverter (or Protobuf, or JSON Schema) in your source connector, because then you don't even have to type in the schema for your CREATE STREAM, you just have

    CREATE TABLE sales_table  WITH (kafka_topic='sales', value_format='AVRO');
    

    You specify the alternative converter thus:

    CREATE SOURCE CONNECTOR SOURCE_01 WITH (
    …
        'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
        'value.converter'= 'io.confluent.connect.avro.AvroConverter',
        'value.converter.schema.registry.url'= 'http://schema-registry:8081'
        );
    

    But if you must use JSON, in your source connector disable schemas:

    CREATE SOURCE CONNECTOR SOURCE_01 WITH (
    …
        'value.converter.schemas.enable'= 'false'
        );
    

    Ref: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained