Using ksqlDB, I have created a JDBC source connector with a custom query and then created a table over the resulting Kafka topic. However, selecting from the table returns data only for the PRIMARY KEY column; every other column comes back null. The Postgres database I am connecting to has its sales table constantly updated with new rows, which I am trying to stream with ksqlDB.
ksql> CREATE SOURCE CONNECTOR con WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = '....',
'topic.prefix' = 'sales',
...
'key' = 'id',
'query' = 'SELECT id, time, price FROM sales');
Message
Created connector CON
ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased
ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');
Message
Table created
ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
+-----+-----+-----+
|ID |TIME |PRICE|
+-----+-----+-----+
|123 |null |null |
Limit Reached
Query terminated
As you can see, the records in the Kafka topic carry proper values in the time and price fields. However, when a table is created over that topic, selecting from it yields null for time and price; only the id (the PRIMARY KEY column) comes through correctly.
Any idea why this is happening?
You're using the org.apache.kafka.connect.json.JsonConverter converter in your connector with schemas.enable=true, so the value written to the topic is not the flat (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) record your CREATE TABLE declares: it is wrapped in a Connect envelope whose only top-level fields are schema and payload. ksqlDB therefore finds no top-level time or price fields to populate, and you get NULL values.
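You can see this in the print output above: the only names ksqlDB can match against are schema and payload. Purely as an illustration (my own sketch, not something you would normally do), you could read the topic as-is by modelling the envelope explicitly and reaching into the payload struct:

CREATE TABLE sales_envelope (
  id VARCHAR PRIMARY KEY,
  payload STRUCT<id VARCHAR, time BIGINT, price DOUBLE>
) WITH (kafka_topic='sales', value_format='JSON');

-- pull the real columns out of the struct with the -> operator
SELECT id, payload->time, payload->price FROM sales_envelope EMIT CHANGES;

That should work, but it bakes the Connect envelope into every downstream query, which is why changing the converter is the better fix.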
A better option is to use io.confluent.connect.avro.AvroConverter (or Protobuf, or JSON Schema) in your source connector, because then you don't even have to type the schema into your CREATE TABLE; you just have
CREATE TABLE sales_table WITH (kafka_topic='sales', value_format='AVRO');
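Depending on your ksqlDB version you may still need to declare the key column explicitly, even though the value columns are inferred from the schema the connector registers in Schema Registry. A hedged variant, assuming that requirement applies:

CREATE TABLE sales_table (id VARCHAR PRIMARY KEY)
  WITH (kafka_topic='sales', value_format='AVRO');

ksqlDB then fills in the remaining value columns (time and price) from the registered Avro schema.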
You specify the alternative converter thus:
CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);
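Put together with the connector from the question, that would look roughly like this (a sketch; the connection details and the other settings elided in the original stay as they were):

CREATE SOURCE CONNECTOR con WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url' = '....',
  'topic.prefix' = 'sales',
  'query' = 'SELECT id, time, price FROM sales',
  'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter' = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);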
But if you must use JSON, disable schemas in your source connector:
CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'value.converter.schemas.enable'= 'false'
);
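Note that a connector-level schemas.enable override generally only takes effect when the converter class itself is also set on the connector, so a fuller sketch (again with the remaining settings elided) would be:

CREATE SOURCE CONNECTOR SOURCE_01 WITH (
  …
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false'
);

The values on the topic are then the plain payload, e.g. {"id":"123","time":1,"price":10.0}, and your original CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) statement returns time and price as expected for rows produced after the change.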
Ref: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained