I am using this local environment:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_DEBUG: 'true'
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.5.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.5.1
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
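I bring the stack up and confirm Schema Registry is reachable like this (a minimal sketch; docker-compose.yml as the file name is my assumption):

docker compose up -d
curl http://localhost:8081/subjects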
I have created these schemas. The key schema:
{
"$id": "http://schema-registry:8081/schemas/ids/1",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Location",
"type": "object",
"properties": {
"profileId": {
"type":"string"
}
}
}
I post it using this file, schema-key-registry.json:
{
"schemaType":"JSON",
"schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Location\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\"}}}"
}
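Rather than escaping the schema by hand, a jq one-liner can generate this wrapper file (a sketch; location-key.json is assumed to contain the raw key schema shown above):

jq '{schemaType: "JSON", schema: tojson}' location-key.json > schema-key-registry.json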
The value schema:
{
"$id": "http://schema-registry:8081/schemas/ids/1",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Location",
"type": "object",
"properties": {
"profileId": {
"type": "string",
"description": "The id of the location."
},
"latitude": {
"type": "number",
"minimum": -90,
"maximum": 90,
"description": "The location's latitude."
},
"longitude": {
"type": "number",
"minimum": -180,
"maximum": 180,
"description": "The location's longitude."
}
}
}
I post it using this file, schema-value-registry.json:
{
"schemaType":"JSON",
"schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Location\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}
and registered them:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "$(cat schema-key-registry.json)" \
http://localhost:8081/subjects/locations-key/versions
{"id":1}
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "$(cat schema-value-registry.json)" \
http://localhost:8081/subjects/locations/versions
{"id":2}
Producing messages works, and the producer fails when a message violates the schema, as expected.
kafka-json-schema-console-producer \
--bootstrap-server broker:9092 \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=2 \
--property key.schema.id=1 \
--property key.separator='|' \
--property parse.key=true \
--topic locations
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-179}
Consuming works too:
kafka-json-schema-console-consumer \
--bootstrap-server broker:9092 \
--from-beginning \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--property key.separator='|' \
--topic locations
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}
{"profileId":"asdfghjkl"}|{"profileId":"asdfghjkl","latitude":90.000,"longitude":-179}
Using ksqlDB, I create a table:
ksql> CREATE TABLE loc WITH (
>KAFKA_TOPIC = 'locations',
>KEY_FORMAT = 'JSON_SR',
>KEY_SCHEMA_ID = 1,
>VALUE_FORMAT = 'JSON_SR',
>VALUE_SCHEMA_ID = 2
>);
Which looks like this:
ksql> describe loc;
Name : LOC
Field | Type
-------------------------------------------------------------
ROWKEY | STRUCT<profileId VARCHAR(STRING)> (primary key)
profileId | VARCHAR(STRING)
latitude | DOUBLE
longitude | DOUBLE
-------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
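To compare what ksqlDB inferred with what is actually registered, the schema can also be fetched by id (purely a diagnostic step):

curl http://localhost:8081/schemas/ids/2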
When I try to run a push query, I get this error:
ksql> SELECT * FROM loc EMIT CHANGES;
+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|ROWKEY |profileId |latitude |longitude |
+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=locations, partition=0, offset=3, stacktrace=io.confluent.ksql.serde.KsqlSerializationException: Error serializing message to topic: _confluent-ksql-default_transient_transient_LOC_4102565114369480088_1698924612467-KsqlTopic-Reduce-changelog. Mismatching schema.
Hint: You probably forgot to add VALUE_SCHEMA_ID when creating the source.
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
at io.confluent.ksql.serde.unwrapped.UnwrappedSerializer.serialize(UnwrappedSerializer.java:56)
at io.confluent.ksql.serde.unwrapped.UnwrappedSerializer.serialize(UnwrappedSerializer.java:31)
at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.keyBytes(MeteredKeyValueStore.java:431)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:330)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:877)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:330)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:131)
at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.put(KeyValueStoreWrapper.java:86)
at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:792)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:877)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:792)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:723)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: org.apache.kafka.connect.errors.DataException: Mismatching schema.
at io.confluent.connect.json.JsonSchemaData.fromConnectData(JsonSchemaData.java:524)
at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:94)
at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:85)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
... 33 more
What am I doing wrong?
Your schema with id=2 is missing -value in the subject name, so you probably have a different one (with id=3) there, hence the "Mismatching schema" error.

Note: kafka-json-schema-console-producer will automatically register the schemas for you.

Also, if the key is really "one string field", you don't need a schema/object for it at all; just use the value itself. That also avoids KSQL / Kafka Streams key-comparison weirdness: JSON can include whitespace, so {"id":"foo"} will not land in the same partition as, or compare equal to, { "id" : "foo" }, whereas a plain "foo" has no such problem. You can pass --key-serializer to the console producer to use StringSerializer, for example.
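Concretely, re-registering the value schema under the subject ksqlDB expects would look like this (a sketch reusing the file from the question; Schema Registry reuses the existing id when the schema text is identical):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "$(cat schema-value-registry.json)" \
  http://localhost:8081/subjects/locations-value/versions

And producing with a plain string key instead of a JSON object could look like this (a sketch; whether --key-serializer is accepted depends on your CLI version):

kafka-json-schema-console-producer \
  --bootstrap-server broker:9092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema.id=2 \
  --property parse.key=true \
  --property key.separator='|' \
  --key-serializer org.apache.kafka.common.serialization.StringSerializer \
  --topic locations
asdfghjkl|{"profileId":"asdfghjkl","latitude":90.0,"longitude":-179}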