Tags: apache-kafka, ksqldb

ksqlDB keeps saying: VALUE_FORMAT should support schema inference when VALUE_SCHEMA_ID is provided. Current format is JSON


I'm trying to create a stream in ksqlDB over a Kafka topic, using an Avro schema.

The command looks like this:

CREATE STREAM customer_stream WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON', VALUE_SCHEMA_ID=1);

This is what the customers topic contains, according to print 'customers';:

Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2022/09/29 12:34:53.440 Z, key: , value: {"Name":"John Smith","PhoneNumbers":["212 555-1111","212 555-2222"],"Remote":false,"Height":"62.4","FicoScore":" > 640"}, partition: 0
rowtime: 2022/09/29 12:34:53.440 Z, key: , value: {"Name":"Jane Smith","PhoneNumbers":["269 xxx-1111","269 xxx-2222"],"Remote":false,"Height":"69.9","FicoScore":" > 690"}, partition: 0
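The PRINT output is consistent with plain JSON values. Values written by Schema Registry-aware serializers (the kind ksqlDB uses for AVRO, JSON_SR and PROTOBUF) start with a magic byte 0 followed by a 4-byte big-endian schema ID; these values are bare JSON text starting with `{`, so they carry no ID that VALUE_SCHEMA_ID could refer to. A minimal Python sketch of that header check follows; `frame` and `schema_id_of` are hypothetical helper names, not part of any Kafka client.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Schema Registry-framed value

def frame(schema_id: int, payload: bytes) -> bytes:
    """Prepend the 5-byte header: magic byte + 4-byte big-endian schema ID."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + payload

def schema_id_of(value: bytes):
    """Return the schema ID carried by value, or None if it has no header."""
    if len(value) < 5 or value[0] != MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", value[1:5])
    return schema_id

# The first value exactly as PRINT showed it: plain UTF-8 JSON starting with '{'.
plain = (b'{"Name":"John Smith","PhoneNumbers":["212 555-1111","212 555-2222"],'
         b'"Remote":false,"Height":"62.4","FicoScore":" > 640"}')

print(schema_id_of(plain))            # None: no header, so no schema ID to look up
print(schema_id_of(frame(1, plain)))  # 1: header layout only; real Avro payloads are binary
```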

An Avro schema has been added to this topic:

{
    "type": "record",
    "name": "Customer",
    "namespace": "com.acme.avro",
    "fields": [{
            "name": "ficoScore",
            "type": ["null", "string"],
            "default": null
        }, {
            "name": "height",
            "type": ["null", "double"],
            "default": null
        }, {
            "name": "name",
            "type": ["null", "string"],
            "default": null
        }, {
            "name": "phoneNumbers",
            "type": ["null", {
                    "type": "array",
                    "items": ["null", "string"]
                }
            ],
            "default": null
        }, {
            "name": "remote",
            "type": ["null", "boolean"],
            "default": null
        }
    ]
}
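Separately from the format question, the schema does not describe the values as they were written: Avro field names are case-sensitive (`name` vs `Name`), and `Height` arrives as the string "62.4" while the schema declares a double. The small Python check below makes those differences explicit for the first printed value. It is a sketch that only understands the `["null", X]` unions used in this particular schema, not a general Avro validator; `SCHEMA_FIELDS` and `compare` are illustrative names.

```python
import json

# Field names and non-null union branches copied from the Customer schema above.
SCHEMA_FIELDS = [
    ("ficoScore", "string"),
    ("height", "double"),
    ("name", "string"),
    ("phoneNumbers", "array"),
    ("remote", "boolean"),
]

# Python types that json.loads produces for values valid under each branch.
EXPECTED_TYPES = {"string": str, "double": float, "boolean": bool, "array": list}

def compare(record: dict) -> list:
    """List the ways a decoded JSON record differs from SCHEMA_FIELDS."""
    problems = []
    by_lower = {key.lower(): key for key in record}  # case-insensitive lookup
    for name, avro_type in SCHEMA_FIELDS:
        key = name if name in record else by_lower.get(name.lower())
        if key is None:
            problems.append(f"{name}: missing")
            continue
        if key != name:
            problems.append(f"{name}: record uses {key!r}")
        value = record[key]
        if value is not None and not isinstance(value, EXPECTED_TYPES[avro_type]):
            problems.append(f"{name}: expected {avro_type}, got {type(value).__name__}")
    return problems

raw = ('{"Name":"John Smith","PhoneNumbers":["212 555-1111","212 555-2222"],'
       '"Remote":false,"Height":"62.4","FicoScore":" > 640"}')
for problem in compare(json.loads(raw)):
    print(problem)
```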

When I run the command below, I get this reply:

CREATE STREAM customer_stream WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON', VALUE_SCHEMA_ID=1);

VALUE_FORMAT should support schema inference when VALUE_SCHEMA_ID is provided. Current format is JSON.

Any suggestion?


Solution

  • JSON doesn't use schema IDs. The JSON_SR format does, but if you want Avro, you need to use AVRO as the format.

    You don't "add schemas" to topics. You can only register them in the Schema Registry.

    Example of converting JSON to Avro with ksqlDB:

    CREATE STREAM sensor_events_json (sensor_id VARCHAR, temperature INTEGER, ...)
      WITH (KAFKA_TOPIC='events-topic', VALUE_FORMAT='JSON');
    
    CREATE STREAM sensor_events_avro WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM sensor_events_json;
    

    Notice that you don't need to refer to any schema ID, because the serializer auto-registers the necessary schema.
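Registering a schema is a call to Schema Registry, not a change to the topic: the schema is stored under a subject (with the default TopicNameStrategy, `<topic>-value` for record values) and the registry hands back an ID. The Python sketch below only builds that REST request (`POST /subjects/<subject>/versions`) without sending it. It assumes a registry at `http://localhost:8081` (a placeholder address) and the question's `customers` topic; `register_request` is a hypothetical helper.

```python
import json
import urllib.request

# Assumption: a Schema Registry reachable at this placeholder address.
REGISTRY_URL = "http://localhost:8081"

def register_request(subject: str, schema: dict) -> urllib.request.Request:
    """Build (without sending) a POST /subjects/<subject>/versions request."""
    # The registry expects the schema itself as a JSON-encoded *string* field.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        f"{REGISTRY_URL}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )

# The Customer schema from the question, written compactly.
customer_schema = {
    "type": "record",
    "name": "Customer",
    "namespace": "com.acme.avro",
    "fields": [
        {"name": "ficoScore", "type": ["null", "string"], "default": None},
        {"name": "height", "type": ["null", "double"], "default": None},
        {"name": "name", "type": ["null", "string"], "default": None},
        {"name": "phoneNumbers",
         "type": ["null", {"type": "array", "items": ["null", "string"]}],
         "default": None},
        {"name": "remote", "type": ["null", "boolean"], "default": None},
    ],
}

# Default TopicNameStrategy: value schemas for topic 'customers' live under 'customers-value'.
req = register_request("customers-value", customer_schema)
print(req.get_method(), req.full_url)
# urllib.request.urlopen(req) would send it; the registry answers with {"id": <schema id>}.
# Either way, the messages already in 'customers' are left untouched.
```

The ID in that response is what VALUE_SCHEMA_ID refers to, but it only helps when the messages themselves were written by a Schema Registry serializer; the CREATE STREAM ... AS SELECT conversion above handles both registration and serialization.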