I am trying to create a Kafka table through the (Confluent) ksqlDB server's REST interface, using the following bash script:
KSQLDB_COMMAND="CREATE TABLE sample_table \
(xkey VARCHAR, \
xdata VARCHAR) \
WITH (KAFKA_TOPIC=\'sample-topic\', \
VALUE_FORMAT=\'JSON\', \
KEY=\'xkey\'); "
COMMAND="curl -X 'POST' '$KSQLDB_SERVER' \
-H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
-d '{ \"ksql\": \"$KSQLDB_COMMAND\" }' "
eval $COMMAND
The following error message is returned:
{"@type":"statement_error","error_code":40001,"message":"Failed to prepare statement: Invalid config variable(s) in the WITH clause: KEY","statementText":"CREATE TABLE sample_table (xkey VARCHAR, xdata VARCHAR) WITH (KAFKA_TOPIC='sample-topic', VALUE_FORMAT='JSON', KEY='xkey');","entities":[]}%
The message points to a problem in the statement itself, specifically the KEY property in the WITH clause.
I can get basic commands ("LIST STREAMS" etc.) working through the REST interface but cannot create tables, so I figure the problem is either in the KSQL statement or in how I am building the bash command (in the "COMMAND" variable).
Any help is appreciated.
I spent a fair bit of time experimenting and got this simple example working. My original attempt required too many bash variable substitutions to be useful or maintainable, so this version is simplified quite a bit. Note that the working statement declares the key column with PRIMARY KEY in the column list instead of the KEY property in the WITH clause, which is what the error message rejected. I also found that ksqlDB table names must follow regular SQL naming conventions (letters, underscores, etc., but no hyphens). Hyphens caused a bunch of errors in my original attempt; I should have read the documentation more carefully.
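As a small guard against the hyphen problem, a name can be checked before it is substituted into the statement. This is only a sketch: is_valid_ksql_name is a hypothetical helper, and the pattern (a letter or underscore followed by letters, digits, or underscores) is a conservative assumption rather than ksqlDB's exact identifier rules.

```shell
# Hypothetical helper: succeeds only for names made of letters, digits and
# underscores that do not start with a digit. This is an assumed, conservative
# rule, not ksqlDB's official identifier grammar.
is_valid_ksql_name() {
  case "$1" in
    '' | [!A-Za-z_]* | *[!A-Za-z0-9_]*) return 1 ;;  # empty, bad first char, or bad char anywhere
    *) return 0 ;;
  esac
}

# Usage: refuse to continue if the table name would be rejected.
KSQLDB_TABLE="some_table"
if ! is_valid_ksql_name "$KSQLDB_TABLE"; then
  echo "invalid table name: $KSQLDB_TABLE" >&2
  exit 1
fi
```

The check uses a plain case pattern so it does not depend on bash-only features.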
The following works (you may need to change your KSQLDB server address)... and with minimal changes, just about any KSQLDB command can be executed:
####
# NOTE: the table name MUST use letters and underscores only; hyphens are not allowed
####
KSQLDB_SERVER="http://localhost:8088/ksql"
KSQLDB_TABLE="some_table"
KSQLDB_TOPIC="some_topic"
VALUE_FORMAT="JSON"
FMT="{ \"ksql\": \"CREATE TABLE %s (key VARCHAR PRIMARY KEY, data VARCHAR) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT='%s');\" }"
JSON_DATA=$(printf "$FMT" "$KSQLDB_TABLE" "$KSQLDB_TOPIC" "$VALUE_FORMAT")
curl -X "POST" "$KSQLDB_SERVER" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d "$JSON_DATA"
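To reuse the same request for other statements, the payload construction and the curl call could be wrapped in two small functions. This is a sketch, not a definitive implementation: build_ksql_payload and run_ksql are hypothetical names, and the payload helper does no JSON escaping, so it assumes the statement contains no double quotes, backslashes, or newlines.

```shell
# Hypothetical helper: wraps one ksqlDB statement in the JSON body used above.
# Assumption: the statement has no double quotes, backslashes or newlines,
# because nothing is escaped here.
build_ksql_payload() {
  # The statement is passed as a printf argument, so any % in it is kept as-is.
  printf '{ "ksql": "%s" }' "$1"
}

# Hypothetical helper: posts one statement to the server's /ksql endpoint.
# $1 = server URL, $2 = statement
run_ksql() {
  curl -s -X POST "$1" \
    -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
    -d "$(build_ksql_payload "$2")"
}

# Example calls (need a reachable ksqlDB server, so they are left commented out):
# run_ksql "http://localhost:8088/ksql" "LIST STREAMS;"
# run_ksql "http://localhost:8088/ksql" "CREATE TABLE some_table (key VARCHAR PRIMARY KEY, data VARCHAR) WITH (KAFKA_TOPIC='some_topic', VALUE_FORMAT='JSON');"
```

Single quotes inside the statement pass through unchanged because the whole statement is held in a double-quoted shell string, which avoids the eval step from the original attempt.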