apache-kafka, spring-kafka, avro, spring-cloud-stream

Unable to Serialize Spring Cloud Streams with Avro (Custom Serdes)


I'm developing a stream processing application that joins two streams and outputs a new record to a different topic. Below is my configuration:

    @Bean
    public BiFunction<KStream<String, TestRecord>, KStream<String, TestRecord>, KStream<String, ModifiedTestPayload>> process() {
        return (walletCreated, placementCreated) ->
                placementCreated
                        // re-key both streams by clientId so matching records meet in the join
                        .selectKey((key, val) -> val.getClientId())
                        .join(walletCreated.selectKey((s, testPayload) -> testPayload.getClientId()),
                                (ts, ts1) -> new ModifiedTestPayload(ts.getClientId()),
                                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.of(2, ChronoUnit.MINUTES)),
                                StreamJoined.with(Serdes.String(), CustomSerdes.TestRecord(), CustomSerdes.TestRecord()));
    }
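
For context, TestRecord and ModifiedTestPayload are assumed to be Avro-generated SpecificRecord classes that expose the clientId used as the join key (the question does not show them). A minimal hand-written stand-in for TestRecord would look roughly like this:

    // Hypothetical stand-in for the Avro-generated TestRecord; in practice this
    // class is produced from an .avsc schema by the Avro code generator.
    public class TestRecord extends org.apache.avro.specific.SpecificRecordBase {
        public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"TestRecord\","
                + "\"namespace\":\"ai.wownettests.kf.kafkastreamtestone\","
                + "\"fields\":[{\"name\":\"clientId\",\"type\":\"string\"}]}");

        private CharSequence clientId;

        @Override public org.apache.avro.Schema getSchema() { return SCHEMA$; }
        @Override public Object get(int field) { return clientId; }
        @Override public void put(int field, Object value) { this.clientId = (CharSequence) value; }

        public String getClientId() { return clientId == null ? null : clientId.toString(); }
    }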

Both input topics produce data in the same structure. Below are the custom Serdes I have defined:

// Typed Serdes wrapping Confluent's KafkaAvroSerializer/KafkaAvroDeserializer.
// Note: the inner (de)serializers are created with their no-arg constructors,
// so configure() must still be called on them before first use.
public class CustomSerdes {

    public static Serde<TestRecord> TestRecord() {
        return new TestRecordSerde();
    }

    public static class TestRecordSerde extends Serdes.WrapperSerde<TestRecord> {
        public TestRecordSerde() {
            super(new TestRecordSerializer(), new TestRecordDeserializer());
        }
    }

    public static class TestRecordSerializer implements Serializer<TestRecord> {
        private final KafkaAvroSerializer inner;

        public TestRecordSerializer() {
            this.inner = new KafkaAvroSerializer();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public byte[] serialize(String topic, TestRecord data) {
            return inner.serialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }

    public static class TestRecordDeserializer implements Deserializer<TestRecord> {
        private final KafkaAvroDeserializer inner;

        public TestRecordDeserializer() {
            this.inner = new KafkaAvroDeserializer();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public TestRecord deserialize(String topic, byte[] data) {
            return (TestRecord) inner.deserialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }

    public static Serde<ModifiedTestPayload> ModifiedTestPayload() {
        return new ModifiedTestPayloadSerde();
    }

    public static class ModifiedTestPayloadSerde extends Serdes.WrapperSerde<ModifiedTestPayload> {
        public ModifiedTestPayloadSerde() {
            super(new ModifiedTestPayloadSerializer(), new ModifiedTestPayloadDeserializer());
        }
    }

    public static class ModifiedTestPayloadSerializer implements Serializer<ModifiedTestPayload> {
        private final KafkaAvroSerializer inner;

        public ModifiedTestPayloadSerializer() {
            this.inner = new KafkaAvroSerializer();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public byte[] serialize(String topic, ModifiedTestPayload data) {
            return inner.serialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }

    public static class ModifiedTestPayloadDeserializer implements Deserializer<ModifiedTestPayload> {

        private final KafkaAvroDeserializer inner;

        public ModifiedTestPayloadDeserializer() {
            this.inner = new KafkaAvroDeserializer();
        }


        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public ModifiedTestPayload deserialize(String topic, byte[] data) {
            return (ModifiedTestPayload) inner.deserialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }
}
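
Note that the inner KafkaAvroSerializer and KafkaAvroDeserializer above are built with their no-arg constructors, so they must be configure()d with at least schema.registry.url before first use; otherwise they throw exactly the InvalidConfigurationException shown further down. A variant factory that returns a pre-configured Serde might look like this (a sketch; the config key is Confluent's standard schema.registry.url, and the URL is supplied by the caller):

    // Sketch: build and configure the Serde in one step so the inner
    // KafkaAvroSerializer/Deserializer are ready before Kafka Streams uses them.
    public static Serde<TestRecord> testRecord(String schemaRegistryUrl) {
        Serde<TestRecord> serde = new TestRecordSerde();
        // isKey = false: this Serde is used for record values, not keys
        serde.configure(Map.of("schema.registry.url", schemaRegistryUrl), false);
        return serde;
    }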

And here is my application.yml:

spring:
  kafka:
    streams:
      application-id: test-app
  cloud:
    function:
      definition: process
    stream:
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            process-in-1:
              consumer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            process-out-0:
              producer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: ai.wownettests.kf.kafkastreamtestone.CustomSerdes$ModifiedTestPayloadSerde
                configuration:
                  schema.registry.url: http://localhost:8081
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
              specific.avro.reader: true
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        process-in-0:
          destination: testkf.placements.created
        process-in-1:
          destination: testkf.wallet.created
        process-out-0:
          destination: testkf.client.profile_completed
logging:
  level:
    root: info

And I'm getting the error below when I start the application (there is already some data on the Kafka topics that this app tries to process):

org.apache.kafka.streams.errors.StreamsException: Unable to serialize record. ProducerRecord(topic=[test-app-KSTREAM-KEY-SELECT-0000000003-repartition], partition=[null], timestamp=[1706526131230]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:231) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na]
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient


Solution

  • This was fixed by creating a bean of the custom Serde and configuring it explicitly with the schema registry URL. Kafka Streams only calls configure() on the default Serdes it builds from its own config; Serdes passed in programmatically (as in StreamJoined above) are used as-is, so the inner KafkaAvroSerializer was never configured, which is exactly what the InvalidConfigurationException complains about.

    @Bean
    public Serde<TestRecord> testRecordSerde() {
        // configure() the inner KafkaAvroSerializer/Deserializer before first use;
        // isKey = false because this Serde is used for record values
        Map<String, ?> config = Collections.singletonMap("schema.registry.url", "http://localhost:8081");
        var serde = CustomSerdes.TestRecord();
        serde.configure(config, false);
        return serde;
    }
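
The output Serde presumably needs the same treatment, since CustomSerdes.ModifiedTestPayload() wraps an unconfigured KafkaAvroSerializer as well; a sketch following the same pattern:

    @Bean
    public Serde<ModifiedTestPayload> modifiedTestPayloadSerde() {
        Map<String, ?> config = Collections.singletonMap("schema.registry.url", "http://localhost:8081");
        var serde = CustomSerdes.ModifiedTestPayload();
        serde.configure(config, false);
        return serde;
    }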