I'm developing a stream processing application that joins two streams and writes a new record to a different topic. Below is my processor configuration:
@Bean
public BiFunction<KStream<String, TestRecord>, KStream<String, TestRecord>, KStream<String, ModifiedTestPayload>> process() {
    return (walletCreated, placementCreated) ->
            placementCreated
                    // re-key both streams on clientId so the join can match records
                    .selectKey((key, val) -> val.getClientId())
                    .join(walletCreated.selectKey((s, testPayload) -> testPayload.getClientId()),
                            (ts, ts1) -> new ModifiedTestPayload(ts.getClientId()),
                            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.of(2, ChronoUnit.MINUTES)),
                            StreamJoined.with(Serdes.String(), CustomSerdes.TestRecord(), CustomSerdes.TestRecord()));
}
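(A side note that matters for the error below: the selectKey() calls force Kafka Streams to create internal repartition topics before the join, and those topics are read and written with the serdes passed to StreamJoined, not with the binding serdes from application.yml. A hypothetical variation that only adds a base name, so the join's internal repartition topics and state stores get a readable name instead of a generated KSTREAM-KEY-SELECT-0000000003 id:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.StreamJoined;

// Hypothetical: same serdes as above, but with an explicit base name for the
// join's internal repartition topics and state stores.
StreamJoined<String, TestRecord, TestRecord> joined =
        StreamJoined.with(Serdes.String(), CustomSerdes.TestRecord(), CustomSerdes.TestRecord())
                .withName("wallet-placement-join");

This doesn't change the behavior, it just makes the internal topic in stack traces easier to attribute.)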
Both input topics produce data with the same structure. Below are the custom serdes I have defined:
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerdes {

    public static Serde<TestRecord> TestRecord() {
        return new TestRecordSerde();
    }

    public static class TestRecordSerde extends Serdes.WrapperSerde<TestRecord> {
        public TestRecordSerde() {
            super(new TestRecordSerializer(), new TestRecordDeserializer());
        }
    }

    // Delegates to Confluent's KafkaAvroSerializer; the inner serializer only
    // learns the schema.registry.url when configure() is called on it
    public static class TestRecordSerializer implements Serializer<TestRecord> {
        private final KafkaAvroSerializer inner;

        public TestRecordSerializer() {
            this.inner = new KafkaAvroSerializer();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public byte[] serialize(String topic, TestRecord data) {
            return inner.serialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }

    public static class TestRecordDeserializer implements Deserializer<TestRecord> {
        private final KafkaAvroDeserializer inner;

        public TestRecordDeserializer() {
            this.inner = new KafkaAvroDeserializer();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public TestRecord deserialize(String topic, byte[] data) {
            return (TestRecord) inner.deserialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }

    public static Serde<ModifiedTestPayload> ModifiedTestPayload() {
        return new ModifiedTestPayloadSerde();
    }

    public static class ModifiedTestPayloadSerde extends Serdes.WrapperSerde<ModifiedTestPayload> {
        public ModifiedTestPayloadSerde() {
            super(new ModifiedTestPayloadSerializer(), new ModifiedTestPayloadDeserializer());
        }
    }

    public static class ModifiedTestPayloadSerializer implements Serializer<ModifiedTestPayload> {
        private final KafkaAvroSerializer inner;

        public ModifiedTestPayloadSerializer() {
            this.inner = new KafkaAvroSerializer();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public byte[] serialize(String topic, ModifiedTestPayload data) {
            return inner.serialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }

    public static class ModifiedTestPayloadDeserializer implements Deserializer<ModifiedTestPayload> {
        private final KafkaAvroDeserializer inner;

        public ModifiedTestPayloadDeserializer() {
            this.inner = new KafkaAvroDeserializer();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public ModifiedTestPayload deserialize(String topic, byte[] data) {
            return (ModifiedTestPayload) inner.deserialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }
}
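(As an aside: if TestRecord and ModifiedTestPayload are Avro-generated classes, I believe Confluent's SpecificAvroSerde can replace these handwritten wrappers entirely; it still has to be configured before use, though. A minimal sketch, assuming the classes extend SpecificRecord:

import java.util.Map;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;

// Sketch: SpecificAvroSerde<T> works for any Avro-generated class, so no
// per-type wrapper is needed. The schema.registry.url still has to be
// supplied via configure().
public static <T extends SpecificRecord> Serde<T> avroSerde(String schemaRegistryUrl) {
    SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
    serde.configure(Map.of("schema.registry.url", schemaRegistryUrl), false); // false = value serde
    return serde;
}

Either way, the configuration requirement is the same, which is what bites below.)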
And my application.yml is below:
spring:
  kafka:
    streams:
      application-id: test-app
  cloud:
    function:
      definition: process
    stream:
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            process-in-1:
              consumer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            process-out-0:
              producer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: ai.wownettests.kf.kafkastreamtestone.CustomSerdes$ModifiedTestPayloadSerde
                configuration:
                  schema.registry.url: http://localhost:8081
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
              specific.avro.reader: true
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        process-in-0:
          destination: testkf.placements.created
        process-in-1:
          destination: testkf.wallet.created
        process-out-0:
          destination: testkf.client.profile_completed

logging:
  level:
    root: info
And I get the error below when I start the application (there is already some data on the Kafka topics that this app tries to process):
org.apache.kafka.streams.errors.StreamsException: Unable to serialize record. ProducerRecord(topic=[test-app-KSTREAM-KEY-SELECT-0000000003-repartition], partition=[null], timestamp=[1706526131230]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:231) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na]
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient
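The Caused by line is the real clue: the failing topic is the internal repartition topic, which is written with the serdes from StreamJoined, and those instances come straight out of CustomSerdes.TestRecord() without configure() ever being called, so the inner KafkaAvroSerializer has no schema.registry.url (Kafka Streams only configures the serdes it creates itself from config, not instances you pass in). A minimal, hypothetical repro of that failure outside the topology (UnconfiguredSerializerRepro is not part of the app; it assumes TestRecord has the usual Avro no-arg constructor):

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class UnconfiguredSerializerRepro {
    public static void main(String[] args) {
        // Unconfigured, exactly like the inner serializer built inside
        // CustomSerdes: no configure() call, so no schema.registry.url.
        KafkaAvroSerializer serializer = new KafkaAvroSerializer();

        // Throws InvalidConfigurationException:
        // "You must configure() before serialize() or use serializer
        //  constructor with SchemaRegistryClient"
        serializer.serialize("any-topic", new TestRecord());
    }
}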
This was fixed by exposing the custom serde as a Spring bean and configuring it with the schema registry URL:
@Bean
public Serde<TestRecord> testRecordSerde() {
    Map<String, ?> config = Collections.singletonMap("schema.registry.url", "http://localhost:8081");
    var serde = CustomSerdes.TestRecord();
    // configure() propagates schema.registry.url to the wrapped
    // KafkaAvroSerializer/KafkaAvroDeserializer; false = value serde, not key
    serde.configure(config, false);
    return serde;
}
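For completeness, a sketch of how the configured bean can then be injected into the processor so the join no longer builds unconfigured instances (the constructor-style parameter injection shown here is my assumption about the wiring, not from the original code):

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.BiFunction;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.springframework.context.annotation.Bean;

// Sketch: Spring injects the configured testRecordSerde bean, which replaces
// the unconfigured instances previously obtained from CustomSerdes.TestRecord().
@Bean
public BiFunction<KStream<String, TestRecord>, KStream<String, TestRecord>, KStream<String, ModifiedTestPayload>> process(
        Serde<TestRecord> testRecordSerde) {
    return (walletCreated, placementCreated) ->
            placementCreated
                    .selectKey((key, val) -> val.getClientId())
                    .join(walletCreated.selectKey((s, payload) -> payload.getClientId()),
                            (ts, ts1) -> new ModifiedTestPayload(ts.getClientId()),
                            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.of(2, ChronoUnit.MINUTES)),
                            StreamJoined.with(Serdes.String(), testRecordSerde, testRecordSerde));
}

With this in place, the join's repartition topics are written with a serde that already knows where the schema registry is, which is exactly what the original StreamJoined.with(...) call was missing.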