Search code examples
ksqldb

How come KSQLDB push query outputs chunked json?


I'm new to KsqlDB so I might be missing something obvious. My question is related to the chunked JSON output of a never-ending push-query not being valid JSON. Let me elaborate.

In short, my setup is as follows. From a typescript/node process I've created a push query on a ksql stream as such:

CREATE STREAM events (id VARCHAR, timestamp VARCHAR, location VARCHAR, events ARRAY<VARCHAR>) WITH (kafka_topic='mytopic', value_format='json', partitions=1);

The push query itself is created as a long-running REST stream (using axios):

const response = await axios.post(
    `http://ksqldb-server:8088/query-stream`,
    {
        sql: `SELECT * FROM events EMIT CHANGES;`,
        streamsProperties: {}
    },
    {
        headers: {
            'Content-Type': 'application/vnd.ksql.v1+json',
            Accept: 'application/vnd.ksql.v1+json',
        },
        responseType: 'stream',
    }
);

This works. When run, I first get the header row:

[{"header":{"queryId":"transient_EVENTS_2815830975103425962","schema":"`ID` STRING, `TIMESTAMP` STRING, `LOCATION` STRING, `EVENTS` ARRAY<STRING>"}}

Followed by new rows coming in one-by-one based on real-world events:

{"row":{"columns":["b82baad7-a87e-4617-b18a-1782b4cb49ce","2022-05-16 08:03:03","Home",["EventA","EventD"]]}},\n

Now, if this query would ever complete it would probably end up as valid JSON when concatenated together (although the header row is missing a , at the end). Since it's a push query however, it never completes and as such I won't receive the closing ] - which means it will never be valid JSON. Also, I'm looking to process events in real-time, otherwise I could have written a pull query instead.

My expectations were that each new row would be parseble by itself using JSON.parse(). Instead, I've ended up having to JSON.parse(data.slice(0, -2)) to get rid of the additional ,\n. However, it does not feel right to put this into production.

What is the rational behind outputting chunked JSON on push queries? It seems an illogical format to me for any use-case.

And is there a way to alter the output of ksql events to what I would expect? Maybe some header or attribute I'm missing?

Thanks for your insights!


Solution

  • You explicitly set application/vnd.ksql.v1+json as your desired response format in the headers:

    headers: {
                'Content-Type': 'application/vnd.ksql.v1+json',
                Accept: 'application/vnd.ksql.v1+json',
            },
    

    application/vnd.ksql.v1+json means that the complete response will be a valid JSON doc.

    As you pointed out, this is impractical as the push query never completes. You should remove the headers or set them explicitly to the default application/vnd.ksqlapi.delimited.v1. application/vnd.ksqlapi.delimited.v1 means that the every returned row is going to be valid JSON.

    See https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries for more details.