Tags: java, spring, apache-kafka, apache-kafka-streams, spring-kafka

Why doesn't Kafka Streams work without @StreamListener?


@StreamListener is deprecated now, so I am trying to rewrite my consumer in the functional style.

Java code

@Component
public class MyConsumer {
    public void handleMyMethod(MessageEntity message) {
        ...
    }
}


@RequiredArgsConstructor
@Configuration
public class MyConsumerConfiguration {
    private final MyConsumer myConsumer;

    @Bean
    public Consumer<MessageEntity> myMethod() {
        return myConsumer::handleMyMethod;
    }
}

application.yaml

spring.cloud :
    stream :
        kafka.binder.brokers :
            - "127.0.0.1:9092"
        function.definition : myMethod
        bindings :
            myMethod-in-0 :
                destination : service.method.update

Test

@Test
void handleMyMethod() {
    MessageEntity message = new MessageEntity();
    template.send("service.method.update", message);

    await()
        .atMost(6, TimeUnit.SECONDS)
        .untilAsserted(() -> {
            verify(myConsumer, atLeastOnce()).handleMyMethod(entityCaptor.capture());

            MessageEntity entity = entityCaptor.getValue();
            assertNotNull(entity);
        });
}

When I run the test, the message reaches Kafka (I can see it in Kafka Tool), but MyConsumer never receives it. The test fails with this error:

myConsumer.handleMethod(
    <Capturing argument>
);
-> at com.my.project.MyConsumer.handleMethod(MyConsumer.java:33)
Actually, there were zero interactions with this mock.


Solution

  • I found the problem. My util module, which is included in the module described above, has its own "function.definition" property. Because the module above is configured before the util module, the util module's definition overwrites the functions declared by the top-level module. I solved the problem by appending to the existing definition instead of replacing it:

    @Autowired
    StreamFunctionProperties streamFunctionProperties;
    
    ...
    
    // Keep any definition already set by another module (with a trailing ';'),
    // then append this module's own functions.
    String definitionString = StringUtils.hasText(streamFunctionProperties.getDefinition()) ?
        streamFunctionProperties.getDefinition() + ";" :
        "";
    streamFunctionProperties.setDefinition(definitionString + "methodDeleteOut;methodUpdateOut;roleUpdateIn;roleDeleteIn");
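
The merge step in the snippet above can be isolated into a small helper that is easy to test without a Spring context. This is only a sketch: the class name FunctionDefinitionMerger and the method merge are hypothetical and are not part of Spring Cloud Stream. The only thing taken from the answer is the ";" delimiter between function names. Unlike the plain string concatenation, this version also skips blank entries and duplicates.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not a Spring Cloud Stream class.
public class FunctionDefinitionMerger {

    // Joins an existing ';'-separated definition with additional function
    // names, preserving order and dropping blanks and duplicates.
    public static String merge(String existing, String... additional) {
        Set<String> names = new LinkedHashSet<>();
        if (existing != null) {
            for (String name : existing.split(";")) {
                if (!name.trim().isEmpty()) {
                    names.add(name.trim());
                }
            }
        }
        for (String name : additional) {
            if (name != null && !name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return String.join(";", names);
    }

    public static void main(String[] args) {
        // A definition set by one module, extended by another.
        System.out.println(merge("myMethod", "roleUpdateIn", "roleDeleteIn"));
    }
}
```

The returned string could then be passed to streamFunctionProperties.setDefinition(...) in the same place as the concatenated string in the answer.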