I'm trying to figure out how to make http2 requests to ksqldb. I found this project which successfully does it (I adapted to it to js code below). However, this code is using async generators while I want to create a simple client which makes a one-off request and that's it. This is my adaption:
const http2 = require('http2');
const DELIMITED_CONTENT_TYPE = 'application/vnd.ksqlapi.delimited.v1';
const parseChunk = (buf) => {
return buf
.toString()
.split('\n')
.filter((str) => str);
};
const main = () => {
const session = http2.connect('http://localhost:8088');
const stream = session.request({
[http2.constants.HTTP2_HEADER_PATH]: '/query-stream',
[http2.constants.HTTP2_HEADER_METHOD]: 'POST',
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: DELIMITED_CONTENT_TYPE,
});
const query = {
sql: `SELECT * FROM test_view where name='john';`,
};
const reqPayload = Buffer.from(JSON.stringify(query));
stream.end(reqPayload);
stream
.on('error', (error) => {
console.error(error);
})
.on('close', () => console.log('close'))
.on('abort', () => console.log('abort'))
.on('timeout', () => console.log('timeout'));
let chunk;
let nextLines = '';
console.log('before while');
while ((chunk = stream.read())) {
nextLines += parseChunk(chunk);
console.log('nextLines', !!nextLines);
}
};
main();
Unfortunately this code doesn't work: chunk
is always null. I don't understand what I am missing from the original project. I adapted the Typescript file to a Javascript file in order to easily run it (it works):
const http2 = require('http2');
const DELIMITED_CONTENT_TYPE = 'application/vnd.ksqlapi.delimited.v1';
class QueryStream {
closed = false;
constructor(session, queryStreamArgs, mapRow) {
this.session = session;
this.queryStreamArgs = queryStreamArgs;
this.mapRow = mapRow;
}
headers() {
return {
[http2.constants.HTTP2_HEADER_PATH]: '/query-stream',
[http2.constants.HTTP2_HEADER_METHOD]: 'POST',
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: DELIMITED_CONTENT_TYPE,
};
}
parseChunk(buf) {
return buf
.toString()
.split('\n')
.filter((str) => str);
}
initQueryStream() {
const stream = this.session.request(this.headers());
// we write params into the request stream, then end the request stream.
// if we don't end the request stream, the req isn't routed on the server.
// note that the _response_ stream does not close, so we still get results.
const reqPayload = Buffer.from(JSON.stringify(this.queryStreamArgs));
stream.end(reqPayload);
return stream;
}
}
class AsyncIteratorQueryStream extends QueryStream {
[Symbol.asyncIterator]() {
// array of resolve/reject tuples represents pending work
// const promised: [(ret: any) => void, (err: any) => void][] = [];
const promised = [];
// unprocessed query response lines returned by the server
const received = [];
const stream = this.initQueryStream();
const destroyStream = (err) => {
// close existing promises
for (const [resolve, reject] of promised) {
if (err) {
reject(err);
} else {
const [error] = received;
try {
const parsedError = error && JSON.parse(error);
if (parsedError?.status === 'error') {
reject(new Error(parsedError.message));
} else {
return resolve({ value: received, done: true });
}
} catch (e) {
reject(new Error(e.message));
}
}
}
stream.destroy();
this.closed = true;
};
stream
.on('error', (error) => destroyStream(error))
.on('close', () => destroyStream())
.on('abort', () => destroyStream(new Error('abort')))
.on('timeout', () => destroyStream(new Error('timeout')));
// the work loop delivers query result data by delimited row.
// given demand, reads next buffer from the stream if available.
const doWork = () => {
if (this.closed) {
return;
}
// process available query response lines
while (promised.length && received.length) {
const [resolve] = promised.shift();
const rawJson = received.shift();
resolve(rawJson);
}
if (promised.length) {
// pending work is unfulfilled; try to read it from stream
const next = stream.read();
if (next != null) {
const nextLines = this.parseChunk(next);
received.push(...nextLines);
}
// loop work
setImmediate(() => doWork());
}
};
// enqueue work to be handled by the work loop
const nextPromise = () =>
new Promise((resolve, reject) => promised.push([resolve, reject]));
// the first promise parses query response metadata and returns col names.
const getRowKeys = nextPromise().then((rawMeta) => {
try {
let meta = {};
if (typeof rawMeta !== 'string' && rawMeta.value) {
meta = JSON.parse(rawMeta.value[0]);
} else if (typeof rawMeta === 'string') {
meta = JSON.parse(rawMeta);
}
if (meta.status === 'error') {
const err = meta;
destroyStream(err);
throw err;
}
return meta.columnNames;
} catch (e) {
destroyStream(new Error(e.message));
}
});
doWork();
// return async iterator contract
return {
next: () => {
if (this.closed) {
return Promise.resolve({ value: undefined, done: true });
}
// enqueue the next row handler
return getRowKeys.then((ks) => {
const enqueued = nextPromise().then((rawRow) => {
const value = this.mapRow(rawRow, ks);
return { value, done: false };
});
doWork();
return enqueued;
});
},
return: () => {
destroyStream();
return Promise.resolve({ value: undefined, done: true });
},
};
}
}
const asyncIteratorQueryStream = (session, queryStreamArgs, nameKey) => {
const mapRow = (rawRow, ks) => {
let row = [];
try {
row = JSON.parse(rawRow);
} catch (e) {
row = [rawRow];
}
return ks.reduce(
(acc, k, i) => {
acc[nameKey][k] = row[i];
return acc;
},
{ [nameKey]: {} }
);
};
return new AsyncIteratorQueryStream(session, queryStreamArgs, mapRow);
};
const main = async () => {
const session = http2.connect('http://localhost:8088');
const q = asyncIteratorQueryStream(session, {
sql: `SELECT * FROM test_view where name='john';`,
});
for await (const row of q) {
console.log('row', JSON.stringify(row));
}
};
main().catch((e) => console.error(e));
// usage
//
// const session = http2.connect('https://localhost:8089');
// const q = asyncIteratorQueryStream(session, { sql: 'select * from foo' });
// (async () => {
// for await (const row: Record<string, any> of q) {
// // ...
// }
// })();
I finally wrote a code version which works (nodejs version 14):
class KsqlDBClient {
constructor(ksqlDBBaseUrl) {
this.client = http2.connect(ksqlDBBaseUrl);
this.client.on('error', (err) => console.error(err));
}
request(query) {
const session = this.client.request({
[http2.constants.HTTP2_HEADER_PATH]: '/query-stream',
[http2.constants.HTTP2_HEADER_METHOD]: 'POST',
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]:
'application/vnd.ksql.v1+json',
});
session.setEncoding('utf8');
session.on('data', (queryResult) => {
console.log('queryResult', queryResult);
});
const payload = Buffer.from(JSON.stringify(query));
session.end(payload);
}
}
const query = {
sql: `SELECT * FROM test_view emit changes;`,
};
const client = new KsqlDBClient('http://localhost:8088');
client.request(query);