I have two topics, X and Y, each with its own Avro schema, and both have a lot of fields. I want to create a stream-table join between them and output the result to another topic in the following format:
{
id, // the id used to join them
x_name : X,
y_name: Y
}
In other words, I want to join the two with each source nested under its own field. I can join them the normal way, but then all fields are flattened into the top level. Can this be achieved with ksqlDB? I haven't been able to find a good way of doing it.
EDIT:
Adding more information and an example. Say I have two topics with this type of data:
product_supply
{
"product_id": 1,
"name": "name",
"stock": 11,
"price": "141",
"storage_ids": [1, 2, 3]
}
product_information
{
"product_id": 1,
"description": "151",
"manufacturer": "ABC",
"Vendor_id": "5"
}
I'd like to use ksqlDB to join them in an unflattened (nested) form and publish the result to a topic, like this:
{
"product_id": 1,
"product_information": {
"product_id": 1,
"description": "151",
"manufacturer": "ABC",
"Vendor_id": "5"
},
"product_supply": {
"product_id": 1,
"name": "name",
"stock": 11,
"price": "141",
"storage_ids": [1, 2, 3]
}
}
I've added a schema for each topic and would like, if possible, to use those schemas without having to define each field explicitly in ksqlDB.
There's a good guide here on working with structured data in ksqlDB. Based on it, I was able to get this working:
Create sample data
CREATE STREAM PRODUCT_SUPPLY (PRODUCT_ID INT, NAME VARCHAR, STOCK INT, PRICE INT, STORAGE_IDS ARRAY<INT>) WITH (KAFKA_TOPIC='PRODUCT_SUPPLY', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
CREATE TABLE PRODUCT_INFORMATION (PRODUCT_ID INT PRIMARY KEY, DESCRIPTION VARCHAR, MANUFACTURER VARCHAR, VENDOR_ID INT) WITH (KAFKA_TOPIC='PRODUCT_INFO', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
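On reusing the registered schemas: if the real topics already exist and their Avro value schemas are registered in Schema Registry, ksqlDB can infer the value columns, so the column lists above shouldn't be needed. A rough sketch, assuming the topics are named product_supply and product_information, and that each record key of product_information holds the product id as an INT in the default KAFKA key format:

```sql
-- Value columns are inferred from the Avro schema registered for the topic.
-- Assumes topic 'product_supply' exists and its value schema is registered.
CREATE STREAM PRODUCT_SUPPLY
  WITH (KAFKA_TOPIC='product_supply', VALUE_FORMAT='AVRO');

-- A table still needs its primary key declared; only the value columns
-- are inferred. Assumes the record key of 'product_information' is the
-- product id serialized as an INT in the default KAFKA key format.
CREATE TABLE PRODUCT_INFORMATION (PRODUCT_ID INT PRIMARY KEY)
  WITH (KAFKA_TOPIC='product_information', VALUE_FORMAT='AVRO');
```

These would take the place of the two sample-data CREATE statements. Column names are derived from the Avro field names, so run DESCRIBE on each to check the inferred names before reusing the query below. If the value schema also contains a product_id field, your ksqlDB version may complain about a clash with the declared key column; I haven't verified that case. Also note this only removes the column lists from the CREATE statements: the STRUCT(...) projections in the query still name each field.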
INSERT INTO PRODUCT_SUPPLY VALUES(1,'NAME',11,141,ARRAY[1,2,3]);
INSERT INTO PRODUCT_INFORMATION values (1,'151','abc',5);
Query the data
SET 'auto.offset.reset' = 'earliest';
SELECT PS.PRODUCT_ID AS PRODUCT_ID,
STRUCT(NAME := PS.NAME,
STOCK := PS.STOCK,
PRICE := PS.PRICE,
STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
STRUCT(DESCRIPTION := PI.DESCRIPTION,
MANUFACTURER := PI.MANUFACTURER,
VENDOR_ID := PI.VENDOR_ID) AS PRODUCT_INFORMATION
FROM PRODUCT_SUPPLY PS
LEFT JOIN PRODUCT_INFORMATION PI
ON PS.PRODUCT_ID=PI.PRODUCT_ID
EMIT CHANGES LIMIT 1;
+-------------------------+-------------------------+-------------------------+
|PRODUCT_ID |PRODUCT_SUPPLY |PRODUCT_INFORMATION |
+-------------------------+-------------------------+-------------------------+
|1 |{NAME=NAME, STOCK=11, PRI|{DESCRIPTION=151, MANUFAC|
| |CE=141, STORAGE_IDS=[1, 2|TURER=abc, VENDOR_ID=5} |
| |, 3]} | |
Limit Reached
Query terminated
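The query above is a transient push query, so it only prints to the CLI. To actually publish the nested records to a topic, as the question asks, the same SELECT can back a persistent query. A sketch, where the stream name PRODUCT_ENRICHED and the topic product_enriched are hypothetical:

```sql
-- Persistent query: continuously writes the joined, nested rows to a topic.
-- PRODUCT_ENRICHED and 'product_enriched' are hypothetical names.
CREATE STREAM PRODUCT_ENRICHED
  WITH (KAFKA_TOPIC='product_enriched', VALUE_FORMAT='AVRO') AS
  SELECT PS.PRODUCT_ID AS PRODUCT_ID,
         -- Stream-side fields nested under a single column
         STRUCT(NAME := PS.NAME,
                STOCK := PS.STOCK,
                PRICE := PS.PRICE,
                STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
         -- Table-side fields; these are NULL when no matching row exists (LEFT JOIN)
         STRUCT(DESCRIPTION := PI.DESCRIPTION,
                MANUFACTURER := PI.MANUFACTURER,
                VENDOR_ID := PI.VENDOR_ID) AS PRODUCT_INFORMATION
  FROM PRODUCT_SUPPLY PS
  LEFT JOIN PRODUCT_INFORMATION PI
    ON PS.PRODUCT_ID = PI.PRODUCT_ID
  EMIT CHANGES;
```

LIMIT is dropped because this query is meant to run continuously. Be aware that PRODUCT_ID ends up as the key of the output records rather than as a field in the value; if you also need it inside the value, newer ksqlDB versions provide an AS_VALUE function for copying a key column into the value.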