Tags: java, apache-kafka, apache-kafka-streams, confluent-schema-registry

Do I need to register a schema for a Kafka Streams changelog topic in the Schema Registry?


I am implementing a Kafka Streams project using both the Processor API and the Kafka Streams DSL. The process function in my processor is:

@Override
public void process(final String key, final T event) {
  keyValueStore.put(key, event);
}

My topology is:

 protected Topology buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
    StoreBuilder<KeyValueStore<String, T>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
            new EventSerializer(streamProperties()),
            new EventDeserializer(streamProperties())));
    builder.addStateStore(storeBuilder);

    final KStream<String, T> stream = builder.stream(inputTopic);
    stream.process(() -> new Processor<>(stateStoreName), stateStoreName);
    stream.to(outputTopic);

    return builder.build();
  }

Lastly, this is my custom EventSerializer class:

public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
    implements Serializer<T> {
  private final KafkaAvroSerializer inner;

  public EventSerializer(Map<String, ?> properties) {
    inner = new KafkaAvroSerializer();
    configure(properties, false);
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
  }

  @Override
  public byte[] serialize(final String topic, final T record) {
    return inner.serialize(topic, record);
  }
}

When the processor puts an event into keyValueStore, I hit an error: io.confluent.rest.exceptions.RestNotFoundException: Subject not found. After debugging for a while, I realized the serializer fails when serializing the events. The topic passed to public byte[] serialize(final String topic, final T record) is <application.id>-<store name>-changelog, not my input topic. This seems to be internal Kafka Streams behavior, though I don't know why it happens. The serializer can't find a schema for this changelog topic, so it throws the error. Do I need to register a schema for this changelog topic, or is there a way to pass the real consumer topic, which already has a schema registered, into the serializer?
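The changelog topic name in the error comes from two naming conventions. Kafka Streams backs a logged state store with a topic named <application.id>-<store name>-changelog. Confluent's default TopicNameStrategy then turns whatever topic the serializer is called with into the subject <topic>-value (or <topic>-key for keys). The dependency-free sketch that follows only reproduces those two string formats. The class name, method names, and the example application id "my-app" and store name "event-store" are hypothetical.

```java
// Hypothetical helper mirroring two naming conventions:
//  1. Kafka Streams names a logged store's changelog topic "<application.id>-<store name>-changelog".
//  2. Confluent's default TopicNameStrategy derives the Schema Registry subject from the
//     topic the serializer receives: "<topic>-key" for keys, "<topic>-value" for values.
final class ChangelogSubjectNames {

    private ChangelogSubjectNames() {}

    /** Changelog topic name Kafka Streams uses for a logged state store. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Subject that TopicNameStrategy would look up or register for the given topic. */
    static String topicNameStrategySubject(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        // Hypothetical application.id and store name, for illustration only.
        String topic = changelogTopic("my-app", "event-store");
        System.out.println(topic);                                  // my-app-event-store-changelog
        System.out.println(topicNameStrategySubject(topic, false)); // my-app-event-store-changelog-value
    }
}
```

Under these assumptions, the store's value serializer (configured with isKey = false) would ask the registry for my-app-event-store-changelog-value, a subject that differs from the one registered for the input topic. If the serializer is not allowed to register it, the lookup fails with Subject not found.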


Solution

  • When you create new KafkaAvroSerializer(), it defaults to pointing at localhost:8081 for the Schema Registry.

    You don't need to register the schema (although you can), since the serializer registers it automatically as part of inner.serialize when auto.register.schemas is true, which is the default.

    Note: extending KafkaAvroSerializer might make more sense than wrapping it.
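Auto-registration depends on the serializer configuration. The following is a minimal sketch of the two value-serializer settings involved. The keys schema.registry.url and auto.register.schemas are Confluent serializer config names. The URL value and the class and method names are assumptions for illustration. In the question, a map like this would be what ends up in inner.configure(configs, false).

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of value-serializer settings for KafkaAvroSerializer.
// Config keys are Confluent names; the class, method, and URL value are hypothetical.
final class AvroSerializerConfigSketch {

    private AvroSerializerConfigSketch() {}

    static Map<String, Object> valueSerializerConfig(String schemaRegistryUrl) {
        Map<String, Object> configs = new HashMap<>();
        // Registry the serializer looks up and registers schemas against.
        configs.put("schema.registry.url", schemaRegistryUrl);
        // true is already the default; set explicitly here to document that serialize()
        // may register a missing subject (e.g. "<changelog topic>-value") instead of only looking it up.
        configs.put("auto.register.schemas", true);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = valueSerializerConfig("http://localhost:8081");
        System.out.println(configs.get("schema.registry.url"));
        System.out.println(configs.get("auto.register.schemas"));
    }
}
```

With auto.register.schemas set to false, the serializer only looks the subject up, and a missing subject produces the Subject not found error. It may be worth checking whether the question's EventSerdeConfig.withProducerConfig overrides this setting.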