Tags: apache-kafka, apache-kafka-streams, ksqldb

How to use several fields in the key of a Kafka stream when joining a stream and a table?


I have a stream with coupon info from a topic:

CREATE STREAM personal_coupons
(Coupon VARCHAR KEY,
CouponType VARCHAR,
MarketingArea VARCHAR,
CouponCode VARCHAR,
CouponName VARCHAR) WITH
(KAFKA_TOPIC = 'Coupons_Personal',
VALUE_FORMAT = 'JSON');

And I have a table with two fields, Coupon and GUID:

CREATE TABLE coupon_and_guid(Coupon varchar PRIMARY KEY,
guid varchar)
WITH (KAFKA_TOPIC = 'Coupon_GUID',VALUE_FORMAT = 'JSON');

I try to join them with:

CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
personal_coupons.Coupon,
COUPON_AND_GUID.guid,
CouponType,
MarketingArea,
AS_VALUE(personal_coupons.Coupon),
CouponCode,
CouponName
FROM personal_coupons
LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
PARTITION BY personal_coupons.Coupon,COUPON_AND_GUID.guid EMIT CHANGES;

And I get messages in this format:

key: {"PERSONAL_COUPONS_COUPON":"{\"Coupon\":\"1-2NAZTM69\"}","GUID":null}
value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","KSQL_COL_0":"{\"Coupon\":\"1-2NAZTM69\"}","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}

But I want to get:

 key: {"Coupon":"1-2NAZTM69","GUID":null}
 value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","Coupon":"1-2NAZTM69","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}

What did I do wrong, and how can I fix it?


Solution

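  • I found this solution: use the EXTRACTJSONFIELD ksqlDB function. As the output above shows, the Coupon key arrives as a JSON string ({"Coupon":"..."}), so the actual value has to be extracted before it is used as a key column: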

    CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
    SELECT
    EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon') as Coupon,
    COUPON_AND_GUID.guid,
    CouponType,
    MarketingArea,
    AS_VALUE(personal_coupons.Coupon),
    CouponCode,
    CouponName
    FROM personal_coupons
    LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
    PARTITION BY EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon'),COUPON_AND_GUID.guid EMIT CHANGES;
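
To see why the original key came out as a nested string, and what the extraction changes, the key handling can be mimicked outside ksqlDB. This is only a rough Python sketch, not ksqlDB itself. It assumes the record key in the topic is the JSON text {"Coupon":"1-2NAZTM69"}, read as a plain VARCHAR. The helper name extract_json_field is hypothetical and imitates only a top-level '$.Coupon' lookup.

```python
import json

# Assumption: the raw record key in the topic is JSON text that the
# stream definition reads as one plain string (VARCHAR KEY).
raw_key = '{"Coupon":"1-2NAZTM69"}'


def extract_json_field(text, field):
    """Hypothetical stand-in for EXTRACTJSONFIELD(text, '$.<field>')
    restricted to a top-level field; returns None if it is missing."""
    return json.loads(text).get(field)


# Original query: the whole string becomes a key column, so writing the
# key as JSON escapes it and produces a string nested inside the object.
nested_key = json.dumps(
    {"PERSONAL_COUPONS_COUPON": raw_key, "GUID": None},
    separators=(",", ":"),
)

# Query with extraction: only the inner value becomes the key column.
flat_key = json.dumps(
    {"COUPON": extract_json_field(raw_key, "Coupon"), "GUID": None},
    separators=(",", ":"),
)

print(nested_key)  # {"PERSONAL_COUPONS_COUPON":"{\"Coupon\":\"1-2NAZTM69\"}","GUID":null}
print(flat_key)    # {"COUPON":"1-2NAZTM69","GUID":null}
```

The column name is written as COUPON here because ksqlDB upper-cases unquoted identifiers. If the mixed-case name Coupon is required in the output, quoting the alias in the query is worth trying.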