I have a topic which is populated by the JDBC connector. It seems to have no kafka message key:
ksql> print 'mssql-transaction-log' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"TransID": 8789405114, "UserID": 15, "ActionCode": 80, "GameName": "thisgame", "GameID": 148362, "DataCashRef": null, "Success": "Y", "StartBalance": 188036, "Amount": -25, "EndBalance": 188011, "BonusStartBalance": 10000, "BonusAmount": 0, "BonusEndBalance": 10000, "Stamp": 1583162921467, "SiteID": 6}
I have created a stream from this:
CREATE STREAM TRANSACTIONS_LOG_RAW
(
TRANSID BIGINT,
USERID INTEGER,
ACTIONCODE INTEGER,
GAMENAME STRING,
GAMEID BIGINT,
DATACASHREF STRING,
SUCCESS STRING,
STARTBALANCE INTEGER,
AMOUNT INTEGER,
ENDBALANCE INTEGER,
BONUSSTARTBALANCE INTEGER,
BONUSAMOUNT INTEGER,
BONUSENDBALANCE INTEGER,
STAMP BIGINT,
SITEID INTEGER
)
WITH (KAFKA_TOPIC='mssql-transaction-log',
VALUE_FORMAT='AVRO',
KEY='USERID');
I have created a filtered stream from this one:
CREATE STREAM GAME_PURCHASES_RAW AS
SELECT USERID,
GAMENAME,
AMOUNT,
STAMP,
TIMESTAMPTOSTRING(STAMP, 'yyyyMMddHH') HOUR_DIMENSION,
TIMESTAMPTOSTRING(STAMP, 'yyyyMMdd') DAY_DIMENSION
FROM TRANSACTIONS_LOG_RAW
WHERE ACTIONCODE = 80
PARTITION BY USERID;
When I inspect these messages, there is no kafka key:
ksql> print 'GAME_PURCHASES_RAW' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"USERID": 58, "GAMENAME": "game", "AMOUNT": -50, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 191, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 70, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898980, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
When I describe the stream, it shows a key:
ksql> describe GAME_PURCHASES_RAW;
Name : GAME_PURCHASES_RAW
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
USERID | INTEGER (key)
GAMENAME | VARCHAR(STRING)
AMOUNT | INTEGER
STAMP | BIGINT
HOUR_DIMENSION | VARCHAR(STRING)
DAY_DIMENSION | VARCHAR(STRING)
--------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
I am going to create an aggregate from this GAME_PURCHASES_RAW stream, grouping by USERID. My understanding is that for aggregates the Kafka message key cannot be null, because messages for each unique USERID need to land on the same partition.
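For reference, the aggregate I plan to build looks roughly like this (the table name and the exact aggregate columns are placeholders, not something I have run yet):

```sql
-- Planned aggregation: purchase count and total spend per user per hour.
-- This relies on GAME_PURCHASES_RAW being correctly keyed by USERID.
CREATE TABLE USER_GAME_PURCHASES_HOURLY AS
SELECT USERID,
HOUR_DIMENSION,
COUNT(*) AS PURCHASE_COUNT,
SUM(AMOUNT) AS TOTAL_AMOUNT
FROM GAME_PURCHASES_RAW
GROUP BY USERID, HOUR_DIMENSION;
```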
Why does the stream for GAME_PURCHASES_RAW not show the key in the kafka message for the topic it creates?
Am I configuring message and ksql keys correctly to ensure my final aggregation is correct?
(I suspect my understanding of kafka keys vs ksql stream keys is lacking at some fundamental level)
The issue is that you have set KEY='USERID' in the WITH clause of TRANSACTIONS_LOG_RAW. Remove this and it will work.
Setting a KEY informs KSQL that the data in the specified column is the same as the data in the row's key. But this is not the case! The key in the raw log is null.
When you later PARTITION BY USERID, ksqlDB does not repartition your data and set the key, because you've told it the data is already partitioned by USERID.
You're not alone in making this mistake. The use of KEY in the WITH clause is a common cause of confusion, which is why we've removed it in later releases. You're on quite an old version there. A lot has changed, and been improved, since that version. I'd recommend upgrading!