Tags: apache-kafka, avro, confluent-schema-registry, ksqldb

ksqlDB: connect to Confluent Schema Registry with Basic Auth


I'm currently prototyping a use case for ksqlDB which is supposed to read from a Kafka topic populated with String keys and Avro values.

I'm using the following docker-compose.yml to set up my dev environment:

version: "3.9"

services:

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_PORT: 9092
      KAFKA_LISTENERS:
        INTERNAL://:9092,
        EXTERNAL://:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
        INTERNAL:PLAINTEXT,
        EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_LISTENERS:
        INTERNAL://kafka:9092,
        EXTERNAL://localhost:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    networks:
      - network1

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - network1

  cp-schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: user
      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry-Auth
      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/jaas_config.conf
      SCHEMA_REGISTRY_DEBUG: 'true'
    volumes:
      - './schema_registry:/etc/kafka/secrets:rw'
    networks:
      - network1

  ksqldb-primary:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-primary
    depends_on:
      - kafka
      - cp-schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: user:user
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    networks:
      - network1

  # Access the cli by running:
  # > docker compose exec ksqldb-cli  ksql http://ksqldb-primary:8088
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-primary
    entrypoint: /bin/sh
    tty: true
    networks:
      - network1


  redpanda:
    image: vectorized/console:v2.3.5
    ports:
      - "5000:8080"
    environment:
      # All config properties can be entered via env
      KAFKA_BROKERS: kafka:9092
      KAFKA_SCHEMAREGISTRY_ENABLED: "true"
      KAFKA_SCHEMAREGISTRY_URLS: http://schema-registry:8081
      KAFKA_SCHEMAREGISTRY_USERNAME: user
      KAFKA_SCHEMAREGISTRY_PASSWORD: user
    networks:
      - network1


networks:
  network1:
    ipam:
      config:
        - subnet: 172.177.0.0/16

I'm using the following services:

  • Kafka (kafka_2.13-2.8.1)
  • Zookeeper
  • Confluent Schema Registry 7.2.1
  • ksqlDB 0.29.0
  • ksqlDB CLI 0.29.0
  • Redpanda Console, as a Kafka UI for inspecting my topics and configs.

I created a topic called table-receipts and registered a schema in Confluent Schema Registry under the subject table-receipts-value:

GET localhost:8081/subjects/table-receipts-value/versions/1
{
    "subject": "table-receipts-value",
    "version": 1,
    "id": 7,
    "schema": "--omitted--"
}
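Because Schema Registry is running with basic auth, this lookup only succeeds if the client sends an HTTP Basic Authorization header. A minimal Python sketch of the same request with the standard library, assuming the user:user credentials and localhost:8081 address from the compose file above (the function names are hypothetical). Without the header, Schema Registry should reject the request (typically with 401).

```python
import base64
import json
import urllib.request

# Assumed from the compose file: Schema Registry on localhost:8081 and the
# user:user credentials that are also handed to ksqlDB and Redpanda Console.
REGISTRY_URL = "http://localhost:8081"
USERNAME = "user"
PASSWORD = "user"


def build_subject_version_request(subject: str, version: int) -> urllib.request.Request:
    """Build a GET request for one subject version, with an HTTP Basic auth header."""
    token = base64.b64encode(f"{USERNAME}:{PASSWORD}".encode("utf-8")).decode("ascii")
    url = f"{REGISTRY_URL}/subjects/{subject}/versions/{version}"
    return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})


def fetch_subject_version(subject: str, version: int) -> dict:
    """Send the request and decode the JSON body (needs the compose stack running)."""
    req = build_subject_version_request(subject, version)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the stack up, fetch_subject_version("table-receipts-value", 1) should return the same JSON as the GET above, including "id": 7.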

I write the data with io.confluent.kafka.serializers.KafkaAvroSerializer (invoking the serializer manually rather than through a producer config), and that works.
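KafkaAvroSerializer writes each value in the Confluent wire format: one magic byte (0), the schema ID as a 4-byte big-endian integer, then the Avro binary body. When serializing by hand, the framing has to match this layout for ksqlDB to look up the schema. A sketch of just the framing step, assuming the Avro body has already been encoded elsewhere (frame_confluent_avro and the placeholder payload are hypothetical):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format version byte


def frame_confluent_avro(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an already Avro-encoded body with the Confluent wire-format header.

    Layout: 1 magic byte (0) + 4-byte big-endian schema ID + Avro binary body.
    struct.error is raised if schema_id does not fit in an unsigned 32-bit int.
    """
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + avro_payload
```

For schema ID 7, the header is the five bytes \x00\x00\x00\x00\x07.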


So far everything looks good.

After that, I try to create a stream in ksqlDB to process this topic:

CREATE STREAM table_receipts WITH (KAFKA_TOPIC='table-receipts', VALUE_FORMAT='AVRO');

and am immediately met with the following error:

Schema for message values on topic 'table-receipts' does not exist in the Schema Registry.);
Subject: table-receipts-value
Possible causes include:
- The topic itself does not exist
        -> Use SHOW TOPICS; to check
- Messages on the topic are not serialized using a format Schema Registry supports
        -> Use PRINT 'table-receipts' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using a Confluent Schema Registry supported serializer
        -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry
        -> Use the REST API to list available subjects  https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects
- You do not have permissions to access the Schema Registry.
        -> See https://docs.confluent.io/current/schema-registry/docs/security.html

ksql> PRINT 'table-receipts' FROM BEGINNING;

Key format: SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format:  does not match any supported format. It may be a STRING with encoding other than UTF8, or some other format.
rowtime: 2023/10/26 13:57:56.322 Z, key: [2c08af31-48fb-41b6-9@3546975261674595891/3774636806494052912], value: \x00\x00\x00\x00\x07\x90\xBE\x9B\xC9\xD5b\xD2\xDE\x9A\xC9\xD5b\x00H39b668b8-8bcd-406f-9449-973eac83d152\x1012345678\x06EUR\x04\x17p\x02\x02\x04\x17p\x00\x04\x17p\x00\x02\x0E50% off\x02\x04\x17p\x00\x02\x04\x02He3a7158b-0aeb-40c9-876f-0bd5b357d861\x02\x04\x1BX\x02\x0EPRODUCT\x02\x04\x0B\xB8\x00\x02H5db87916-f3ff-48c4-9163-1f6e3b99adde,Alkoholische Getr\xC3\xA4nke\x00\x04\x15\xCC\x02Hc2ac3dfc-c176-4655-b725-ae6eb01c60a7H5db87916-f3ff-48c4-9163-1f6e3b99adde\x12Cocktails\x00\x02He3a7158b-0aeb-40c9-876f-0bd5b357d861\x18Whiskey Sour\x081234Hc2ac3dfc-c176-4655-b725-ae6eb01c60a7\x00H61ef9eeb-ba7f-4e8b-98c1-fcd15a97d650H2c08af31-48fb-41b6-94b4-71b019c536f3\x020\x04\x17p\x02\x04\x15\xCCH694b9be0-903f-463b-8c47-4ab07e3b7d70\x04\x17p\x04\x1BX\x04\x01\xA4\x00\x10John Doe\x0412, partition: 9
^CTopic printing ceased
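The raw value printed above starts with \x00\x00\x00\x00\x07, which is the Confluent wire-format header: magic byte 0 followed by schema ID 7, the same ID the registry returned for table-receipts-value. So the records appear to be framed correctly, and the failure is presumably in ksqlDB fetching schema 7 rather than in the data. A small sketch that decodes that header (parse_confluent_header is a hypothetical helper; the sample is the first bytes of the printed value):

```python
import struct
from typing import Tuple


def parse_confluent_header(value: bytes) -> Tuple[int, bytes]:
    """Split a Confluent wire-format record value into (schema_id, avro_body).

    Raises ValueError if the value is shorter than the 5-byte header
    or if the magic byte is not 0.
    """
    if len(value) < 5:
        raise ValueError("value too short for Confluent wire format")
    magic, schema_id = struct.unpack(">BI", value[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, value[5:]


# First bytes of the value shown by PRINT 'table-receipts' above (truncated).
sample = b"\x00\x00\x00\x00\x07\x90\xbe\x9b\xc9\xd5b"
schema_id, body = parse_confluent_header(sample)
print(schema_id)  # 7
```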

If I remove basic auth from Schema Registry, everything works:

ksql> CREATE STREAM table_receipts WITH (KAFKA_TOPIC='table-receipts', VALUE_FORMAT='AVRO');

 Message        
----------------
 Stream created 
----------------

According to this page, however (bottom of the section), I seem to be using the correct settings for passing the basic auth parameters.

I suspect the ksqlDB Schema Registry client is simply not receiving the credentials. How can I pass basic auth credentials to the ksqlDB Schema Registry client?
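One way to sanity-check the environment variable names is to translate them into property names. The sketch below follows the naming convention documented for the Confluent Platform Docker images (strip the component prefix, lowercase, "___" becomes "-", "__" becomes "_", "_" becomes "."); whether a given image applies exactly this translation is an assumption here, and env_to_property is a hypothetical helper. If the names map to ksql.schema.registry.basic.auth.credentials.source and ksql.schema.registry.basic.auth.user.info, the spelling is fine and the problem is more likely in how the image consumes them.

```python
def env_to_property(env_name: str, prefix: str = "KSQL_") -> str:
    """Translate a Docker env var name into a property name.

    Assumes the Confluent Platform image convention: drop the prefix,
    lowercase, then '___' -> '-', '__' -> '_', '_' -> '.'.
    """
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix!r}")
    name = env_name[len(prefix):].lower()
    name = name.replace("___", "-")
    name = name.replace("__", "\0")  # temporary placeholder for a literal underscore
    name = name.replace("_", ".")
    return name.replace("\0", "_")


for var in (
    "KSQL_KSQL_SCHEMA_REGISTRY_URL",
    "KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE",
    "KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
):
    print(var, "->", env_to_property(var))
```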


Solution

  • This turned out to be a mistake on my end.

    The settings described in the docs refer to the confluentinc/cp-ksqldb-server Docker image.

    I was using the "plain" confluentinc/ksqldb-server image. For whatever reason, that image does not seem to support passing basic auth (USER_INFO) credentials to the Schema Registry client, or at least I was unable to find out how to do it.

    After switching the image to confluentinc/cp-ksqldb-server:7.5.1, I could create the stream without any issues.

    I'll leave this open a bit longer in case anyone else would like to weigh in. I guess it sometimes helps to walk through the entire process in your head from the beginning.