Search code examples
spring-boot · apache-kafka · spring-cloud-stream · embedded-kafka

Spring Cloud Stream deserializing invalid JSON from Kafka Topic


I'm working on integrating Spring Cloud Stream with the Kafka binder. The aim is for my app to consume JSON from the topic and deserialize it into a Java object. I am using the functional-style approach instead of the imperative one. My code works with well-structured JSON inputs.

On the other hand, when I send invalid JSON, I want the error-logging method to be triggered. This works in some test cases and not in others. My application deserializes the JSON even when it is invalid and triggers the method that contains the business logic, not the error-logging one.

I could not figure out why the framework deserializes some unstructured JSON input.

@Builder
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class KafkaEventRecord {

    // NOTE(review): @NonNull here is only enforced by the Lombok-generated
    // all-args constructor and setters. Jackson binds via the no-args
    // constructor, so both fields can still end up null for an input like
    // "{}". `required = true` on @JsonProperty is presumably expected to
    // reject such input, but Jackson only honors `required` through a
    // @JsonCreator constructor — confirm against the Jackson docs.
    @JsonProperty(value = "transport_metadata", required = true)
    @NonNull
    private TransportMetadata transportMetadata;

    @JsonProperty(value = "payload", required = true)
    @NonNull
    private Payload payload;
}


@Component
@Slf4j // FIX: `log` is used below but no logger was declared; without @Slf4j (as on CloudStreamErrorHandler) this class does not compile
public class TokenEventConsumer {

    /**
     * Functional-style Spring Cloud Stream consumer (binding
     * {@code consumer-in-0}). Receives {@link KafkaEventRecord} instances
     * that the binder has already deserialized from the topic payload.
     *
     * @return the consumer lambda invoked once per successfully
     *         deserialized record
     */
    @Bean
    Consumer<KafkaEventRecord> consumer() {
        return event -> {
            log.info("Kafka Event data consumed from Kafka {}", event);
        };
    }
}

@Configuration
@Slf4j
public class CloudStreamErrorHandler {

    /**
     * Global error handler: messages published to the framework's
     * "errorChannel" (e.g. payloads the ErrorHandlingDeserializer could
     * not deserialize) are logged here.
     *
     * @param errorMessage wrapper carrying the failed message and the
     *                     exception that caused the failure
     */
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
            log.error("Error Message is {}", errorMessage);
    }
}

@EmbeddedKafka(topics = {"batch-in"}, partitions = 3)
@TestPropertySource(properties = {"spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test")
@Slf4j
public class KafkaTokenConsumerTest {

    private static final String TOPIC = "batch-in";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaListenerEndpointRegistry endpointRegistry;

    @Autowired
    private ObjectMapper objectMapper;

    @SpyBean
    KafkaEventHandlerFactory kafkaEventHandlerFactory;

    @SpyBean
    CloudStreamErrorHandler cloudStreamErrorHandler;

    /** Wait until every listener container has its partitions assigned so sends don't race startup. */
    @BeforeEach
    void setUp() {
        for (MessageListenerContainer messageListenerContainer : endpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
        }
    }

    // PASSES: "{{{{" is not parseable JSON at all, so the
    // ErrorHandlingDeserializer routes the record to the error channel.
    @Test
    public void rejectCorruptedMessage() throws ExecutionException, InterruptedException {

        kafkaTemplate.send(TOPIC, "{{{{").get(); // synchronous send

        // FIX: the original created a CountDownLatch(1) that nothing ever
        // counted down — effectively a fixed 5-second sleep. Mockito's
        // timeout() polls until the verification succeeds (or 5s elapse),
        // which is both faster and less flaky.
        // The handler fires twice because the binder retries the failed
        // delivery — presumably the binder's default retry — confirm
        // against the configured maxAttempts.
        verify(cloudStreamErrorHandler, timeout(5000L).times(2)).handleError(isA(ErrorMessage.class));
    }

    // FAILS: Jackson lenient-parses "{}}}" — it reads the leading "{}" as an
    // empty object and ignores the trailing tokens, so the record reaches the
    // consumer as a KafkaEventRecord with null fields instead of erroring.
    @Test
    public void rejectCorruptedMessage2() throws ExecutionException, InterruptedException {

        kafkaTemplate.send(TOPIC, "{}}}").get(); // synchronous send

        verify(cloudStreamErrorHandler, timeout(5000L).times(2)).handleError(isA(ErrorMessage.class));
    }
}
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

# Producer configuration — for testing purposes only (`.properties` comments start with '#', not '//')
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

The JSON in the rejectCorruptedMessage test method triggers the handleError(ErrorMessage errorMessage) method, which is expected because it is invalid JSON. On the other hand, the JSON in the rejectCorruptedMessage2 test method triggers the Consumer&lt;KafkaEventRecord&gt; consumer() method in the TokenEventConsumer class, which is not the expected behaviour; moreover, I get a KafkaEventRecord object with null values.


Solution

  • Jackson does not consider that to be invalid JSON, it just ignores the trailing }} and decodes the {} as an empty object.

    /**
     * Minimal demo of why "{}}}" is not rejected: Jackson's reader consumes
     * one complete JSON value ("{}") and, by default, never inspects the
     * trailing "}}" tokens, so deserialization "succeeds" with every field
     * left null.
     */
    public class So67804599Application {

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // FIX: removed the unused local `JavaType type = mapper.constructType(Foo.class);`

            // Well-formed JSON: the field is populated as expected.
            Object foo = mapper.readerFor(Foo.class).readValue("{\"bar\":\"baz\"}");
            System.out.println(foo);

            // "Invalid" JSON: "{}" parses as an empty object and the trailing
            // "}}" is silently ignored -> Foo with bar == null.
            foo = mapper.readerFor(Foo.class).readValue("{}}}");
            System.out.println(foo);
        }

        /** Simple bean used to illustrate the lenient parse. */
        public static class Foo {

            String bar;

            public String getBar() {
                return this.bar;
            }

            public void setBar(String bar) {
                this.bar = bar;
            }

            @Override
            public String toString() {
                return "Foo [bar=" + this.bar + "]";
            }

        }

    }
    
    Foo [bar=baz]
    Foo [bar=null]