java, apache-kafka, kafka-consumer-api, avro, confluent-schema-registry

How to use an existing Avro schema for my Kafka consumer?


I am using the Debezium SQL Server connector for change data capture. The connector generates the schema automatically and registers it in the Schema Registry, which means I don't have a local Avro schema file. In this case, how can I write a consumer that reads data with this schema? All the articles I have seen use a local Avro schema file on the consumer side, and there is only one schema for this payload in the Schema Registry.

If I create an Avro schema file locally and let my consumer use it, I would have to register a duplicate schema under a different name.

My question is: how do I write a Java consumer that uses the schema already registered by the Kafka connector? Thank you so much.

Here is my value schema:

{"subject":"new.dbo.locations-value","version":1,"id":102,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"new.dbo.locations\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"display_id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"location_id\",\"type\":\"string\"},{\"name\":\"location_name\",\"type\":\"string\"},{\"name\":\"location_sub_type_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"location_time_zone\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"parent_organization_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"new.dbo.locations.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.sqlserver\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"change_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_serial_no\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.sqlserver.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"new.dbo.locations.Envelope\"}"}%

Solution

  • You don't need a local schema file. You can consume with KafkaConsumer<?, GenericRecord>, which lets the deserializer download and cache the schema for each message's schema ID from the registry (a minimal sketch follows below this list).

    The downside of this approach is that you have to navigate the record generically and be careful when extracting fields (much like working with raw JSON).

    If you need a static schema and a compiled class that allows strict type-checking, download the schema from the registry at /subjects/:name/versions/latest and generate the class from it (see the second sketch below).
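
A minimal sketch of the GenericRecord approach, assuming the topic new.dbo.locations, a broker at localhost:9092, and a Schema Registry at http://localhost:8081 (the message key is also assumed to be Avro-serialized; adjust these to your setup):

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class LocationsCdcConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "locations-cdc-reader");      // assumption
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // The Avro deserializer reads the schema ID embedded in every message,
        // then fetches and caches that schema from the registry, so no local .avsc is needed.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");              // assumption

        try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("new.dbo.locations"));
            while (true) {
                ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<GenericRecord, GenericRecord> record : records) {
                    GenericRecord envelope = record.value();
                    if (envelope == null) continue; // tombstone for a deleted row
                    // Debezium envelope fields: op, before, after, source, ts_ms
                    String op = String.valueOf(envelope.get("op"));
                    GenericRecord after = (GenericRecord) envelope.get("after");
                    if (after != null) {
                        System.out.printf("op=%s id=%s location=%s%n",
                                op, after.get("id"), after.get("location_name"));
                    }
                }
            }
        }
    }
}
```

KafkaAvroDeserializer comes from the io.confluent:kafka-avro-serializer artifact (available from the Confluent Maven repository).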
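
If you go the SpecificRecord route instead, one option (a sketch, assuming the subject new.dbo.locations-value and the same registry URL; the output path is an arbitrary choice) is to fetch the latest schema with the registry client and write it to an .avsc file, then let avro-maven-plugin or avro-tools generate the Java class from that file:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class DownloadValueSchema {
    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 10);  // assumption: registry URL
        // Equivalent to GET /subjects/new.dbo.locations-value/versions/latest
        SchemaMetadata latest = client.getLatestSchemaMetadata("new.dbo.locations-value");
        // Write the schema where avro-maven-plugin expects .avsc files (path is an assumption)
        Files.write(Paths.get("src/main/avro/locations-envelope.avsc"),
                latest.getSchema().getBytes(StandardCharsets.UTF_8));
        System.out.println("Wrote schema id=" + latest.getId() + " version=" + latest.getVersion());
    }
}
```

With the generated class on the classpath, set specific.avro.reader=true on the consumer so the deserializer returns your class instead of GenericRecord. Keep in mind that the schema can evolve as the source table changes, so the class has to be regenerated whenever a new version is registered.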