Search code examples
spring-bootapache-kafkamicroserviceskafka-consumer-apispring-kafka

Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double


I am using annotation based spring kafka listener to consume the kafka messages, and code is as below

  1. Consuming Employee Object
Class Employee{
private String name;
private String address;
private Object account;
//getters
//setters
}
  1. Account object decides on runtime whether it's Saving Account or Current Account etc.
Class SavingAcc{
private BigDecimal balance;

}
Class CurrentAcc{
private BigDecimal balance;
private BigDecimal limit;
}

  1. Saving & Current Account having BigDecimal Fields to store balance.
  2. Hence while sending Employee object from Kafka producer, all the fields are correctly mapped and appears in correct format of BigDecimal, etc.
  3. But while consuming the Employee object in another service, account object is appearing as LinkedHashMap and BigDecimal fields are converted to Double. which is causing issues.
  4. As per my understanding, the main reason can be as a) Declaration of account as Object type instead of specific type b) Or the deserializer should be provided more specifically. [I have already give Employee.class as type to kafka receiver deserializer, so Employee fields are correctly mapped but account fields wrong].
@Bean
public ConsumerFactory<String, Employee> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Employee.class));
}

Need help on how to map or how to get the account fields properly deserialize.


Solution

  • Use Generics and a custom JavaType method.

    Class Employee<T> {
    private String name;
    private String address;
    private T account;
    //getters
    //setters
    }
    
    JavaType withCurrent = TypeFactory.defaultInstance().constructParametricType(Employee.class, CurrentAcc.class);
    
    JavaType withSaving = TypeFactory.defaultInstance().constructParametricType(Employee.class, SavingAcc.class);
    
    public static JavaType determineType(String topic, byte[] data, Headers headers) {
        // If it's a current account
            return withCurrent;
        // else 
            return withSaving;
    }
    

    If you construct the deserializer yourself use

    deser.setTypeResolver(MyClass::determineType);
    

    When configuring with properties.

    spring.json.value.type.method=com.mycompany.MyCass.determineType
    

    You have to inspect the data or headers (or topic) to determine which type you want.

    EDIT

    Here is a complete example. In this case, I pass a type hint in the Account object, but an alternative would be to set a header on the producer side.

    @SpringBootApplication
    public class JacksonApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(JacksonApplication.class, args);
        }
    
        @Data
        public static class Employee<T extends Account> {
            private String name;
            private T account;
        }
    
        @Data
        public static abstract class Account {
            private final String type;
            protected Account(String type) {
                this.type = type;
            }
        }
    
        @Data
        public static class CurrentAccount extends Account {
            private BigDecimal balance;
            private BigDecimal limit;
            public CurrentAccount() {
                super("C");
            }
        }
    
        @Data
        public static class SavingAccount extends Account {
            private BigDecimal balance;
            public SavingAccount() {
                super("S");
            }
        }
    
        @KafkaListener(id = "empListener", topics = "employees")
        public void listen(Employee<Account> e) {
            System.out.println(e);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("employees").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
            return args -> {
                Employee<CurrentAccount> emp1 = new Employee<>();
                emp1.setName("someOneWithACurrentAccount");
                CurrentAccount currentAccount = new CurrentAccount();
                currentAccount.setBalance(BigDecimal.ONE);
                currentAccount.setLimit(BigDecimal.TEN);
                emp1.setAccount(currentAccount);
                template.send("employees", emp1);
                Employee<SavingAccount> emp2 = new Employee<>();
                emp2.setName("someOneWithASavingAccount");
                SavingAccount savingAccount = new SavingAccount();
                savingAccount.setBalance(BigDecimal.ONE);
                emp2.setAccount(savingAccount);
                template.send("employees", emp2);
            };
        }
    
        private static final JavaType withCurrent = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, CurrentAccount.class);
    
        private static final JavaType withSaving = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, SavingAccount.class);
    
        public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
            if (JsonPath.read(new ByteArrayInputStream(data), "$.account.type").equals("C")) {
                return withCurrent;
            }
            else {
                return withSaving;
            }
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.JacksonApplication.determineType
    

    Result

    JacksonApplication.Employee(name=someOneWithACurrentAccount, account=JacksonApplication.CurrentAccount(balance=1, limit=10))
    JacksonApplication.Employee(name=someOneWithASavingAccount, account=JacksonApplication.SavingAccount(balance=1))
    

    POM

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.5.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>jackson</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>11</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
            <dependency>
                <groupId>com.jayway.jsonpath</groupId>
                <artifactId>json-path</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    EDIT2

    And here is an example that conveys the type hint in a header instead...

    @SpringBootApplication
    public class JacksonApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(JacksonApplication.class, args);
        }
    
        @Data
        public static class Employee<T extends Account> {
            private String name;
            private T account;
        }
    
        @Data
        public static abstract class Account {
        }
    
        @Data
        public static class CurrentAccount extends Account {
            private BigDecimal balance;
            private BigDecimal limit;
        }
    
        @Data
        public static class SavingAccount extends Account {
            private BigDecimal balance;
        }
    
        @KafkaListener(id = "empListener", topics = "employees")
        public void listen(Employee<Account> e) {
            System.out.println(e);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("employees").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
            return args -> {
                Employee<CurrentAccount> emp1 = new Employee<>();
                emp1.setName("someOneWithACurrentAccount");
                CurrentAccount currentAccount = new CurrentAccount();
                currentAccount.setBalance(BigDecimal.ONE);
                currentAccount.setLimit(BigDecimal.TEN);
                emp1.setAccount(currentAccount);
                template.send("employees", emp1);
                Employee<SavingAccount> emp2 = new Employee<>();
                emp2.setName("someOneWithASavingAccount");
                SavingAccount savingAccount = new SavingAccount();
                savingAccount.setBalance(BigDecimal.ONE);
                emp2.setAccount(savingAccount);
                template.send("employees", emp2);
            };
        }
    
        private static final JavaType withCurrent = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, CurrentAccount.class);
    
        private static final JavaType withSaving = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, SavingAccount.class);
    
        public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
            if (headers.lastHeader("accountType").value()[0] == 'C') {
                return withCurrent;
            }
            else {
                return withSaving;
            }
        }
    
        public static class MySerializer extends JsonSerializer<Employee<?>> {
    
            @Override
            public byte[] serialize(String topic, Headers headers, Employee<?> emp) {
                headers.add(new RecordHeader("accountType",
                        new byte[] { (byte) (emp.getAccount() instanceof CurrentAccount ? 'C' : 'S')}));
                return super.serialize(topic, headers, emp);
            }
    
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.producer.value-serializer=com.example.demo2.JacksonApplication.MySerializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo2.JacksonApplication.determineType