we have a KafkaAvroSerde
configured with multiple avroregistry url. At some point, the serde got a timeout while trying to register a schema on 1 url, but since it threw an IO exception up to the stream app, the stream thread closed. From a kafka stream app perspective, this kinds of defies the purpose of having the ability to support multiple urls when creating the avro serdes, since the runtime exception bubbling up the DSL api stack will close the Stream Thread.
couple of questions:
avroRegistry
urls? rocksDB
store, is there an added>
Exception in thread "mediafirst-npvr-adapter-program-mapping-mtrl02nsbe02.pf.spop.ca-f5e097bd-ff1b-42da-9f7d-2ab9fa5d2b70-GlobalStreamThread" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"ProgramMapp
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002; error code: 50002
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:68)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
at com.bell.cts.commons.kafka.store.custom.CustomStoreProcessor.process(CustomStoreProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl.forward(GlobalProcessorContextImpl.java:52)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:87)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:282)
From a kafka stream app perspective, this kinds of defies the purpose of having the ability to support multiple urls when creating the avro serdes, since the runtime exception bubbling up the DSL api stack will close the Stream Thread.
I disagree here: from a Kafka Streams perspective, serialization failed and thus the application does need to shut down. Note that Kafka Streams is agnostic to the Serdes you are using, and thus, does not know that your Serde is talking to a schema registry and that it could retry.
Thus, the Serde is responsible to handle retrying internally. I am not aware of a wrapper that does this, but it should not be too hard to build yourself. I'll create an internal ticket to track this feature request. I think it makes a lot of sense to add this for the out-of-the-box experience.
For RocksDB: all records that are written into RocksDB are also written into a changelog topic. Thus, to allow Kafka Streams to read this data to recover state after an error, you need to register the schemas.