I'm new to ksqlDB, so I might be missing something obvious. My question is about the chunked JSON output of a never-ending push query not being valid JSON. Let me elaborate.
In short, my setup is as follows. From a TypeScript/Node process I run a push query against a ksqlDB stream defined as follows:
CREATE STREAM events (id VARCHAR, timestamp VARCHAR, location VARCHAR, events ARRAY<VARCHAR>) WITH (kafka_topic='mytopic', value_format='json', partitions=1);
The push query itself is created as a long-running REST stream (using axios):
const response = await axios.post(
  `http://ksqldb-server:8088/query-stream`,
  {
    sql: `SELECT * FROM events EMIT CHANGES;`,
    streamsProperties: {}
  },
  {
    headers: {
      'Content-Type': 'application/vnd.ksql.v1+json',
      Accept: 'application/vnd.ksql.v1+json',
    },
    responseType: 'stream',
  }
);
This works. When run, I first get the header row:
[{"header":{"queryId":"transient_EVENTS_2815830975103425962","schema":"`ID` STRING, `TIMESTAMP` STRING, `LOCATION` STRING, `EVENTS` ARRAY<STRING>"}}
Followed by new rows coming in one-by-one based on real-world events:
{"row":{"columns":["b82baad7-a87e-4617-b18a-1782b4cb49ce","2022-05-16 08:03:03","Home",["EventA","EventD"]]}},\n
Now, if this query ever completed, the concatenated output would probably be valid JSON (although the header row is missing a , at the end). Since it's a push query, however, it never completes, so I never receive the closing ] and the output is never valid JSON. Also, I want to process events in real time; otherwise I could have written a pull query instead.
I expected each new row to be parseable by itself using JSON.parse(). Instead, I've ended up having to call JSON.parse(data.slice(0, -2)) to get rid of the trailing ,\n. However, it does not feel right to put this into production.
What is the rationale behind outputting chunked JSON on push queries? It seems an illogical format to me for any use case.
And is there a way to alter the output of ksql events to what I would expect? Maybe some header or attribute I'm missing?
Thanks for your insights!
You explicitly set application/vnd.ksql.v1+json as your desired response format in the headers:
headers: {
  'Content-Type': 'application/vnd.ksql.v1+json',
  Accept: 'application/vnd.ksql.v1+json',
},
application/vnd.ksql.v1+json means that the complete response will be a valid JSON doc.
As you pointed out, this is impractical because the push query never completes. You should remove the headers or set them explicitly to the default, application/vnd.ksqlapi.delimited.v1. application/vnd.ksqlapi.delimited.v1 means that every returned row is going to be valid JSON.
See https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries for more details.
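One more thing to watch for once you switch to the delimited format: a 'data' event on the response stream is not guaranteed to contain exactly one row, so it is safer to buffer the incoming text and only parse complete lines. Below is a minimal sketch of that idea. createLineParser is a hypothetical helper (not part of ksqlDB or axios), and the sample lines are made up for illustration; as far as I can tell from the docs, the first line in the delimited format is a JSON object with query metadata and each following line is a JSON array of column values.

```typescript
// Hypothetical helper: buffers raw text chunks and calls onValue once for
// every complete, newline-terminated line, parsed as JSON.
function createLineParser(
  onValue: (value: unknown) => void
): (chunk: string) => void {
  let buffer = '';
  return (chunk: string): void => {
    buffer += chunk;
    let newlineIndex = buffer.indexOf('\n');
    while (newlineIndex !== -1) {
      const line = buffer.slice(0, newlineIndex).trim();
      buffer = buffer.slice(newlineIndex + 1);
      // Skip blank lines; anything else is expected to be one JSON value.
      if (line.length > 0) {
        onValue(JSON.parse(line));
      }
      newlineIndex = buffer.indexOf('\n');
    }
  };
}

// Illustrative input: the second row is split across two chunks and is only
// parsed once its terminating newline has arrived.
const received: unknown[] = [];
const push = createLineParser((value) => received.push(value));
push('{"queryId":"q1","columnNames":["ID"]}\n["b82b');
push('aad7"]\n');
```

With the axios call from the question you would set Accept to 'application/vnd.ksqlapi.delimited.v1', call response.data.setEncoding('utf8') so that chunks arrive as strings, and then register the parser with response.data.on('data', push).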