In my local Confluent Platform, I have one topic called "FOO_02". I have manually inserted some records into it, so I can print it from the beginning with the following command:
print 'FOO_02' from beginning;
Can I do something like pulling only the records where COL1 = 1? Something like executing a SELECT statement with a WHERE condition to pull data, the way we would from a normal database like Db2.
I have tried the following command, but I believe it only gets new data, because I am getting no records back:
ksql> select * from FOO_02 WHERE COL1=1 EMIT CHANGES;
I'm presuming that you've already run
CREATE STREAM FOO_02 WITH (KAFKA_TOPIC='FOO_02', FORMAT='AVRO');
because otherwise your SELECT would have failed.
So since the PRINT shows successfully that there is data in the topic, you can query the stream using the predicate you want. The only thing you need to do is tell ksqlDB to process all the data in the topic, not just new records (which is what FROM BEGINNING does in the PRINT statement). To do this, run:
SET 'auto.offset.reset' = 'earliest';
and then re-run the SELECT.
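Putting the two steps together, a full session might look like this (a sketch, assuming COL1 is a column in the FOO_02 stream):

```sql
-- Tell ksqlDB to read the topic from the start rather than only new records.
-- This setting applies to subsequent queries in the same CLI session.
SET 'auto.offset.reset' = 'earliest';

-- The push query now emits the historical rows matching the predicate,
-- then continues to emit any new matching rows as they arrive.
SELECT * FROM FOO_02 WHERE COL1 = 1 EMIT CHANGES;
```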
Edit
Can we select only the latest records? For example, I have multiple records being pushed to the topic with COL1=0, but I only want to grab the latest one, because it's the newest data and the only correct one. Something like WHERE ROWTIME = MAX?
What you're describing is a TABLE: the latest value for a given key.
CREATE TABLE FOO AS
SELECT COL1,
LATEST_BY_OFFSET(COL2) AS COL2
FROM FOO_02
WHERE COL1=0
GROUP BY COL1;
The resulting table will have a single entry for COL1, updated with the latest value of COL2 as new messages are received.
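Once the table exists, you can also fetch the current value on demand with a pull query rather than a streaming one (a sketch; this assumes a ksqlDB version with pull-query support, and filters on the table's key column COL1):

```sql
-- Pull query: returns the current materialized row for the key and terminates
-- (note: no EMIT CHANGES clause, unlike a push query).
SELECT COL1, COL2 FROM FOO WHERE COL1 = 0;
```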
Is there any way to bring the old data in the stream into the table as well?
To process existing data too, set the offset back to earliest before running the CREATE statement:
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE FOO AS
[…]