Kafka Connect schema format for JSONConverter


I am using Kafka Connect to retrieve an existing schema from the Schema Registry, and then trying to convert the returned schema string using JSONConverter (org.apache.kafka.connect.json.JsonConverter).

Unfortunately, I get an error from JSONConverter:

org.apache.kafka.connect.errors.DataException: Unknown schema type: object

I viewed the JSONConverter code and the error occurs because the schema "type" returned from the schema registry is "object" (see below) but JSONConverter does not recognize that type.

Questions:

  1. Is the retrieved schema usable for JSONConverter? If yes, am I using this incorrectly?
  2. Is JSONConverter expecting a different format? If yes, does anyone know what format JSONConverter expects?
  3. Is there a different method of converting the schema registry response into a "Schema"?

Here are the relevant artifacts:

schema registry response (when querying for a particular schema):

[{"subject":"test-schema","version":1,"id":1,"schemaType":"JSON","schema":"{\"title\":\"test-schema\",\"type\":\"object\",\"required\":[\"id\"],\"additionalProperties\":false,\"properties\":{\"id\":{\"type\":\"integer\"}}}"}]

Unescaped and pretty-printed, the relevant component of the response ("schema") is:

{
  "title":"test-schema",
  "type":"object",
  "required":["id"],
  "additionalProperties":false,
  "properties":{"id":{"type":"integer"}}
}

Solution

  • org.apache.kafka.connect.json.JsonConverter does not use the JSON Schema specification. It has its own (not well documented) schema format, and it does not integrate with the Schema Registry at all.

    In the converter's own format, an object is declared with the type "struct", which is why "object" is rejected. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas

    If you intend to use actual JSON Schema (and the registry), you need to use the converter from Confluent: io.confluent.connect.json.JsonSchemaConverter

    > Is there a different method of converting the schema registry response into a "Schema"?

    Yes, if you use the Schema Registry Java client. Call the getSchemaById method; the schemaType() and rawSchema() methods of the returned object should get you close to what you want. You would then pass the raw schema to a JSON Schema library (e.g. org.everit.json.schema, which the registry itself uses).
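
As a rough illustration of the format JsonConverter understands, the schema from the question might be expressed as the envelope below when schemas.enable is true. This is a sketch, not output captured from a real converter: mapping the JSON Schema "integer" to "int64" and the sample payload value are assumptions, and "additionalProperties" has no counterpart here.

```json
{
  "schema": {
    "type": "struct",
    "name": "test-schema",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int64", "optional": false }
    ]
  },
  "payload": { "id": 1 }
}
```

With this converter the schema travels inside every message, in the "schema"/"payload" envelope, rather than being looked up in the registry. To rely on the registered JSON Schema instead, the converter is usually selected in the worker or connector configuration, e.g. value.converter=io.confluent.connect.json.JsonSchemaConverter together with value.converter.schema.registry.url pointing at your registry.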