I have a KTable ("person_table") that is correctly updated as new JSON data is produced.
When I execute a query like
select * from person_table;
The updated data is shown correctly.
For example, if my topic ("person") initially has the following data lines
{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}
and subsequently an update to one of the records occurs, that is, a new update event is sent to the topic, the topic appears correctly as:
{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}
{"id": 2, "name": "mary anne"}
The table correctly shows:
select * from person_table;
-----------------
id name
-----------------
1 john
2 mary anne
The problem is with the use of the consumer "kafka-console-consumer". This consumer always shows the total content of the topic, and I would like it to show the same information as the KTable, that is, for the consumer to show only the most up-to-date data in the console.
Could anyone point out where I'm going wrong?
The command to run the consumer is as follows:
docker-compose exec kafka kafka-console-consumer --topic person_with_id --from-beginning --bootstrap-server kafka:9092
The command to create the KTable is as follows:
create table person_table with(kafka_topic='person_with_id')
as
select id, latest_by_offset(name) as name
from person_key_stream
group by id
emit changes;
The commands to create the streams are as follows:
create stream person_stream(id bigint, name varchar)
with(kafka_topic='person', value_format='JSON');
create stream person_key_stream
as select * from person_stream partition by id;
It's not possible to query a KTable with Kafka's built-in tooling. Any consumer, including kafka-console-consumer, reads every record in the topic, so it shows each update rather than only the latest value per key.
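To make the difference concrete, here is a minimal sketch of what a client would have to do itself to get the table view from the raw topic: keep only the last record seen for each id. It uses the three sample records from the question as plain strings rather than a real Kafka consumer, and the function name latest_by_key is made up for illustration.

```python
import json


def latest_by_key(lines, key_field="id"):
    """Fold JSON records into a dict holding the last record seen per key."""
    table = {}
    for line in lines:
        record = json.loads(line)
        # A later record with the same key overwrites the earlier one.
        table[record[key_field]] = record
    return table


# The topic contents from the question, in offset order.
topic = [
    '{"id": 1, "name": "john"}',
    '{"id": 2, "name": "mary"}',
    '{"id": 2, "name": "mary anne"}',
]

for record in latest_by_key(topic).values():
    print(record["id"], record["name"])
# prints:
# 1 john
# 2 mary anne
```

This is roughly the aggregation that latest_by_offset with group by id performs inside ksqlDB; the console consumer does no such folding.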
Instead, you'll need to query the table through the ksqlDB server's HTTP endpoint, using the ksqlDB REST API.
The underlying mechanism is explained in more detail in the "Interactive Queries" section of the Kafka Streams documentation.
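As a rough sketch of what a REST call could look like, the snippet below posts a pull query to the server's /query endpoint using only the Python standard library. The address http://localhost:8088 is an assumption (8088 is the usual ksqlDB listener port, but your docker-compose file may expose something else), and the helper names are made up for illustration. It also assumes the response to a pull query arrives as a single JSON document. Depending on the ksqlDB version, a pull query without a WHERE clause on the key may be rejected.

```python
import json
import urllib.request

# Assumption: the ksqlDB server is reachable here; adjust to your setup.
KSQLDB_URL = "http://localhost:8088"


def build_pull_query(sql, base_url=KSQLDB_URL):
    """Build (but do not send) a POST request for the ksqlDB /query endpoint."""
    body = json.dumps({"ksql": sql, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=base_url + "/query",
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )


def run_pull_query(sql, base_url=KSQLDB_URL):
    """Send the query and decode the response body as JSON."""
    with urllib.request.urlopen(build_pull_query(sql, base_url)) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, run_pull_query("select * from person_table;") should return the current rows of the table rather than the full history of the topic.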