Search code examples
apache-kafkaavroapache-kafka-streamsconfluent-schema-registryrocksdb

KafkaAvroSerializer with multiple avro registry urls


we have a KafkaAvroSerde configured with multiple avroregistry url. At some point, the serde got a timeout while trying to register a schema on 1 url, but since it threw an IO exception up to the stream app, the stream thread closed. From a kafka stream app perspective, this kinds of defies the purpose of having the ability to support multiple urls when creating the avro serdes, since the runtime exception bubbling up the DSL api stack will close the Stream Thread. couple of questions:

  1. Is there a good way to handle this?
  2. Do we need to enforce a retry in the app logic (which can be tricky when you simply materialize a topic into a store)?
  3. Otherwise is there an avroserde wrapper that
    could retry with the actual configure avroRegistry urls?
  4. When materializing into a local rocksDB store, is there an added
    value to register the schema in the registry or should we configure auto.register.schemas to false?

>

Exception in thread "mediafirst-npvr-adapter-program-mapping-mtrl02nsbe02.pf.spop.ca-f5e097bd-ff1b-42da-9f7d-2ab9fa5d2b70-GlobalStreamThread" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"ProgramMapp
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002; error code: 50002
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:68)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
at com.bell.cts.commons.kafka.store.custom.CustomStoreProcessor.process(CustomStoreProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl.forward(GlobalProcessorContextImpl.java:52)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:87)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:282)

Solution

  • From a kafka stream app perspective, this kinds of defies the purpose of having the ability to support multiple urls when creating the avro serdes, since the runtime exception bubbling up the DSL api stack will close the Stream Thread.

    I disagree here: from a Kafka Streams perspective, serialization failed and thus the application does need to shut down. Note that Kafka Streams is agnostic to the Serdes you are using, and thus, does not know that your Serde is talking to a schema registry and that it could retry.

    Thus, the Serde is responsible to handle retrying internally. I am not aware of a wrapper that does this, but it should not be too hard to build yourself. I'll create an internal ticket to track this feature request. I think it makes a lot of sense to add this for the out-of-the-box experience.

    For RocksDB: all records that are written into RocksDB are also written into a changelog topic. Thus, to allow Kafka Streams to read this data to recover state after an error, you need to register the schemas.