Tags: json, apache-kafka, confluent-schema-registry, ksqldb

How to create a ksqlDB table that deserializes from a topic using Schema Registry?


I am using this local environment:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_DEBUG: 'true'

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.5.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.5.1
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
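
The stack can be brought up with Docker Compose (assuming the file above is saved as docker-compose.yml):

docker compose up -d

and the ksqlDB CLI opened with, for example, docker exec -it ksqldb-cli ksql http://ksqldb-server:8088.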

I have created these schemas:

{
  "$id": "http://schema-registry:8081/schemas/ids/1",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "Location",
  "type": "object",
  "properties": {
    "profileId": {
        "type":"string"
    }
  }
}

I post it using this file, schema-key-registry.json:

{
  "schemaType":"JSON",
  "schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Location\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\"}}}"
}
And this value schema:

{
  "$id": "http://schema-registry:8081/schemas/ids/1",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "Location",
  "type": "object",
  "properties": {
    "profileId": {
      "type": "string",
      "description": "The id of the location."
    },
    "latitude": {
      "type": "number",
      "minimum": -90,
      "maximum": 90,
      "description": "The location's latitude."
    },
    "longitude": {
      "type": "number",
      "minimum": -180,
      "maximum": 180,
      "description": "The location's longitude."
    }
  }
}

I post it using this file, schema-value-registry.json:

{
  "schemaType":"JSON",
  "schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Location\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}

and registered them:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "$(cat schema-key-registry.json)" \
  http://localhost:8081/subjects/locations-key/versions
{"id":1}

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "$(cat schema-value-registry.json)" \
  http://localhost:8081/subjects/locations/versions
{"id":2}

Producing messages works, and the producer fails when a message violates the schema (an out-of-range example follows the session below).

kafka-json-schema-console-producer \
--bootstrap-server broker:9092 \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=2 \
--property key.schema.id=1 \
--property key.separator='|' \
--property parse.key=true \
--topic locations
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-179}

Consuming works too:

kafka-json-schema-console-consumer \
--bootstrap-server broker:9092  \
--from-beginning \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--property key.separator='|' \
--topic locations 
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-179}

Using ksqlDB, I create a table:

ksql> CREATE TABLE loc WITH (
>KAFKA_TOPIC = 'locations',
>KEY_FORMAT = 'JSON_SR',
>KEY_SCHEMA_ID = 1,
>VALUE_FORMAT = 'JSON_SR',
>VALUE_SCHEMA_ID = 2
>);

Which looks like this:

ksql> describe loc;

Name                 : LOC
 Field     | Type
-------------------------------------------------------------
 ROWKEY    | STRUCT<profileId VARCHAR(STRING)> (primary key)
 profileId | VARCHAR(STRING)
 latitude  | DOUBLE
 longitude | DOUBLE
-------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

When I try to run a push query, I get this error:

ksql> SELECT * FROM loc EMIT CHANGES;
+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|ROWKEY                                                                                                 |profileId                                                                                              |latitude                                                                                               |longitude                                                                                              |
+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=locations, partition=0, offset=3, stacktrace=io.confluent.ksql.serde.KsqlSerializationException: Error serializing message to topic: _confluent-ksql-default_transient_transient_LOC_4102565114369480088_1698924612467-KsqlTopic-Reduce-changelog. Mismatching schema.
Hint: You probably forgot to add VALUE_SCHEMA_ID when creating the source.
        at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
        at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
        at io.confluent.ksql.serde.unwrapped.UnwrappedSerializer.serialize(UnwrappedSerializer.java:56)
        at io.confluent.ksql.serde.unwrapped.UnwrappedSerializer.serialize(UnwrappedSerializer.java:31)
        at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
        at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
        at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.keyBytes(MeteredKeyValueStore.java:431)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:330)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:877)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:330)
        at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:131)
        at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.put(KeyValueStoreWrapper.java:86)
        at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:152)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:792)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:877)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:792)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:723)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: org.apache.kafka.connect.errors.DataException: Mismatching schema.
        at io.confluent.connect.json.JsonSchemaData.fromConnectData(JsonSchemaData.java:524)
        at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:94)
        at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:85)
        at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
        ... 33 more

What am I doing wrong?


Solution

  • Your schema with id=2 was registered under the subject locations, which is missing the -value suffix; the console producer therefore auto-registered its own value schema (probably with id=3), and the mismatch between the two is what produces the "Mismatching schema" error. Re-register under locations-value, as sketched after this list.

    Note: kafka-json-schema-console-producer will automatically register the schemas for you

    Also, if the key is really just one string field, you don't need a schema/object for it; use the value itself. That also avoids KSQL / Kafka Streams key-comparison weirdness: JSON can include whitespace, so {"id":"foo"} will not land in the same partition as, or compare equal to, { "id" : "foo" }, whereas a plain "foo" key behaves consistently. You can pass --key-serializer in the console producer to use StringSerializer, for example.
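
    A minimal fix sketch for the subject problem, reusing the file from the question: re-register the value schema under the locations-value subject (the returned id may differ in your registry),

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data "$(cat schema-value-registry.json)" \
      http://localhost:8081/subjects/locations-value/versions

    then DROP TABLE loc; and re-run the CREATE TABLE statement with VALUE_SCHEMA_ID set to the id returned above.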