I'm currently prototyping a use case for ksqlDB that is supposed to read from a Kafka topic populated with String keys and Avro values. I'm using the following docker-compose.yml to set up my dev environment:
version: "3.9"
services:
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_PORT: 9092
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    networks:
      - network1
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - network1
  cp-schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: user
      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry-Auth
      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/jaas_config.conf
      SCHEMA_REGISTRY_DEBUG: 'true'
    volumes:
      - './schema_registry:/etc/kafka/secrets:rw'
    networks:
      - network1
  ksqldb-primary:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-primary
    depends_on:
      - kafka
      - cp-schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: user:user
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    networks:
      - network1
  # Access the cli by running:
  # > docker compose exec ksqldb-cli ksql http://ksqldb-primary:8088
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-primary
    entrypoint: /bin/sh
    tty: true
    networks:
      - network1
  redpanda:
    image: vectorized/console:v2.3.5
    ports:
      - "5000:8080"
    environment:
      # All config properties can be entered via env
      KAFKA_BROKERS: kafka:9092
      KAFKA_SCHEMAREGISTRY_ENABLED: "true"
      KAFKA_SCHEMAREGISTRY_URLS: http://schema-registry:8081
      KAFKA_SCHEMAREGISTRY_USERNAME: user
      KAFKA_SCHEMAREGISTRY_PASSWORD: user
    networks:
      - network1
networks:
  network1:
    ipam:
      config:
        - subnet: 172.177.0.0/16
I'm using the following service versions:
- Kafka: kafka_2.13-2.8.1
- Schema Registry: 7.2.1
- ksqlDB server: 0.29.0
- ksqlDB CLI: 0.29.0
I created a topic called table-receipts and registered a schema in the Confluent Schema Registry under the subject table-receipts-value:
GET localhost:8081/subjects/table-receipts-value/versions/1
{
"subject": "table-receipts-value",
"version": 1,
"id": 7,
"schema": "--omitted--"
}
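Since the registry is protected by basic auth, that request has to carry an Authorization header built from the user:user pair (the same credentials later handed to ksqlDB as USER_INFO). A minimal sketch of how such a credential pair becomes an HTTP Basic header:

```python
import base64

# The USER_INFO credential pair configured in docker-compose.yml.
user_info = "user:user"

# HTTP Basic auth: base64-encode "username:password", prefix with "Basic ".
token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
auth_header = f"Basic {token}"

print(auth_header)
```

This is the header value a client (curl, the ksqlDB Schema Registry client, etc.) sends with every registry request when basic auth is enabled.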
I write this data using the io.confluent.kafka.serializers.KafkaAvroSerializer; I perform the serialization manually rather than through a producer config, and it works. So far everything looks good.
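For reference, the KafkaAvroSerializer wraps each Avro payload in Confluent's wire format: a zero magic byte, the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. A sketch of that framing (the payload bytes here are a placeholder, not my real record):

```python
import struct

def frame_confluent(schema_id: int, avro_payload: bytes) -> bytes:
    # Confluent wire format: magic byte 0x00, then the schema ID as a
    # 4-byte big-endian unsigned int, then the raw Avro-encoded record.
    return struct.pack(">bI", 0, schema_id) + avro_payload

# Schema ID 7 matches the registered table-receipts-value schema above.
framed = frame_confluent(7, b"<avro-bytes>")  # placeholder payload
print(framed[:5].hex())
```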
After that, I try to create a stream in ksqlDB to process this topic:
CREATE STREAM table_receipts WITH (KAFKA_TOPIC='table-receipts', VALUE_FORMAT='AVRO');
and am immediately met with the following error:
Schema for message values on topic 'table-receipts' does not exist in the Schema Registry.);
Subject: table-receipts-value
Possible causes include:
- The topic itself does not exist
-> Use SHOW TOPICS; to check
- Messages on the topic are not serialized using a format Schema Registry supports
-> Use PRINT 'table-receipts' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using a Confluent Schema Registry supported serializer
-> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry
-> Use the REST API to list available subjects https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects
- You do not have permissions to access the Schema Registry.
-> See https://docs.confluent.io/current/schema-registry/docs/security.html
ksql> PRINT 'table-receipts' FROM BEGINNING;
Key format: SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: does not match any supported format. It may be a STRING with encoding other than UTF8, or some other format.
rowtime: 2023/10/26 13:57:56.322 Z, key: [2c08af31-48fb-41b6-9@3546975261674595891/3774636806494052912], value: \x00\x00\x00\x00\x07\x90\xBE\x9B\xC9\xD5b\xD2\xDE\x9A\xC9\xD5b\x00H39b668b8-8bcd-406f-9449-973eac83d152\x1012345678\x06EUR\x04\x17p\x02\x02\x04\x17p\x00\x04\x17p\x00\x02\x0E50% off\x02\x04\x17p\x00\x02\x04\x02He3a7158b-0aeb-40c9-876f-0bd5b357d861\x02\x04\x1BX\x02\x0EPRODUCT\x02\x04\x0B\xB8\x00\x02H5db87916-f3ff-48c4-9163-1f6e3b99adde,Alkoholische Getr\xC3\xA4nke\x00\x04\x15\xCC\x02Hc2ac3dfc-c176-4655-b725-ae6eb01c60a7H5db87916-f3ff-48c4-9163-1f6e3b99adde\x12Cocktails\x00\x02He3a7158b-0aeb-40c9-876f-0bd5b357d861\x18Whiskey Sour\x081234Hc2ac3dfc-c176-4655-b725-ae6eb01c60a7\x00H61ef9eeb-ba7f-4e8b-98c1-fcd15a97d650H2c08af31-48fb-41b6-94b4-71b019c536f3\x020\x04\x17p\x02\x04\x15\xCCH694b9be0-903f-463b-8c47-4ab07e3b7d70\x04\x17p\x04\x1BX\x04\x01\xA4\x00\x10John Doe\x0412, partition: 9
^CTopic printing ceased
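Notably, the printed value bytes do look like Confluent-framed Avro: they start with \x00\x00\x00\x00\x07, i.e. magic byte 0 followed by schema ID 7, the exact ID the registry returned above. A small sketch that parses that header (byte values copied from the PRINT output):

```python
import struct

# First five bytes of the value shown by PRINT 'table-receipts' FROM BEGINNING;
value_prefix = b"\x00\x00\x00\x00\x07"

# Magic byte, then the 4-byte big-endian schema ID.
magic, schema_id = struct.unpack(">bI", value_prefix)
print(magic, schema_id)
```

So the serialization itself seems fine; ksqlDB just cannot fetch schema ID 7 from the registry.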
If I remove the basic auth from the schema registry, everything works:
ksql> CREATE STREAM table_receipts WITH (KAFKA_TOPIC='table-receipts', VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
According to this page, however (bottom of the section), I seem to be using the correct settings for passing the basic auth parameters. How can I pass basic auth credentials to ksqlDB's Schema Registry client? My guess is that the client is simply not receiving the correct parameters.
This ended up being a fail on my end. The settings described in the docs refer to the confluentinc/cp-ksqldb-server Docker image, whereas I was using the "plain" confluentinc/ksqldb-server image. For whatever reason, that image does not support passing basic auth / USER_INFO credentials to the Schema Registry client, or at the very least I was unable to find out how it should be done. Switching to confluentinc/cp-ksqldb-server:7.5.1 let me create the stream without any issues.
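The difference makes sense once you know that the cp-* images run a startup script that translates environment variables into server properties: strip the KSQL_ prefix, lowercase, and turn underscores into dots. A simplified sketch of that convention (my assumption; the real scripts also have escape rules for literal underscores, which this ignores):

```python
def env_to_property(env_var: str, prefix: str = "KSQL_") -> str:
    # cp-image convention (simplified): drop the prefix, lowercase,
    # and replace underscores with dots to get the property name.
    assert env_var.startswith(prefix)
    return env_var[len(prefix):].lower().replace("_", ".")

print(env_to_property("KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO"))
```

Under that mapping, the env vars from my compose file become ksql.schema.registry.basic.auth.credentials.source and ksql.schema.registry.basic.auth.user.info, which is exactly what the Schema Registry client needs.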
I'll leave this open for a bit longer in case anyone else would like to weigh in. I guess sometimes it helps to just walk through the entire process from the beginning in your head.