apache-kafka confluent-platform ksqldb

KSQL streams - EXPLODE null issue


I am working on a JSON stream of malaria medicine availability in Zambia and have run into an issue I can't find an answer to online. The JSON I receive looks like this:

{
    "Country": "Zambia",
    "City": "Lusaka",
    "Area": [
        "Northmead"
    ],
    "MalariaMedicine": [
        {
            "pharmacyName": "Northmead Health",
            "brand": "Chloroquin",
            "quantity": 65,
            "batchNumber": "CHLORO 628 C",
            "bestBeforeDate": "2025-05-23",
            "expired": false,
            "batchInformation": {
                "number": "CHLORO 628 C",
                "expiration": "2025-01-23"
            }
        },
        {
            "pharmacyName": "Prime Pharmacy",
            "brand": "Quinin",
            "quantity": 205,
            "batchNumber": "QUIN 560 Q",
            "bestBeforeDate": "2028-01-01",
            "expired": false,
            "batchInformation": {
                "number": "QUIN 560 Q",
                "expiration": "2028-01-01"
            }
        }
    ]
}

I have pushed the JSON into a topic called Malaria and I used the code below to create a JSON stream.

CREATE STREAM MALARIASTREAM 
(
    COUNTRY STRING, 
    CITY STRING, 
    AREA ARRAY<STRING>, 
    MALARIAMEDICINE ARRAY<STRUCT<PHARMACYNAME STRING, BRAND STRING, QUANTITY INTEGER, BATCHNUMBER STRING, BESTBEFOREDATE STRING, EXPIRED BOOLEAN, BATCHINFORMATION STRUCT<NUMBER STRING, EXPIRATION STRING>>>
) 
WITH (KAFKA_TOPIC='Malaria', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

The issue appears when I try to extract the data using the SELECT statement below.

SELECT 
    COUNTRY, 
    CITY,
    EXPLODE(AREA) AS AREA,
    EXPLODE(MALARIAMEDICINE)->pharmacyName,
    EXPLODE(MALARIAMEDICINE)->brand,
    EXPLODE(MALARIAMEDICINE)->quantity,
    EXPLODE(MALARIAMEDICINE)->batchNumber,
    EXPLODE(MALARIAMEDICINE)->bestBeforeDate,
    EXPLODE(MALARIAMEDICINE)->expired 
FROM 
    MalariaStream EMIT CHANGES;

In the result set returned, the value of the AREA column is NULL for the second row. Both pharmacies are in the Northmead area, so I want the second row to say Northmead as well.

How do I get the second row to also say Northmead?


Solution

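  • When several EXPLODE calls appear in one SELECT, ksqlDB zips their outputs together and pads the shorter ones with NULL. AREA has one element and MALARIAMEDICINE has two, which is why the second row has a NULL AREA. If you know that the AREA array will always contain exactly one element, you could use ELT(1, AREA) to select that element instead of exploding it.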

    https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#elt
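
For reference, the query from the question could be rewritten along these lines. This is a sketch rather than a tested statement: it assumes AREA always holds exactly one element, and it uses the array subscript AREA[1] (ksqlDB arrays are 1-indexed) in place of ELT, which is a substitution and not what the answer above prescribes. The column aliases are added for readability only.

```sql
-- Sketch only: assumes AREA always contains exactly one element.
SELECT
    COUNTRY,
    CITY,
    AREA[1] AS AREA,  -- scalar lookup instead of EXPLODE(AREA)
    EXPLODE(MALARIAMEDICINE)->PHARMACYNAME   AS PHARMACYNAME,
    EXPLODE(MALARIAMEDICINE)->BRAND          AS BRAND,
    EXPLODE(MALARIAMEDICINE)->QUANTITY       AS QUANTITY,
    EXPLODE(MALARIAMEDICINE)->BATCHNUMBER    AS BATCHNUMBER,
    EXPLODE(MALARIAMEDICINE)->BESTBEFOREDATE AS BESTBEFOREDATE,
    EXPLODE(MALARIAMEDICINE)->EXPIRED        AS EXPIRED
FROM MALARIASTREAM
EMIT CHANGES;
```

Because AREA[1] is an ordinary scalar expression rather than a table function, its value should be repeated on every row produced by EXPLODE(MALARIAMEDICINE), so both pharmacies would show Northmead. If AREA can hold more than one element, this picks only the first.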