Search code examples
jacksonspring-cloudspring-kafkaspring-cloud-stream

How to send multiple types of classes into Spring stream source


I upgraded my spring stream from 1.3.0 to 2.1.2 and the default serializer was changed from Kyro (deprecated) into Jackson.

I have a kafka topic that more than one type of messages can be sent to. With Kyro I used to deserialize it into Object.class and then cast it to the relevant type of class.

With jackson I can't achieve this functionality, because I have to specify the type of class I want to deserialize to in advance, otherwise, it's been deserialized into a string.

I tried to find an example but couldn't find anything. Any ideas how can I achieve the same functionality? I want to make it as efficient as possible.


Solution

  • You can add hints to the Jackson encoding so it is decoded to the right concrete type:

    @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")

    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class So56753956Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56753956Application.class, args);
        }
    
        @StreamListener(Processor.INPUT)
        public void listen(Foo foo) {
            System.out.println(foo);
        }
    
    
        @Bean
        public ApplicationRunner runner(MessageChannel output) {
            return args -> {
                output.send(new GenericMessage<>(new Bar("fiz")));
                output.send(new GenericMessage<>(new Baz("buz")));
            };
        }
    
        @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
        public static abstract class Foo {
    
            private String bar;
    
            public Foo() {
                super();
            }
    
            public Foo(String bar) {
                this.bar = bar;
            }
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
            @Override
            public String toString() {
                return getClass().getName() + " [bar=" + this.bar + "]";
            }
    
        }
    
        public static class Bar extends Foo {
    
            public Bar() {
                super();
            }
    
            public Bar(String bar) {
                super(bar);
            }
    
        }
    
        public static class Baz extends Foo {
    
            public Baz() {
                super();
            }
    
            public Baz(String bar) {
                super(bar);
            }
    
        }
    
    }
    

    and

    com.example.So56753956Application$Bar [bar=fiz]
    com.example.So56753956Application$Baz [bar=buz]
    

    See here.