java, apache-kafka, apache-kafka-streams, avro

org.apache.kafka.streams.errors.StreamsException: A serializer (key: BytesSerializer / value: BytesSerializer) is not compatible to the actual key


I have the three classes below, generated from an Avro schema:

public class User extends SpecificRecordBase implements SpecificRecord {
    public UserAttrs userAttrs;
    public UserKey userKey;
}

public class UserAttrs extends SpecificRecordBase implements SpecificRecord {
    public Long clicks;
    public Long purchases;
    public Long views;
}

public class UserKey extends SpecificRecordBase implements SpecificRecord {
    public String id;
    public String name;
}

I have the following Kafka Streams code. Both the topology and the code that runs it live in the unit test class itself:

import com.imds.streaming.rtb.serde.avro.AvroSerdes;
import com.myorg.User;
import com.myorg.UserAttrs;
import com.myorg.UserKey;
import com.myorg.UserOutput;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Properties;
import static java.util.Optional.ofNullable;

public class KafkaStreamsAggregateToStreamTest {

    private static TopologyTestDriver testDriver;
    private TestInputTopic<String, User> inputTopic;
    private TestOutputTopic<UserKey, UserOutput> outputTopic;

    @BeforeEach
    public void setup() {
        Topology topology = getUserOutputTopology();
        Properties props = new Properties();
        props.setProperty("application.id", "dev-test-streaming-rtb-fact");
        props.setProperty("default.key.serde", Serdes.Bytes().getClass().getName());
        props.setProperty("default.value.serde", Serdes.Bytes().getClass().getName());
        props.setProperty("auto.offset.reset", "latest");
        testDriver = new TopologyTestDriver(topology, props);
        inputTopic = testDriver.createInputTopic("impression-avro-stream", Serdes.String().serializer(), AvroSerdes.get(User.class).serializer());
        outputTopic = testDriver.createOutputTopic("aggregated-rtb-facts-topic", AvroSerdes.get(UserKey.class).deserializer(), AvroSerdes.get(UserOutput.class).deserializer());
    }


    public static class UserAggregator implements Aggregator<UserKey, User, UserOutput>  {

        @Override
        public UserOutput apply(UserKey userKey, User user, UserOutput userOutput) {
            if(userOutput == null) {
                userOutput = new UserOutput();
                userOutput.id = userKey.id;
                userOutput.name = userKey.name;
                userOutput.totalClicks = user.userAttrs.clicks;
                userOutput.totalPurchases = user.userAttrs.purchases;
                userOutput.totalViews = user.userAttrs.views;
            } else {
                userOutput.totalClicks = ofNullable(userOutput.totalClicks).orElse(0L) + ofNullable(user.userAttrs.clicks).orElse(0L);
                userOutput.totalPurchases = ofNullable(userOutput.totalPurchases).orElse(0L) + ofNullable(user.userAttrs.purchases).orElse(0L);
                userOutput.totalViews = ofNullable(userOutput.totalViews).orElse(0L) + ofNullable(user.userAttrs.views).orElse(0L);
            }
            return userOutput;
        }
    }

    public static Topology getUserOutputTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        Serde<User> valueSerde = AvroSerdes.get(User.class);
        KStream<String, User> impStream = builder.stream("impression-avro-stream", Consumed.with(new Serdes.StringSerde(), valueSerde));
        KGroupedStream<UserKey, User> groupedStream = impStream.groupBy( (k, v) -> v.getUserKey(), Grouped.with( AvroSerdes.get(UserKey.class), AvroSerdes.get(User.class)));

        KStream<UserKey, UserOutput> result = groupedStream
           .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
           .aggregate(() -> new UserOutput(), new UserAggregator(), Materialized.with(AvroSerdes.get(UserKey.class), AvroSerdes.get(UserOutput.class)))
           .toStream().map((k,v) -> KeyValue.pair(k.key(), v) );

        Materialized m = Materialized.<UserKey, UserOutput, KeyValueStore<UserKey, UserOutput>>as("result")
                .withKeySerde(AvroSerdes.get(UserKey.class))
                .withValueSerde(AvroSerdes.get(UserOutput.class));
        KTable<UserKey, UserOutput> result2 = result.groupByKey().reduce( new UserOutputReducer(), m);
        return builder.build();
    }


    @Test
    public void testRtbFacts() throws InterruptedException {
        UserKey k1 = UserKey.newBuilder().setId("123").setName("raj").build();
        User u1 = User.newBuilder().setUserKey(k1).setUserAttrs(UserAttrs.newBuilder().setClicks(5L).setViews(4L).setPurchases(1L).build()).build();
        User u2 = User.newBuilder().setUserKey(k1).setUserAttrs(UserAttrs.newBuilder().setClicks(10L).setViews(5L).setPurchases(2L).build()).build();
        inputTopic.pipeInput(k1.id, u1);
        Thread.sleep(3 * 1000 * 60);

    }

    @AfterEach
    public void teardown() {
        testDriver.close();
    }
}

I'm getting the error below:

org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic dev-test-streaming-rtb-fact-result-repartition. A serializer (key: org.apache.kafka.common.serialization.BytesSerializer / value: org.apache.kafka.common.serialization.BytesSerializer) is not compatible to the actual key or value type (key type: com.myorg.UserKey / value type: com.myorg.UserOutput). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> 
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
>   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>   at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>   at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>   at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>   at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:197)
>   at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:130)
>   at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:105)
>   at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>   at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>   at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:136)
>   at org.apache.kafka.streams.state.internals.CachingWindowStore.flushCache(CachingWindowStore.java:431)
>   at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:76)
>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:501)
>   at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
>   at org.apache.kafka.streams.TopologyTestDriver.completeAllProcessableWork(TopologyTestDriver.java:601)
>   at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:556)
>   at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:845)
>   at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
>   at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
>   at com.imds.streaming.rtb.KafkaStreamsAggregateToStreamTest.testRtbFacts(KafkaStreamsAggregateToStreamTest.java:128)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>   at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>   at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>   at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>   at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>   at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>   at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>   at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>   at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
>   at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
>   at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>   at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>   at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>   at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>   at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>   at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>   at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>   at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>   at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>   at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
>   at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>   at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>   at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>   at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>   at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>   at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>   at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>   at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>   at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
>   at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
>   at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
>   at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
>   at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
>   at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: java.lang.ClassCastException: class com.myorg.UserKey cannot be cast to class org.apache.kafka.common.utils.Bytes (com.myorg.UserKey and org.apache.kafka.common.utils.Bytes are in unnamed module of loader 'app')
>   at org.apache.kafka.common.serialization.BytesSerializer.serialize(BytesSerializer.java:21)
>   at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
>   ... 106 more

The line below is what triggers the error:

 KTable<UserKey, UserOutput> result2 = result.groupByKey().reduce(new UserOutputReducer(), m);

I don't know how to provide the serializer and deserializer for the aggregation. Any idea how to resolve this error?


Solution

  • The error is here

    .toStream().map((k,v) -> KeyValue.pair(k.key(), v) );
    

    You need to provide serdes for the types you're mapping into. KStream#map itself takes no serde argument; the real issue is that after this map() the key and value types have changed, so the repartition topic created by the downstream groupByKey() falls back to the default Bytes serdes you set in the properties, and BytesSerializer cannot cast your Avro types. Pass the Avro serdes explicitly, e.g. via Grouped.with(...) on groupByKey() (or Produced.with(...) if you later write the stream out with to()).
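
    For example, a minimal sketch of the fix for the failing reduce, reusing the AvroSerdes helper, UserOutputReducer, and Materialized m from your snippet:

    // map() changed the key/value types, so groupByKey() can no longer infer
    // serdes for its repartition topic and falls back to the Bytes defaults.
    // Supplying them via Grouped.with(...) resolves the ClassCastException.
    KTable<UserKey, UserOutput> result2 = result
        .groupByKey(Grouped.with(AvroSerdes.get(UserKey.class), AvroSerdes.get(UserOutput.class)))
        .reduce(new UserOutputReducer(), m);

    Alternatively, you could configure real default serdes instead of Bytes, but explicit serdes at every repartition point are easier to reason about.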

    Also, I'd recommend sticking with plain String keys (the type you start consuming with), even if that means concatenating the UserKey id and name fields. That makes it easier to look the data up in the state store later; a sketch follows below.
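
    A minimal sketch of that approach, assuming a simple "id|name" concatenation (the delimiter is just an illustration):

    // Re-key with a plain String built from the UserKey fields, so the
    // repartition topic and the state store can use the built-in String serde.
    KTable<String, UserOutput> byStringKey = result
        .map((k, v) -> KeyValue.pair(k.id + "|" + k.name, v))
        .groupByKey(Grouped.with(Serdes.String(), AvroSerdes.get(UserOutput.class)))
        .reduce(new UserOutputReducer(),
                Materialized.with(Serdes.String(), AvroSerdes.get(UserOutput.class)));

    If you need to query the store by name later, swap Materialized.with(...) for Materialized.as("result") with the same serdes attached.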