apache-kafka, ksqldb

KSQL and how to use kafka-console-consumer to get data from a KTable


I have a KTable ("person_table") that is correctly updated as new JSON data is produced.

When I execute a query like

select * from person_table;

The updated data is shown correctly.

For example, if my topic ("person") initially has the following data lines

{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}

and an update to one of the records is later sent to the topic as a new event, the topic correctly contains:

{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}
{"id": 2, "name": "mary anne"}

The table correctly shows:

select * from person_table;

-----------------
id name
-----------------
1 john
2 mary anne

The problem is with "kafka-console-consumer". It always shows the entire content of the topic, whereas I would like it to show the same information as the KTable, that is, only the most up-to-date value for each id.

Could anyone point out where I'm going wrong?

The command to run the consumer is as follows:

docker-compose exec kafka kafka-console-consumer --topic person_with_id --from-beginning --bootstrap-server kafka:9092

The command to create the KTable is as follows:

create table person_table with(kafka_topic='person_with_id') 
as 
select id, latest_by_offset(name) as name
from person_key_stream 
group by id 
emit changes;

The commands to create the streams are as follows:

create stream person_stream(id bigint, name varchar)
  with(kafka_topic='person', value_format='JSON');

create stream person_key_stream
  as select * from person_stream partition by id;

Solution

  • It's not possible to query a KTable with Kafka's built-in tooling. Any plain consumer, kafka-console-consumer included, reads the records stored in the topic rather than the table's current state, so with --from-beginning it always scans the entire topic.

    Instead, you'll need to query the table through the ksqlDB server's HTTP REST API.

    This is explained in more detail in the "Interactive Queries" section of the Kafka docs.
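As a rough illustration of the REST approach, here is a minimal Python sketch that sends the same select statement to the ksqlDB server's /query endpoint. It assumes the server is reachable at http://localhost:8088 (8088 is the usual default listener port, but your docker-compose setup may expose it elsewhere). The names KSQLDB_URL, build_pull_query_request and run_pull_query are hypothetical helpers invented for this example, not part of any ksqlDB client library.

```python
import json
import urllib.request

# Assumption: hypothetical address of the ksqlDB server; adjust it to match
# the host and port exposed by your docker-compose setup.
KSQLDB_URL = "http://localhost:8088"


def build_pull_query_request(sql, server=KSQLDB_URL):
    """Build (but do not send) a POST request for the ksqlDB /query endpoint."""
    # The endpoint expects a JSON body with the statement under the "ksql" key.
    body = json.dumps({"ksql": sql, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=server.rstrip("/") + "/query",
        data=body,
        headers={
            "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
            "Accept": "application/vnd.ksql.v1+json",
        },
        method="POST",
    )


def run_pull_query(sql, server=KSQLDB_URL):
    """Send the query to the server and return the raw response text."""
    request = build_pull_query_request(sql, server)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.read().decode("utf-8")
```

With a running server, calling run_pull_query("select * from person_table;") should return the table's current rows rather than every record in the topic. Building the request separately from sending it keeps the payload easy to inspect without a live server.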