I am implementing a Kafka Streams project using the Processor API and the Kafka Streams DSL. The process function in my processor is:
    @Override
    public void process(final String key, final T event) {
        keyValueStore.put(key, event);
    }
My topology is:
    protected Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
        StoreBuilder<KeyValueStore<String, T>> storeBuilder =
            Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
                new EventSerializer(streamProperties()),
                new EventDeserializer(streamProperties())));
        builder.addStateStore(storeBuilder);
        final KStream<String, T> stream = builder.stream(inputTopic);
        stream.process(() -> new Processor<>(stateStoreName), stateStoreName);
        stream.to(outputTopic);
        return builder.build();
    }
And lastly, this is my custom EventSerializer class:
    public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
            implements Serializer<T> {
        private final KafkaAvroSerializer inner;

        public EventSerializer(Map<String, ?> properties) {
            inner = new KafkaAvroSerializer();
            configure(properties, false);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
        }

        @Override
        public byte[] serialize(final String topic, final T record) {
            return inner.serialize(topic, record);
        }
    }
When the processor puts an event into the keyValueStore, I hit an error: io.confluent.rest.exceptions.RestNotFoundException: Subject not found. After debugging for a while, I realized that the serializer has trouble serializing the events. The topic passed to public byte[] serialize(final String topic, final T record) is <application id>-<store>-changelog. This seems to be internal Kafka behavior, although I don't know why it happens. The serializer can't find a schema for this combined topic, so it throws an error.
Do I need to register a schema for this combined topic, or is there a way to pass the real consumer topic, which already has a schema registered, to the serializer?
When you call new KafkaAvroSerializer(), it defaults to pointing at localhost:8081 for the Schema Registry.
You don't need to register the schema yourself (although you can), since the producer does it as part of the serialization logic in inner.serialize, as long as auto.register.schemas is left at its default of true.
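To avoid depending on whatever registry address the serializer ends up with, the settings can be passed explicitly. The following is a minimal sketch of the configuration map, using only the standard library; the class and method names are hypothetical, and the assumption is that this map would be handed to the serializer's configure(...) call in place of the question's EventSerdeConfig.withProducerConfig(configs) result. The two keys are the Confluent serializer settings schema.registry.url and auto.register.schemas.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the question's code base.
final class SerializerConfigSketch {

    // Builds a config map that could be passed to KafkaAvroSerializer.configure(...).
    static Map<String, Object> avroSerializerConfig(String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        // Point the serializer at the Schema Registry explicitly.
        config.put("schema.registry.url", schemaRegistryUrl);
        // Let the serializer register the schema under a new subject if it is missing.
        config.put("auto.register.schemas", true);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(avroSerializerConfig("http://localhost:8081"));
    }
}
```

If auto-registration has been switched off somewhere in the stream properties, the serializer can only look up existing subjects, which would be consistent with a "Subject not found" error for a topic that has never been written to before.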
Note: extending KafkaAvroSerializer might make more sense than wrapping an instance of it.
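As for why the serializer sees that topic name: Kafka Streams backs each persistent state store with a changelog topic named <application.id>-<store name>-changelog, and the store's value serde is called with that topic. Under Confluent's default TopicNameStrategy, the subject is then the topic name plus -key or -value. The sketch below only reproduces that naming with plain string handling; the class name, method names, and the "my-app" / "my-store" values are made up for illustration.

```java
// Hypothetical illustration of the naming rules; it does not call Kafka itself.
final class ChangelogSubjectSketch {

    // Kafka Streams names a store's changelog topic <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Default TopicNameStrategy: the subject is <topic>-key or <topic>-value.
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        String topic = changelogTopic("my-app", "my-store");
        System.out.println(topic);                    // my-app-my-store-changelog
        System.out.println(subjectFor(topic, false)); // my-app-my-store-changelog-value
    }
}
```

So the subject the serializer looks for belongs to the changelog topic, not to the input topic, which is why a schema registered only for the input topic is not found.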